Browse Source

Add PEventListenerDispatcher (handles event listener process state changes).

Chris McDonough 18 years ago
parent
commit
4a8173ba28

+ 147 - 1
src/supervisor/dispatchers.py

@@ -30,7 +30,9 @@ class PDispatcher:
         raise NotImplementedError
 
 class POutputDispatcher(PDispatcher):
-    """ Output (stdout/stderr) dispatcher """
+    """ Output (stdout/stderr) dispatcher, capture output sent within
+    <!--XSUPERVISOR:BEGIN--><!--XSUPERVISOR:END--> tags and notify
+    with a ProcessCommunicationEvent """
 
     process = None # process which "owns" this dispatcher
     channel = None # 'stderr' or 'stdout'
@@ -172,6 +174,139 @@ class POutputDispatcher(PDispatcher):
         self.output_buffer += data
         self.record_output()
 
+class PEventListenerDispatcher(PDispatcher):
+    """ An output dispatcher that monitors and changes listener_states """
+    process = None # process which "owns" this dispatcher
+    channel = None # 'stderr' or 'stdout'
+    childlog = None # the logger
+    state_buffer = ''  # data waiting to be reviewed for state changes
+
+    READY_FOR_EVENTS_TOKEN = 'READY\n'
+    EVENT_PROCESSED_TOKEN = 'OK\n'
+    EVENT_REJECTED_TOKEN = 'FAIL\n'
+
+    def __init__(self, process, channel, fd):
+        self.process = process
+        # the initial state of our listener is ACKNOWLEDGED; this is a
+        # "busy" state that implies we're awaiting a READY_FOR_EVENTS
+        # token
+        self.process.listener_state = EventListenerStates.ACKNOWLEDGED
+        self.channel = channel
+        self.fd = fd
+
+        logfile = getattr(process.config, '%s_logfile' % channel)
+
+        if logfile:
+            maxbytes = getattr(process.config, '%s_logfile_maxbytes' % channel)
+            backups = getattr(process.config, '%s_logfile_backups' % channel)
+            self.childlog = process.config.options.getLogger(
+                logfile,
+                logging.INFO,
+                '%(message)s',
+                rotating=not not maxbytes, # optimization
+                maxbytes=maxbytes,
+                backups=backups)
+    
+    def removelogs(self):
+        if self.childlog is not None:
+            for handler in self.childlog.handlers:
+                handler.remove()
+                handler.reopen()
+
+    def reopenlogs(self):
+        if self.childlog is not None:
+            for handler in self.childlog.handlers:
+                handler.reopen()
+
+
+    def writable(self):
+        return False
+    
+    def readable(self):
+        return True
+
+    def handle_read_event(self):
+        data = self.process.config.options.readfd(self.fd)
+        if data:
+            self.state_buffer += data
+            procname = self.process.config.name
+            msg = '%r %s output:\n%s' % (procname, self.channel, data)
+            self._trace(msg)
+
+            if self.childlog:
+                if self.process.config.options.strip_ansi:
+                    data = self.process.config.options.stripEscapes(data)
+                self.childlog.info(data)
+
+        self.handle_listener_state_change()
+
+    def _trace(self, msg):
+        TRACE = self.process.config.options.TRACE
+        self.process.config.options.logger.log(TRACE, msg)
+        
+    def handle_listener_state_change(self):
+        process = self.process
+        procname = process.config.name
+        state = process.listener_state
+        data = self.state_buffer
+
+        if not data:
+            return
+
+        if state == EventListenerStates.UNKNOWN:
+            # this is a fatal state
+            self.state_buffer = ''
+            return
+
+        if state == EventListenerStates.ACKNOWLEDGED:
+            tokenlen = len(self.READY_FOR_EVENTS_TOKEN)
+            if len(data) < tokenlen:
+                # not enough info to make a decision
+                return
+            elif data.startswith(self.READY_FOR_EVENTS_TOKEN):
+                msg = '%s: ACKNOWLEDGED -> READY' % procname
+                self._trace(msg)
+                process.listener_state = EventListenerStates.READY
+                self.state_buffer = self.state_buffer[tokenlen:]
+            else:
+                msg = '%s: ACKNOWLEDGED -> UNKNOWN' % procname
+                self._trace(msg)
+                process.listener_state = EventListenerStates.UNKNOWN
+                self.state_buffer = ''
+            return
+
+        elif state == EventListenerStates.READY:
+            # the process sent some spurious data, be a hardass about it
+            msg = '%s: READY -> UNKNOWN' % procname
+            self._trace(msg)
+            process.listener_state = EventListenerStates.UNKNOWN
+            self.state_buffer = ''
+            return
+                
+        elif state == EventListenerStates.BUSY:
+            if data.find('\n') == -1:
+                # we can't make a determination yet
+                return
+            elif data.startswith(self.EVENT_PROCESSED_TOKEN):
+                msg = '%s: BUSY -> ACKNOWLEDGED (processed)' % procname
+                self._trace(msg)
+                tokenlen = len(self.EVENT_PROCESSED_TOKEN)
+                self.state_buffer = self.state_buffer[tokenlen:]
+                process.listener_state = EventListenerStates.ACKNOWLEDGED
+            elif data.startswith(self.EVENT_REJECTED_TOKEN):
+                msg = '%s: BUSY -> ACKNOWLEDGED (rejected)' % procname
+                self._trace(msg)
+                # XXX push the event back into the notification queue
+                tokenlen = len(self.EVENT_REJECTED_TOKEN)
+                self.state_buffer = self.state_buffer[tokenlen:]
+                process.listener_state = EventListenerStates.ACKNOWLEDGED
+            else:
+                msg = '%s: BUSY -> UNKNOWN' % procname
+                self._trace(msg)
+                process.listener_state = EventListenerStates.UNKNOWN
+                self.state_buffer = ''
+            return
+
 class PInputDispatcher(PDispatcher):
     """ Input (stdin) dispatcher """
     process = None # process which "owns" this dispatcher
@@ -207,3 +342,14 @@ class PInputDispatcher(PDispatcher):
                 else:
                     raise
 
+class EventListenerStates:
+    READY = 10 # the process ready to be sent an event from supervisor
+    BUSY = 20 # event listener is processing an event sent to it by supervisor
+    ACKNOWLEDGED = 30 # the event listener processed an event
+    UNKNOWN = 40 # the event listener is in an unknown state
+
+def getEventListenerStateDescription(code):
+    for statename in EventListenerStates.__dict__:
+        if getattr(EventListenerStates, statename) == code:
+            return statename
+

+ 0 - 4
src/supervisor/events.py

@@ -37,7 +37,3 @@ EVENT_NAMES = {
     'PROCESS_COMMUNICATION_EVENT':ProcessCommunicationEvent,
     }
 
-class EventListenerStates:
-    READY = 10
-    BUSY = 20
-    

+ 1 - 2
src/supervisor/process.py

@@ -24,8 +24,6 @@ import signal
 from supervisor.options import decode_wait_status
 from supervisor.options import signame
 from supervisor.options import ProcessException
-from supervisor.dispatchers import POutputDispatcher
-from supervisor.dispatchers import PInputDispatcher
 
 class ProcessStates:
     STOPPED = 0
@@ -516,3 +514,4 @@ class EventListenerPool(ProcessGroupBase):
         for proc in self.processes.values():
             proc.transition()
 
+    

+ 277 - 0
src/supervisor/tests/test_dispatchers.py

@@ -346,6 +346,283 @@ class PInputDispatcherTests(unittest.TestCase):
 
 
 
+class PEventListenerDispatcherTests(unittest.TestCase):
+    def _getTargetClass(self):
+        from supervisor.dispatchers import PEventListenerDispatcher
+        return PEventListenerDispatcher
+
+    def _makeOne(self, process):
+        channel = 'stdout'
+        return self._getTargetClass()(process, channel, 0)
+
+    def test_writable(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        self.assertEqual(dispatcher.writable(), False)
+        
+    def test_readable(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        self.assertEqual(dispatcher.readable(), True)
+
+    def test_handle_write_event(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        self.assertRaises(NotImplementedError, dispatcher.handle_write_event)
+
+    def test_handle_read_event_nodata(self):
+        options = DummyOptions()
+        options.readfd_result = ''
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        self.assertEqual(dispatcher.handle_read_event(), None)
+        self.assertEqual(dispatcher.state_buffer, '')
+        from supervisor.dispatchers import EventListenerStates
+        self.assertEqual(dispatcher.process.listener_state,
+                         EventListenerStates.ACKNOWLEDGED)
+
+    def test_handle_read_event_logging_nologs(self):
+        options = DummyOptions()
+        options.readfd_result = 'supercalifragilisticexpialidocious'
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        # just make sure there are no errors if a child logger doesnt
+        # exist
+        self.assertEqual(dispatcher.handle_read_event(), None)
+        self.assertEqual(dispatcher.childlog, None)
+
+    def test_handle_read_event_logging_childlog(self):
+        options = DummyOptions()
+        options.readfd_result = 'supercalifragilisticexpialidocious'
+        config = DummyPConfig(options, 'process1', '/bin/process1',
+                              stdout_logfile='/tmp/foo')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        self.assertEqual(dispatcher.handle_read_event(), None)
+        self.assertEqual(len(dispatcher.childlog.data), 1)
+        self.assertEqual(dispatcher.childlog.data[0],
+                         'supercalifragilisticexpialidocious')
+
+    def test_handle_read_event_calls_handle_listener_state_change(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1',
+                              stdout_logfile='/tmp/foo')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        process.listener_state = EventListenerStates.ACKNOWLEDGED
+        dispatcher = self._makeOne(process)
+        options.readfd_result = dispatcher.READY_FOR_EVENTS_TOKEN + 'abc'
+        self.assertEqual(dispatcher.handle_read_event(), None)
+        self.assertEqual(process.listener_state, EventListenerStates.READY)
+        self.assertEqual(dispatcher.state_buffer, 'abc')
+        self.assertEqual(len(dispatcher.childlog.data), 1)
+        self.assertEqual(dispatcher.childlog.data[0],
+                         dispatcher.READY_FOR_EVENTS_TOKEN + 'abc')
+
+    def test_handle_listener_state_change_from_unknown(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.UNKNOWN
+        dispatcher.state_buffer = 'whatever'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, '')
+        self.assertEqual(options.logger.data, [])
+        self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
+
+    def test_handle_listener_state_change_acknowledged_to_ready(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.ACKNOWLEDGED
+        dispatcher.state_buffer = 'READY\nabc'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, 'abc')
+        self.assertEqual(options.logger.data,
+                         [5, 'process1: ACKNOWLEDGED -> READY'])
+        self.assertEqual(process.listener_state, EventListenerStates.READY)
+
+    def test_handle_listener_state_change_acknowledged_to_insufficient(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.ACKNOWLEDGED
+        dispatcher.state_buffer = 'RE'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, 'RE')
+        self.assertEqual(options.logger.data, [])
+        self.assertEqual(process.listener_state,
+                         EventListenerStates.ACKNOWLEDGED)
+
+    def test_handle_listener_state_change_acknowledged_to_unknown(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.ACKNOWLEDGED
+        dispatcher.state_buffer = 'bogus data yo'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, '')
+        self.assertEqual(options.logger.data,
+                         [5, 'process1: ACKNOWLEDGED -> UNKNOWN'])
+        self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
+
+    def test_handle_listener_state_change_ready_to_unknown(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.READY
+        dispatcher.state_buffer = 'bogus data yo'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, '')
+        self.assertEqual(options.logger.data,
+                         [5, 'process1: READY -> UNKNOWN'])
+        self.assertEqual(process.listener_state, EventListenerStates.UNKNOWN)
+
+    def test_handle_listener_state_change_busy_to_insufficient(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.BUSY
+        dispatcher.state_buffer = 'bogus data yo'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, 'bogus data yo')
+        self.assertEqual(process.listener_state, EventListenerStates.BUSY)
+
+    def test_handle_listener_state_change_busy_to_acknowledged_procd(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.BUSY
+        dispatcher.state_buffer = dispatcher.EVENT_PROCESSED_TOKEN + 'abc'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, 'abc')
+        self.assertEqual(options.logger.data,
+                         [5, 'process1: BUSY -> ACKNOWLEDGED (processed)'])
+        self.assertEqual(process.listener_state,
+                         EventListenerStates.ACKNOWLEDGED)
+
+    def test_handle_listener_state_change_busy_to_acknowledged_rejected(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.BUSY
+        dispatcher.state_buffer = dispatcher.EVENT_REJECTED_TOKEN + 'abc'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, 'abc')
+        self.assertEqual(options.logger.data,
+                         [5, 'process1: BUSY -> ACKNOWLEDGED (rejected)'])
+        self.assertEqual(process.listener_state,
+                         EventListenerStates.ACKNOWLEDGED)
+
+    def test_handle_listener_state_change_busy_to_unknown(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        from supervisor.dispatchers import EventListenerStates
+        dispatcher = self._makeOne(process)
+        process.listener_state = EventListenerStates.BUSY
+        dispatcher.state_buffer = 'bogus data\n'
+        self.assertEqual(dispatcher.handle_listener_state_change(), None)
+        self.assertEqual(dispatcher.state_buffer, '')
+        self.assertEqual(options.logger.data,
+                         [5, 'process1: BUSY -> UNKNOWN'])
+        self.assertEqual(process.listener_state,
+                         EventListenerStates.UNKNOWN)
+
+    def test_handle_error(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        self.assertRaises(NotImplementedError, dispatcher.handle_error)
+
+    def test_removelogs(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1',
+                              stdout_logfile='/tmp/foo')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        dispatcher.removelogs()
+        self.assertEqual(dispatcher.childlog.handlers[0].reopened, True)
+        self.assertEqual(dispatcher.childlog.handlers[0].removed, True)
+
+    def test_reopenlogs(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1',
+                              stdout_logfile='/tmp/foo')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        dispatcher.reopenlogs()
+        self.assertEqual(dispatcher.childlog.handlers[0].reopened, True)
+
+    def test_strip_ansi(self):
+        options = DummyOptions()
+        options.strip_ansi = True
+        config = DummyPConfig(options, 'process1', '/bin/process1',
+                              stdout_logfile='/tmp/foo')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        ansi = '\x1b[34mHello world... this is longer than a token!\x1b[0m'
+        noansi = 'Hello world... this is longer than a token!'
+
+        options.readfd_result = ansi
+        dispatcher.handle_read_event()
+        self.assertEqual(len(dispatcher.childlog.data), 1)
+        self.assertEqual(dispatcher.childlog.data[0], noansi)
+
+        options.strip_ansi = False
+
+        options.readfd_result = ansi
+        dispatcher.handle_read_event()
+        self.assertEqual(len(dispatcher.childlog.data), 2)
+        self.assertEqual(dispatcher.childlog.data[1], ansi)
+
+    def test_ctor_nologfiles(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        self.assertEqual(dispatcher.process, process)
+        self.assertEqual(dispatcher.channel, 'stdout')
+        self.assertEqual(dispatcher.fd, 0)
+        self.assertEqual(dispatcher.childlog, None)
+
+    def test_ctor_logfile_only(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1',
+                              stdout_logfile='/tmp/foo')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        self.assertEqual(dispatcher.process, process)
+        self.assertEqual(dispatcher.channel, 'stdout')
+        self.assertEqual(dispatcher.fd, 0)
+        self.assertEqual(dispatcher.childlog.__class__, DummyLogger)
+
+    
 
 
 def test_suite():