Quellcode durchsuchen

Interactive testing of listeners.

Chris McDonough vor 18 Jahren
Ursprung
Commit
f5b5f92834

+ 11 - 5
src/supervisor/dispatchers.py

@@ -191,8 +191,7 @@ class PEventListenerDispatcher(PDispatcher):
     def __init__(self, process, channel, fd):
         self.process = process
         # the initial state of our listener is ACKNOWLEDGED; this is a
-        # "busy" state that implies we're awaiting a READY_FOR_EVENTS
-        # token.
+        # "busy" state that implies we're awaiting a READY_FOR_EVENTS_TOKEN
         self.process.listener_state = EventListenerStates.ACKNOWLEDGED
         self.process.event = None
         self.channel = channel
@@ -241,7 +240,6 @@ class PEventListenerDispatcher(PDispatcher):
                 if self.process.config.options.strip_ansi:
                     data = self.process.config.options.stripEscapes(data)
                 self.childlog.info(data)
-
         self.handle_listener_state_change()
 
     def _trace(self, msg):
@@ -279,7 +277,11 @@ class PEventListenerDispatcher(PDispatcher):
                 process.listener_state = EventListenerStates.UNKNOWN
                 self.state_buffer = ''
                 process.event = None
-            return
+            if self.state_buffer:
+                # keep going til its too short
+                self.handle_listener_state_change()
+            else:
+                return
 
         elif state == EventListenerStates.READY:
             # the process sent some spurious data, be a hardass about it
@@ -314,7 +316,11 @@ class PEventListenerDispatcher(PDispatcher):
                 process.listener_state = EventListenerStates.UNKNOWN
                 self.state_buffer = ''
                 process.event = None
-            return
+            if self.state_buffer:
+                # keep going til its too short
+                self.handle_listener_state_change()
+            else:
+                return
 
 class PInputDispatcher(PDispatcher):
     """ Input (stdin) dispatcher """

+ 31 - 3
src/supervisor/options.py

@@ -741,7 +741,8 @@ class ServerOptions(Options):
                     raise ValueError('Unknown event type %s in [%s] events' %
                                      (pool_event_name, section))
                 pool_events.append(pool_event)
-            processes=self.processes_from_section(parser, section, pool_name)
+            processes=self.processes_from_section(parser, section, pool_name,
+                                                  listener=True)
             groups.append(
                 EventListenerPoolConfig(self, pool_name, priority, processes,
                                         buffer_size, pool_events)
@@ -750,7 +751,8 @@ class ServerOptions(Options):
         groups.sort()
         return groups
 
-    def processes_from_section(self, parser, section, group_name):
+    def processes_from_section(self, parser, section, group_name,
+                               listener=False):
         programs = []
         get = parser.saneget
         program_name = section.split(':', 1)[1]
@@ -819,7 +821,12 @@ class ServerOptions(Options):
                 maxbytes = byte_size(get(section, mb_key, '50MB'))
                 logfiles[mb_key] = maxbytes
 
-            pconfig = ProcessConfig(
+            if listener:
+                klass = EventListenerConfig
+            else:
+                klass = ProcessConfig
+
+            pconfig = klass(
                 self,
                 name=expand(process_name, expansions, 'process_name'),
                 command=expand(command, expansions, 'command'),
@@ -1503,6 +1510,27 @@ class ProcessConfig(Config):
             dispatchers[stdin_fd] = PInputDispatcher(proc, 'stdin', stdin_fd)
         return dispatchers, p
 
+class EventListenerConfig(ProcessConfig):
+    def make_dispatchers(self, proc):
+        use_stderr = not self.redirect_stderr
+        p = self.options.make_pipes(use_stderr)
+        stdout_fd,stderr_fd,stdin_fd = p['stdout'],p['stderr'],p['stdin']
+        dispatchers = {}
+        from supervisor.dispatchers import PEventListenerDispatcher
+        from supervisor.dispatchers import PInputDispatcher
+        from supervisor import events
+        if stdout_fd is not None:
+            etype = events.ProcessCommunicationStdoutEvent
+            dispatchers[stdout_fd] = PEventListenerDispatcher(proc, 'stdout',
+                                                              stdout_fd)
+        if stderr_fd is not None:
+            etype = events.ProcessCommunicationStderrEvent
+            dispatchers[stderr_fd] = PEventListenerDispatcher(proc, 'stderr',
+                                                              stderr_fd)
+        if stdin_fd is not None:
+            dispatchers[stdin_fd] = PInputDispatcher(proc, 'stdin', stdin_fd)
+        return dispatchers, p
+
 class ProcessGroupConfig(Config):
     def __init__(self, options, name, priority, process_configs):
         self.options = options

+ 60 - 34
src/supervisor/process.py

@@ -45,6 +45,7 @@ class Subprocess:
     pid = 0 # Subprocess pid; 0 when not running
     config = None # ProcessConfig instance
     state = None # process state code
+    listener_state = None # listener state code
     laststart = 0 # Last time the subprocess was started; 0 if never
     laststop = 0  # Last time the subprocess was stopped; 0 if never
     delay = 0 # If nonzero, delay starting or killing until this time
@@ -135,6 +136,13 @@ class Subprocess:
         notify(event_type(self))
         self.state = new_state
 
+    def _assertInState(self, *states):
+        if self.state not in states:
+            current_state = getProcessStateDescription(self.state)
+            allowable_states = ' '.join(map(getProcessStateDescription, states))
+            raise AssertionError('Assertion failed for %s: %s not in %s' %  (
+                self.config.name, current_state, allowable_states))
+
     def record_spawnerr(self, msg):
         now = time.time()
         self.spawnerr = msg
@@ -163,15 +171,16 @@ class Subprocess:
         
         self.laststart = time.time()
 
-        assert(self.state in [ProcessStates.EXITED, ProcessStates.FATAL,
-                              ProcessStates.BACKOFF, ProcessStates.STOPPED])
+        self._assertInState(ProcessStates.EXITED, ProcessStates.FATAL,
+                            ProcessStates.BACKOFF, ProcessStates.STOPPED)
+
         self.change_state(ProcessStates.STARTING)
 
         try:
             filename, argv = self.get_execv_args()
         except ProcessException, what:
             self.record_spawnerr(what.args[0])
-            assert(self.state == ProcessStates.STARTING)
+            self._assertInState(ProcessStates.STARTING)
             self.change_state(ProcessStates.BACKOFF)
             return
 
@@ -185,7 +194,7 @@ class Subprocess:
             else:
                 msg = 'unknown error: %s' % errno.errorcode.get(code, code)
             self.record_spawnerr(msg)
-            assert(self.state == ProcessStates.STARTING)
+            self._assertInState(ProcessStates.STARTING)
             self.change_state(ProcessStates.BACKOFF)
             return
 
@@ -200,7 +209,7 @@ class Subprocess:
                 msg = 'unknown error: %s' % errno.errorcode.get(code, code)
 
             self.record_spawnerr(msg)
-            assert(self.state == ProcessStates.STARTING)
+            self._assertInState(ProcessStates.STARTING)
             self.change_state(ProcessStates.BACKOFF)
             options.close_parent_pipes(self.pipes)
             options.close_child_pipes(self.pipes)
@@ -270,7 +279,7 @@ class Subprocess:
         self.delay = 0
         self.backoff = 0
         self.system_stop = 1
-        assert(self.state == ProcessStates.BACKOFF)
+        self._assertInState(ProcessStates.BACKOFF)
         self.change_state(ProcessStates.FATAL)
 
     def kill(self, sig):
@@ -294,7 +303,7 @@ class Subprocess:
             # RUNNING -> STOPPING
             self.killing = 1
             self.delay = now + self.config.stopwaitsecs
-            assert(self.state in [ProcessStates.RUNNING,ProcessStates.STARTING])
+            self._assertInState(ProcessStates.RUNNING,ProcessStates.STARTING)
             self.change_state(ProcessStates.STOPPING)
             options.kill(self.pid, sig)
         except (AssertionError, NotImplementedError):
@@ -338,7 +347,7 @@ class Subprocess:
             self.delay = 0
             self.exitstatus = es
             msg = "stopped: %s (%s)" % (processname, msg)
-            assert(self.state == ProcessStates.STOPPING)
+            self._assertInState(ProcessStates.STOPPING)
             self.change_state(ProcessStates.STOPPED)
         elif expected:
             # this finish was not the result of a stop request, but
@@ -348,23 +357,24 @@ class Subprocess:
             self.backoff = 0
             self.exitstatus = es
             msg = "exited: %s (%s)" % (processname, msg + "; expected")
-            assert(self.state == ProcessStates.RUNNING)
+            self._assertInState(ProcessStates.RUNNING)
             self.change_state(ProcessStates.EXITED)
         else:
             # the program did not stay up long enough or exited with
             # an unexpected exit code
-            # implies STARTING -> BACKOFF
             self.exitstatus = None
             self.backoff = self.backoff + 1
             self.delay = now + self.backoff
             if tooquickly:
                 self.spawnerr = (
                     'Exited too quickly (process log may have details)')
+                self._assertInState(ProcessStates.STARTING)
+                self.change_state(ProcessStates.BACKOFF)
             elif badexit:
                 self.spawnerr = 'Bad exit code %s' % es
+                self._assertInState(ProcessStates.RUNNING)
+                self.change_state(ProcessStates.EXITED)
             msg = "exited: %s (%s)" % (processname, msg + "; not expected")
-            assert(self.state == ProcessStates.STARTING)
-            self.change_state(ProcessStates.BACKOFF)
 
         self.config.options.logger.info(msg)
 
@@ -421,7 +431,7 @@ class Subprocess:
                     'entered RUNNING state, process has stayed up for '
                     '> than %s seconds (startsecs)' % self.config.startsecs)
                 logger.info('success: %s %s' % (self.config.name, msg))
-                assert(self.state == ProcessStates.STARTING)
+                self._assertInState(ProcessStates.STARTING)
                 self.change_state(ProcessStates.RUNNING)
         
 class ProcessGroupBase:
@@ -525,7 +535,6 @@ class ProcessGroup(ProcessGroupBase):
             proc.transition()
 
 class EventListenerPool(ProcessGroupBase):
-    serializers = {}
     def __init__(self, config):
         ProcessGroupBase.__init__(self, config)
         self.event_buffer = []
@@ -550,6 +559,9 @@ class EventListenerPool(ProcessGroupBase):
             event = self.event_buffer.pop(0)
             ok = self._dispatchEvent(event, buffer=False)
             if not ok:
+                self.config.options.logger.log(self.config.options.TRACE,
+                                               'Failed sending buffered event '
+                                               '%s' % event)
                 self.event_buffer.insert(0, event)
 
     def _eventEnvelope(self, event, payload):
@@ -560,17 +572,22 @@ class EventListenerPool(ProcessGroupBase):
         return 'SUPERVISORD%(ver)s %(event_name)s %(len)s\n%(payload)s' % D
 
     def _bufferEvent(self, event):
-        if isinstance(EventBufferOverflowEvent, event):
+        if isinstance(event, EventBufferOverflowEvent):
             return # don't ever buffer EventBufferOverflowEvents
         if len(self.event_buffer) >= self.config.buffer_size:
             discarded_event = self.event_buffer.pop(0)
             notify(EventBufferOverflowEvent(self, discarded_event))
+        self.config.options.logger.log(self.config.options.TRACE,
+                                       'pool %s busy, buffering event %s' % (
+                                           (self.config.name, event)))
         self.event_buffer.append(event)
 
     def _dispatchEvent(self, event, buffer=True):
         # events are required to be instances
-        etype = event.__class__
-        serializer = self.serializers.get(etype, None)
+        serializer = None
+        for klass, callback in serializers.items():
+            if isinstance(event, klass):
+                serializer = callback
         if serializer is None:
             # this is a system programming error, we must handle
             # all events
@@ -578,7 +595,11 @@ class EventListenerPool(ProcessGroupBase):
         for process in self.processes.values():
             if process.listener_state == EventListenerStates.READY:
                 payload = serializer(event)
-                process.write(self._eventEnvelope(event, payload))
+                try:
+                    process.write(self._eventEnvelope(event, payload))
+                except IOError, why:
+                    if why[0] == errno.EPIPE:
+                        continue
                 process.listener_state = EventListenerStates.BUSY
                 process.event = event
                 return True
@@ -587,22 +608,27 @@ class EventListenerPool(ProcessGroupBase):
             self._bufferEvent(event)
         return False
 
-    def serialize_pcomm_event(self, event):
-         return 'process_name: %s\nchannel: %s\n%s' % (
-             event.process_name,
-             event.channel,
-             event.data)
-    serializers[events.ProcessCommunicationEvent] = serialize_pcomm_event
-
-    def serialize_overflow_event(self, event):
-        name = event.group.config.name
-        typ = getEventNameByType(event.event)
-        return 'group_name: %s\nevent_type: %s' % (name, typ)
-    serializers[events.EventBufferOverflowEvent] = serialize_overflow_event
-
-    def serialize_statechange_event(self, event):
-        pass
-    serializers[events.ProcessStateChangeEvent] = serialize_statechange_event
+serializers = {}
+def pcomm_event(event):
+    return 'process_name: %s\nchannel: %s\n%s' % (
+        event.process_name,
+        event.channel,
+        event.data)
+serializers[events.ProcessCommunicationEvent] = pcomm_event
+
+def overflow_event(event):
+    name = event.group.config.name
+    typ = getEventNameByType(event.event)
+    return 'group_name: %s\nevent_type: %s' % (name, typ)
+serializers[events.EventBufferOverflowEvent] = overflow_event
+
+def proc_sc_event(event):
+    return ''
+serializers[events.ProcessStateChangeEvent] = proc_sc_event
+
+def supervisor_sc_event(event):
+    return ''
+serializers[events.SupervisorStateChangeEvent] = supervisor_sc_event
 
             
     

+ 0 - 1
src/supervisor/supervisord.py

@@ -212,7 +212,6 @@ class Supervisor:
             if process is None:
                 self.options.logger.critical('reaped unknown pid %s)' % pid)
             else:
-                name = process.config.name
                 process.finish(pid, sts)
                 del self.options.pidhistory[pid]
             if not once:

+ 5 - 5
src/supervisor/tests/test_dispatchers.py

@@ -421,13 +421,13 @@ class PEventListenerDispatcherTests(unittest.TestCase):
         from supervisor.dispatchers import EventListenerStates
         process.listener_state = EventListenerStates.ACKNOWLEDGED
         dispatcher = self._makeOne(process)
-        options.readfd_result = dispatcher.READY_FOR_EVENTS_TOKEN + 'abc'
+        options.readfd_result = dispatcher.READY_FOR_EVENTS_TOKEN
         self.assertEqual(dispatcher.handle_read_event(), None)
         self.assertEqual(process.listener_state, EventListenerStates.READY)
-        self.assertEqual(dispatcher.state_buffer, 'abc')
+        self.assertEqual(dispatcher.state_buffer, '')
         self.assertEqual(len(dispatcher.childlog.data), 1)
         self.assertEqual(dispatcher.childlog.data[0],
-                         dispatcher.READY_FOR_EVENTS_TOKEN + 'abc')
+                         dispatcher.READY_FOR_EVENTS_TOKEN)
 
     def test_handle_listener_state_change_from_unknown(self):
         options = DummyOptions()
@@ -449,9 +449,9 @@ class PEventListenerDispatcherTests(unittest.TestCase):
         from supervisor.dispatchers import EventListenerStates
         dispatcher = self._makeOne(process)
         process.listener_state = EventListenerStates.ACKNOWLEDGED
-        dispatcher.state_buffer = 'READY\nabc'
+        dispatcher.state_buffer = 'READY\n'
         self.assertEqual(dispatcher.handle_listener_state_change(), None)
-        self.assertEqual(dispatcher.state_buffer, 'abc')
+        self.assertEqual(dispatcher.state_buffer, '')
         self.assertEqual(options.logger.data,
                          [5, 'process1: ACKNOWLEDGED -> READY'])
         self.assertEqual(process.listener_state, EventListenerStates.READY)