فهرست منبع

- When a listener process exits (unexpectedly) before transitioning
from the BUSY state, rebuffer the event that was being processed.

Chris McDonough 18 سال پیش
والد
کامیت
53babbbe64
5فایلهای تغییر یافته به همراه51 افزوده شده و 21 حذف شده
  1. 3 0
      CHANGES.txt
  2. 10 10
      src/supervisor/dispatchers.py
  3. 16 10
      src/supervisor/process.py
  4. 1 1
      src/supervisor/tests/base.py
  5. 21 0
      src/supervisor/tests/test_process.py

+ 3 - 0
CHANGES.txt

@@ -13,6 +13,9 @@ Next Release
     to send it originally), we now rebuffer it in a way that will
     result in it being retried earlier than it used to be.
 
+  - When a listener process exits (unexpectedly) before transitioning
+    from the BUSY state, rebuffer the event that was being processed.
+
 3.0a2
 
   - Fixed the README.txt example for defining the supervisor RPC

+ 10 - 10
src/supervisor/dispatchers.py

@@ -261,7 +261,6 @@ class PEventListenerDispatcher(PDispatcher):
         return False
     
     def readable(self):
-        self.handle_listener_state_change()
         if self.closed:
             return False
         return True
@@ -286,14 +285,15 @@ class PEventListenerDispatcher(PDispatcher):
         self.handle_listener_state_change()
 
     def handle_listener_state_change(self):
-        process = self.process
-        procname = process.config.name
-        state = process.listener_state
         data = self.state_buffer
 
         if not data:
             return
 
+        process = self.process
+        procname = process.config.name
+        state = process.listener_state
+
         if state == EventListenerStates.UNKNOWN:
             # this is a fatal state
             self.state_buffer = ''
@@ -306,13 +306,13 @@ class PEventListenerDispatcher(PDispatcher):
                 return
             elif data.startswith(self.READY_FOR_EVENTS_TOKEN):
                 msg = '%s: ACKNOWLEDGED -> READY' % procname
-                self.process.config.options.logger.trace(msg)
+                process.config.options.logger.trace(msg)
                 process.listener_state = EventListenerStates.READY
                 self.state_buffer = self.state_buffer[tokenlen:]
                 process.event = None
             else:
                 msg = '%s: ACKNOWLEDGED -> UNKNOWN' % procname
-                self.process.config.options.logger.trace(msg)
+                process.config.options.logger.trace(msg)
                 process.listener_state = EventListenerStates.UNKNOWN
                 self.state_buffer = ''
                 process.event = None
@@ -325,7 +325,7 @@ class PEventListenerDispatcher(PDispatcher):
         elif state == EventListenerStates.READY:
             # the process sent some spurious data, be a hardass about it
             msg = '%s: READY -> UNKNOWN' % procname
-            self.process.config.options.logger.trace(msg)
+            process.config.options.logger.trace(msg)
             process.listener_state = EventListenerStates.UNKNOWN
             self.state_buffer = ''
             process.event = None
@@ -337,14 +337,14 @@ class PEventListenerDispatcher(PDispatcher):
                 return
             elif data.startswith(self.EVENT_PROCESSED_TOKEN):
                 msg = '%s: BUSY -> ACKNOWLEDGED (processed)' % procname
-                self.process.config.options.logger.trace(msg)
+                process.config.options.logger.trace(msg)
                 tokenlen = len(self.EVENT_PROCESSED_TOKEN)
                 self.state_buffer = self.state_buffer[tokenlen:]
                 process.listener_state = EventListenerStates.ACKNOWLEDGED
                 process.event = None
             elif data.startswith(self.EVENT_REJECTED_TOKEN):
                 msg = '%s: BUSY -> ACKNOWLEDGED (rejected)' % procname
-                self.process.config.options.logger.trace(msg)
+                process.config.options.logger.trace(msg)
                 tokenlen = len(self.EVENT_REJECTED_TOKEN)
                 self.state_buffer = self.state_buffer[tokenlen:]
                 process.listener_state = EventListenerStates.ACKNOWLEDGED
@@ -352,7 +352,7 @@ class PEventListenerDispatcher(PDispatcher):
                 process.event = None
             else:
                 msg = '%s: BUSY -> UNKNOWN' % procname
-                self.process.config.options.logger.trace(msg)
+                process.config.options.logger.trace(msg)
                 process.listener_state = EventListenerStates.UNKNOWN
                 self.state_buffer = ''
                 notify(EventRejectedEvent(process, process.event))

+ 16 - 10
src/supervisor/process.py

@@ -31,10 +31,7 @@ from supervisor.options import signame
 from supervisor.options import ProcessException
 
 from supervisor.dispatchers import EventListenerStates
-from supervisor.events import getEventNameByType
-from supervisor.events import EventBufferOverflowEvent
-from supervisor.events import notify
-from supervisor.events import subscribe
+
 from supervisor import events
 
 from supervisor.datatypes import RestartUnconditionally
@@ -50,6 +47,7 @@ class Subprocess:
     config = None # ProcessConfig instance
     state = None # process state code
     listener_state = None # listener state code (if we're an event listener)
+    event = None # event currently being processed (if we're an event listener)
     laststart = 0 # Last time the subprocess was started; 0 if never
     laststop = 0  # Last time the subprocess was stopped; 0 if never
     delay = 0 # If nonzero, delay starting or killing until this time
@@ -145,7 +143,7 @@ class Subprocess:
         if new_state is old_state:
             return False
         event_type = events.getProcessStateChangeEventType(old_state, new_state)
-        notify(event_type(self))
+        events.notify(event_type(self))
         self.state = new_state
 
     def _assertInState(self, *states):
@@ -413,6 +411,13 @@ class Subprocess:
         self.pipes = {}
         self.dispatchers = {}
 
+        # if we died before we processed the current event (only happens
+        # if we're an event listener), notify the event system that this
+        # event was rejected so it can be processed again.
+        if self.event is not None:
+            events.notify(events.EventRejectedEvent(self, self.event))
+            self.event = None
+
     def set_uid(self):
         if self.config.uid is None:
             return
@@ -579,8 +584,8 @@ class EventListenerPool(ProcessGroupBase):
         ProcessGroupBase.__init__(self, config)
         self.event_buffer = []
         for event_type in self.config.pool_events:
-            subscribe(event_type, self._dispatchEvent)
-        subscribe(events.EventRejectedEvent, self.handle_rejected)
+            events.subscribe(event_type, self._dispatchEvent)
+        events.subscribe(events.EventRejectedEvent, self.handle_rejected)
 
     def handle_rejected(self, event):
         process = event.process
@@ -628,12 +633,13 @@ class EventListenerPool(ProcessGroupBase):
         return False
 
     def _bufferEvent(self, event):
-        if isinstance(event, EventBufferOverflowEvent):
+        if isinstance(event, events.EventBufferOverflowEvent):
             return # don't ever buffer EventBufferOverflowEvents
         if len(self.event_buffer) >= self.config.buffer_size:
             if self.event_buffer:
                 discarded_event = self.event_buffer.pop(0)
-                notify(EventBufferOverflowEvent(self, discarded_event))
+                events.notify(events.EventBufferOverflowEvent(self,
+                                                              discarded_event))
                 self.config.options.logger.info(
                     'pool %s event buffer overflowed, discarding event %s' % (
                     (self.config.name, discarded_event.serial)))
@@ -645,7 +651,7 @@ class EventListenerPool(ProcessGroupBase):
         # events for a chronically failed event notification
 
     def _eventEnvelope(self, event_type, serial, payload):
-        event_name = getEventNameByType(event_type)
+        event_name = events.getEventNameByType(event_type)
         payload_len = len(payload)
         D = {'ver':'SUPERVISORD3.0',
              'len':payload_len,

+ 1 - 1
src/supervisor/tests/base.py

@@ -849,7 +849,7 @@ class DummyStream:
         return len(self.written)
         
 class DummyEvent:
-    pass
+    serial = 'abc'
         
 def lstrip(s):
     strings = [x.strip() for x in s.split('\n')]

+ 21 - 0
src/supervisor/tests/test_process.py

@@ -10,6 +10,7 @@ from supervisor.tests.base import DummyPConfig
 from supervisor.tests.base import DummyProcess
 from supervisor.tests.base import DummyPGroupConfig
 from supervisor.tests.base import DummyDispatcher
+from supervisor.tests.base import DummyEvent
 
 class SubprocessTests(unittest.TestCase):
     def _getTargetClass(self):
@@ -714,6 +715,26 @@ class SubprocessTests(unittest.TestCase):
         self.assertEqual(instance.exitstatus, None)
         self.assertEqual(L[0].__class__, events.BackoffFromStartingEvent)
 
+    def test_finish_with_current_event_sends_rejected(self):
+        from supervisor import events
+        L = []
+        events.subscribe(events.ExitedFromRunningEvent, lambda x: L.append(x))
+        events.subscribe(events.EventRejectedEvent, lambda x: L.append(x))
+        options = DummyOptions()
+        config = DummyPConfig(options, 'notthere', '/notthere',
+                              stdout_logfile='/tmp/foo', startsecs=10)
+        instance = self._makeOne(config)
+        from supervisor.states import ProcessStates
+        instance.state = ProcessStates.RUNNING
+        event = DummyEvent()
+        instance.event = event
+        instance.finish(123, 1)
+        self.assertEqual(L[0].__class__, events.ExitedFromRunningEvent)
+        self.assertEqual(L[1].__class__, events.EventRejectedEvent)
+        self.assertEqual(L[1].process, instance)
+        self.assertEqual(L[1].event, event)
+        self.assertEqual(instance.event, None)
+
     def test_set_uid_no_uid(self):
         options = DummyOptions()
         config = DummyPConfig(options, 'test', '/test')