浏览代码

- When an event notification is buffered (either because a listener
rejected it or because all listeners were busy when we attempted
to send it originally), we now rebuffer it in a way that will
result in it being retried earlier than it used to be.

Chris McDonough 18 年之前
父节点
当前提交
bb2cecb110
共有 3 个文件被更改,包括 78 次插入58 次删除
  1. 5 0
      CHANGES.txt
  2. 30 38
      src/supervisor/process.py
  3. 43 20
      src/supervisor/tests/test_process.py

+ 5 - 0
CHANGES.txt

@@ -8,6 +8,11 @@ Next Release
     'pidproxy.py' there, and place sample event listener and comm
     event programs within the directory.
 
+  - When an event notification is buffered (either because a listener
+    rejected it or because all listeners were busy when we attempted
+    to send it originally), we now rebuffer it in a way that will
+    result in it being retried earlier than it used to be.
+
 3.0a2
 
   - Fixed the README.txt example for defining the supervisor RPC

+ 30 - 38
src/supervisor/process.py

@@ -597,43 +597,9 @@ class EventListenerPool(ProcessGroupBase):
             # resend the oldest buffered event (dont rebuffer though, maintain
             # order oldest (leftmost) to newest (rightmost) in list)
             event = self.event_buffer.pop(0)
-            ok = self._dispatchEvent(event, buffer=False)
-            if ok:
-                self.config.options.logger.trace(
-                    '%s: Succeeded sending buffered event %s (bufsize %s)' % (
-                    self.config.name, event.serial, len(self.event_buffer)))
-            else:
-                self.config.options.logger.trace(
-                    '%s: Failed sending buffered event %s (bufsize %s)' % (
-                    self.config.name, event.serial, len(self.event_buffer)))
-                self.event_buffer.insert(0, event)
+            self._dispatchEvent(event)
 
-    def _eventEnvelope(self, event_type, serial, payload):
-        event_name = getEventNameByType(event_type)
-        payload_len = len(payload)
-        D = {'ver':'SUPERVISORD3.0',
-             'len':payload_len,
-             'event_name':event_name,
-             'payload':payload,
-             'serial':serial}
-        return '%(ver)s %(event_name)s %(serial)s %(len)s\n%(payload)s' % D
-
-    def _bufferEvent(self, event):
-        if isinstance(event, EventBufferOverflowEvent):
-            return # don't ever buffer EventBufferOverflowEvents
-        if len(self.event_buffer) >= self.config.buffer_size:
-            if self.event_buffer:
-                discarded_event = self.event_buffer.pop(0)
-                notify(EventBufferOverflowEvent(self, discarded_event))
-                self.config.options.logger.info(
-                    'pool %s event buffer overflowed, discarding event %s' % (
-                    (self.config.name, discarded_event.serial)))
-        self.config.options.logger.trace(
-            'pool %s busy, buffering event %s' % ((self.config.name,
-                                                   event.serial)))
-        self.event_buffer.append(event)
-
-    def _dispatchEvent(self, event, buffer=True):
+    def _dispatchEvent(self, event):
         # events are required to be instances
         event_type = event.__class__
         if not hasattr(event, 'serial'):
@@ -658,10 +624,36 @@ class EventListenerPool(ProcessGroupBase):
                     'event %s sent to listener %s' % (
                     event.serial, process.config.name))
                 return True
-        if buffer:
-            self._bufferEvent(event)
+        self._bufferEvent(event)
         return False
 
+    def _bufferEvent(self, event):
+        if isinstance(event, EventBufferOverflowEvent):
+            return # don't ever buffer EventBufferOverflowEvents
+        if len(self.event_buffer) >= self.config.buffer_size:
+            if self.event_buffer:
+                discarded_event = self.event_buffer.pop(0)
+                notify(EventBufferOverflowEvent(self, discarded_event))
+                self.config.options.logger.info(
+                    'pool %s event buffer overflowed, discarding event %s' % (
+                    (self.config.name, discarded_event.serial)))
+        self.event_buffer.insert(1, event)
+        self.config.options.logger.info(
+            'buffered event %s for pool %s (bufsize %s)' % (
+            (event.serial, self.config.name, len(self.event_buffer))))
+        # insert event into 2nd position in list so we don't block pending
+        # events for a chronically failed event notification
+
+    def _eventEnvelope(self, event_type, serial, payload):
+        event_name = getEventNameByType(event_type)
+        payload_len = len(payload)
+        D = {'ver':'SUPERVISORD3.0',
+             'len':payload_len,
+             'event_name':event_name,
+             'payload':payload,
+             'serial':serial}
+        return '%(ver)s %(event_name)s %(serial)s %(len)s\n%(payload)s' % D
+
 _num = 0
 
 def new_serial():

+ 43 - 20
src/supervisor/tests/test_process.py

@@ -1059,7 +1059,10 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         self.assertEqual(events.callbacks[1], 
             (events.EventRejectedEvent, pool.handle_rejected))
 
-    def test_handle_rejected(self):
+    def test_handle_rejected_no_overflow(self):
+        from supervisor import events
+        L = []
+        events.subscribe(events.EventBufferOverflowEvent, lambda x: L.append(x))
         options = DummyOptions()
         gconfig = DummyPGroupConfig(options)
         pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
@@ -1067,6 +1070,7 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         gconfig = DummyPGroupConfig(options, pconfigs=[pconfig1])
         pool = self._makeOne(gconfig)
         pool.processes = {'process1': process1}
+        pool.event_buffer = [None, None]
         class DummyEvent1:
             serial = 'abc'
         class DummyEvent2:
@@ -1075,31 +1079,45 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         dummyevent = DummyEvent2()
         dummyevent.serial = 1
         pool.handle_rejected(dummyevent)
-        self.assertEqual(pool.event_buffer, [dummyevent.event])
+        self.assertEqual(pool.event_buffer, [None, dummyevent.event, None])
+        self.assertEqual(pool.config.options.logger.data[0],
+            'buffered event abc for pool whatever (bufsize 3)')
+        self.assertEqual(len(L), 0)
         
     def test_handle_rejected_event_buffer_overflowed(self):
+        from supervisor import events
+        L = []
+        events.subscribe(events.EventBufferOverflowEvent, lambda x: L.append(x))
         options = DummyOptions()
         gconfig = DummyPGroupConfig(options)
         pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
         process1 = DummyProcess(pconfig1)
         gconfig = DummyPGroupConfig(options, pconfigs=[pconfig1])
-        gconfig.buffer_size = 1
+        gconfig.buffer_size = 3
         pool = self._makeOne(gconfig)
         pool.processes = {'process1': process1}
-        class DummyEvent1:
-            serial = 'abc'
-        class DummyEvent2:
-            process = process1
-            event = DummyEvent1()
-        dummyevent_a = DummyEvent2()
-        dummyevent_b = DummyEvent2()
-        dummyevent_a.serial = 1
-        dummyevent_b.serial = 2
-        pool.event_buffer = [dummyevent_a]
-        pool.handle_rejected(dummyevent_b)
-        self.assertEqual(pool.event_buffer, [dummyevent_b.event])
+        class DummyEvent:
+            def __init__(self, serial):
+                self.serial = serial
+        class DummyRejectedEvent:
+            def __init__(self, serial):
+                self.process = process1
+                self.event = DummyEvent(serial)
+        event_a = DummyEvent('a')
+        event_b = DummyEvent('b')
+        event_c = DummyEvent('c')
+        rej_event = DummyRejectedEvent('rejected')
+        pool.event_buffer = [event_a, event_b, event_c]
+        pool.handle_rejected(rej_event)
+        serials = [ x.serial for x in pool.event_buffer ]
+        # we popped a, and we inserted the rejected event into the 2nd pos
+        self.assertEqual(serials, ['b', 'rejected', 'c'])
         self.assertEqual(pool.config.options.logger.data[0],
-            'pool whatever event buffer overflowed, discarding event 1')
+            'pool whatever event buffer overflowed, discarding event a')
+        self.assertEqual(len(L), 1)
+        self.assertEqual(L[0].event, event_a)
+        self.assertEqual(pool.config.options.logger.data[1],
+            'buffered event rejected for pool whatever (bufsize 3)')
 
     def test__bufferEvent_doesnt_rebufer_overflow_events(self):
         options = DummyOptions()
@@ -1151,14 +1169,15 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         from supervisor.events import StartingFromStoppedEvent
         from supervisor.states import EventListenerStates
         event = StartingFromStoppedEvent(process1)
+        event.serial = 'a'
         process1.listener_state = EventListenerStates.BUSY
-        pool.event_buffer = [event]
+        pool.event_buffer = [event, None, None, None]
         pool.transition()
         self.assertEqual(process1.transitioned, True)
-        self.assertEqual(pool.event_buffer, [event])
+        self.assertEqual(pool.event_buffer, [None, event, None, None])
         data = pool.config.options.logger.data
-        self.assertTrue(data[0].startswith(
-            'whatever: Failed sending buffered event'), data[0])
+        self.assertEqual(data[0],
+                         'buffered event a for pool whatever (bufsize 4)')
     
     def test_transition_event_proc_not_running(self):
         options = DummyOptions()
@@ -1171,6 +1190,7 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         from supervisor.events import StartingFromStoppedEvent
         from supervisor.states import EventListenerStates
         event = StartingFromStoppedEvent(process1)
+        event.serial = 1
         process1.listener_state = EventListenerStates.READY
         pool.event_buffer = [event]
         pool.transition()
@@ -1178,6 +1198,9 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         self.assertEqual(pool.event_buffer, [event])
         self.assertEqual(process1.stdin_buffer, '')
         self.assertEqual(process1.listener_state, EventListenerStates.READY)
+        data = pool.config.options.logger.data
+        self.assertEqual(data[0],
+                         'buffered event 1 for pool whatever (bufsize 1)')
 
     def test_transition_event_proc_running(self):
         options = DummyOptions()