|
@@ -565,9 +565,10 @@ class EventListenerPool(ProcessGroupBase):
|
|
|
event = self.event_buffer.pop(0)
|
|
|
ok = self._dispatchEvent(event, buffer=False)
|
|
|
if not ok:
|
|
|
- self.config.options.logger.log(self.config.options.TRACE,
|
|
|
- 'Failed sending buffered event '
|
|
|
- '%s' % event.serial)
|
|
|
+ self.config.options.logger.log(
|
|
|
+ self.config.options.TRACE,
|
|
|
+ 'Failed sending buffered event %s (bufsize %s)' % (
|
|
|
+ event.serial, len(self.event_buffer)))
|
|
|
self.event_buffer.insert(0, event)
|
|
|
|
|
|
def _eventEnvelope(self, event_type, serial, payload):
|
|
@@ -597,22 +598,14 @@ class EventListenerPool(ProcessGroupBase):
|
|
|
|
|
|
def _dispatchEvent(self, event, buffer=True):
|
|
|
# events are required to be instances
|
|
|
- serializer = None
|
|
|
event_type = event.__class__
|
|
|
- for klass, callback in serializers.items():
|
|
|
- if isinstance(event, klass):
|
|
|
- serializer = callback
|
|
|
- if serializer is None:
|
|
|
- # this is a system programming error, we must handle
|
|
|
- # all events
|
|
|
- raise NotImplementedError(etype)
|
|
|
if not hasattr(event, 'serial'):
|
|
|
event.serial = new_serial()
|
|
|
for process in self.processes.values():
|
|
|
if process.state != ProcessStates.RUNNING:
|
|
|
continue
|
|
|
if process.listener_state == EventListenerStates.READY:
|
|
|
- payload = serializer(event)
|
|
|
+ payload = str(event)
|
|
|
try:
|
|
|
serial = event.serial
|
|
|
envelope = self._eventEnvelope(event_type, serial, payload)
|
|
@@ -626,7 +619,8 @@ class EventListenerPool(ProcessGroupBase):
|
|
|
process.event = event
|
|
|
self.config.options.logger.log(
|
|
|
self.config.options.TRACE,
|
|
|
- 'event %s sent to listener' % event.serial)
|
|
|
+ 'event %s sent to listener %s' % (
|
|
|
+ event.serial, process.config.name))
|
|
|
return True
|
|
|
if buffer:
|
|
|
self._bufferEvent(event)
|
|
@@ -640,28 +634,5 @@ def new_serial():
|
|
|
_num = _num + 1
|
|
|
return val
|
|
|
|
|
|
-serializers = {}
|
|
|
-def pcomm_event(event):
|
|
|
- return 'process_name: %s\nchannel: %s\n%s' % (
|
|
|
- event.process.config.name,
|
|
|
- event.channel,
|
|
|
- event.data)
|
|
|
-serializers[events.ProcessCommunicationEvent] = pcomm_event
|
|
|
-
|
|
|
-def overflow_event(event):
|
|
|
- name = event.group.config.name
|
|
|
- typ = getEventNameByType(event.event)
|
|
|
- return 'group_name: %s\nevent_type: %s' % (name, typ)
|
|
|
-serializers[events.EventBufferOverflowEvent] = overflow_event
|
|
|
-
|
|
|
-def proc_sc_event(event):
|
|
|
- return 'process_name: %s' % event.process.config.name
|
|
|
-
|
|
|
-serializers[events.ProcessStateChangeEvent] = proc_sc_event
|
|
|
-
|
|
|
-def supervisor_sc_event(event):
|
|
|
- return ''
|
|
|
-serializers[events.SupervisorStateChangeEvent] = supervisor_sc_event
|
|
|
-
|
|
|
|
|
|
|