|
@@ -570,14 +570,15 @@ class EventListenerPool(ProcessGroupBase):
|
|
|
'%s' % event)
|
|
|
self.event_buffer.insert(0, event)
|
|
|
|
|
|
- def _eventEnvelope(self, event_type, payload):
|
|
|
+ def _eventEnvelope(self, event_type, serial, payload):
|
|
|
event_name = getEventNameByType(event_type)
|
|
|
payload_len = len(payload)
|
|
|
- D = {'ver':'3.0',
|
|
|
+ D = {'ver':'SUPERVISORD3.0',
|
|
|
'len':payload_len,
|
|
|
'event_name':event_name,
|
|
|
- 'payload':payload}
|
|
|
- return 'SUPERVISORD%(ver)s %(event_name)s %(len)s\n%(payload)s' % D
|
|
|
+ 'payload':payload,
|
|
|
+ 'serial':serial}
|
|
|
+ return '%(ver)s %(event_name)s %(serial)s %(len)s\n%(payload)s' % D
|
|
|
|
|
|
def _bufferEvent(self, event):
|
|
|
if isinstance(event, EventBufferOverflowEvent):
|
|
@@ -605,13 +606,16 @@ class EventListenerPool(ProcessGroupBase):
|
|
|
# this is a system programming error, we must handle
|
|
|
# all events
|
|
|
raise NotImplementedError(etype)
|
|
|
+ if not hasattr(event, 'serial'):
|
|
|
+ event.serial = new_serial()
|
|
|
for process in self.processes.values():
|
|
|
if process.state != ProcessStates.RUNNING:
|
|
|
continue
|
|
|
if process.listener_state == EventListenerStates.READY:
|
|
|
payload = serializer(event)
|
|
|
try:
|
|
|
- envelope = self._eventEnvelope(event_type, payload)
|
|
|
+ serial = event.serial
|
|
|
+ envelope = self._eventEnvelope(event_type, serial, payload)
|
|
|
process.write(envelope)
|
|
|
except IOError, why:
|
|
|
if why[0] != errno.EPIPE:
|
|
@@ -625,6 +629,14 @@ class EventListenerPool(ProcessGroupBase):
|
|
|
self._bufferEvent(event)
|
|
|
return False
|
|
|
|
|
|
+_num = 0
|
|
|
+
|
|
|
+def new_serial():
|
|
|
+ global _num
|
|
|
+ val = _num
|
|
|
+ _num = _num + 1
|
|
|
+ return val
|
|
|
+
|
|
|
serializers = {}
|
|
|
def pcomm_event(event):
|
|
|
return 'process_name: %s\nchannel: %s\n%s' % (
|