Browse Source

First cut at process communication event capture and notification.

Still need to disambiguate stderr and stdout output.
Chris McDonough 18 years ago
parent
commit
be016b19e0
4 changed files with 214 additions and 37 deletions
  1. 17 0
      src/supervisor/events.py
  2. 18 10
      src/supervisor/options.py
  3. 105 20
      src/supervisor/supervisord.py
  4. 74 7
      src/supervisor/tests.py

+ 17 - 0
src/supervisor/events.py

@@ -0,0 +1,17 @@
+callbacks = []
+
+def subscribe(type, callback):
+    callbacks.append((type, callback))
+    
+def notify(event):
+    for type, callback in callbacks:
+        if isinstance(event, type):
+            callback(event)
+
+class ProcessCommunicationEvent:
+    # event mode tokens
+    BEGIN_TOKEN = '<!--XSUPERVISOR:BEGIN-->'
+    END_TOKEN   = '<!--XSUPERVISOR:END-->'
+    def __init__(self, process_name, data):
+        self.process_name = process_name
+        self.data = data

+ 18 - 10
src/supervisor/options.py

@@ -700,13 +700,18 @@ class ServerOptions(Options):
             uid = config.saneget(section, 'user', None)
             if uid is not None:
                 uid = datatypes.name_to_uid(uid)
-            logfile = config.saneget(section, 'logfile', None)
-            if logfile in ('NONE', 'OFF'):
-                logfile = None
-            elif logfile in (None, 'AUTO'):
-                logfile = self.AUTOMATIC
-            else:
-                logfile = datatypes.existing_dirpath(logfile)
+            for n in ('logfile', 'eventlogfile'):
+                val = config.saneget(section, n, None)
+                if val in ('NONE', 'OFF'):
+                    val = None
+                elif val in (None, 'AUTO'):
+                    val = self.AUTOMATIC
+                else:
+                    val = datatypes.existing_dirpath(val)
+                if n == 'logfile':
+                    logfile = val
+                else:
+                    eventlogfile = val
             logfile_backups = config.saneget(section, 'logfile_backups', 10)
             logfile_backups = datatypes.integer(logfile_backups)
             logfile_maxbytes = config.saneget(section, 'logfile_maxbytes',
@@ -736,6 +741,7 @@ class ServerOptions(Options):
                                     startretries=startretries,
                                     uid=uid,
                                     logfile=logfile,
+                                    eventlogfile = eventlogfile,
                                     logfile_backups=logfile_backups,
                                     logfile_maxbytes=logfile_maxbytes,
                                     stopsignal=stopsignal,
@@ -1306,9 +1312,10 @@ class UnhosedConfigParser(ConfigParser.RawConfigParser):
 
 class ProcessConfig:
     def __init__(self, name, command, priority, autostart, autorestart,
-                 startsecs, startretries, uid, logfile, logfile_backups,
-                 logfile_maxbytes, stopsignal, stopwaitsecs, exitcodes,
-                 log_stdout, log_stderr, environment=None):
+                 startsecs, startretries, uid, logfile, eventlogfile,
+                 logfile_backups, logfile_maxbytes, stopsignal,
+                 stopwaitsecs, exitcodes, log_stdout, log_stderr,
+                 environment=None):
         self.name = name
         self.command = command
         self.priority = priority
@@ -1318,6 +1325,7 @@ class ProcessConfig:
         self.startretries = startretries
         self.uid = uid
         self.logfile = logfile
+        self.eventlogfile = eventlogfile
         self.logfile_backups = logfile_backups
         self.logfile_maxbytes = logfile_maxbytes
         self.stopsignal = stopsignal

+ 105 - 20
src/supervisor/supervisord.py

@@ -52,6 +52,8 @@ import StringIO
 import shlex
 import logging
 
+from supervisor.events import notify
+from supervisor.events import ProcessCommunicationEvent
 from supervisor.options import ServerOptions
 from supervisor.options import decode_wait_status
 from supervisor.options import signame
@@ -95,7 +97,10 @@ class Subprocess:
     killing = 0 # flag determining whether we are trying to kill this proc
     backoff = 0 # backoff counter (to startretries)
     pipes = None # mapping of pipe descriptor purpose to file descriptor
-    childlog = None # the current logger 
+    eventmode = False # are we capturing process output event data
+    mainlog = None # the process log file
+    eventlog = None # the log file captured to when we're in eventmode
+    childlog = None # the current logger (event or main)
     logbuffer = '' # buffer of characters read from child pipes
     exitstatus = None # status attached to dead process by finsh()
     spawnerr = None # error message attached by spawn() if any
@@ -114,33 +119,107 @@ class Subprocess:
             # using "not not maxbytes" below is an optimization.  If
             # maxbytes is zero, it means we're not using rotation.  The
             # rotating logger is more expensive than the normal one.
-            self.childlog = options.getLogger(config.logfile, logging.INFO,
+            self.mainlog = options.getLogger(config.logfile, logging.INFO,
+                                             '%(message)s',
+                                             rotating=not not maxbytes,
+                                             maxbytes=maxbytes,
+                                             backups=backups)
+        if config.eventlogfile:
+            self.eventlog = options.getLogger(config.eventlogfile,
+                                              logging.INFO,
                                               '%(message)s',
-                                              rotating=not not maxbytes,
-                                              maxbytes=maxbytes,
-                                              backups=backups)
+                                              rotating=False)
+        self.childlog = self.mainlog
 
     def removelogs(self):
-        if self.childlog:
-            for handler in self.childlog.handlers:
-                handler.remove()
-                handler.reopen()
+        for log in (self.mainlog, self.eventlog):
+            if log is not None:
+                for handler in log.handlers:
+                    handler.remove()
+                    handler.reopen()
 
     def reopenlogs(self):
-        if self.childlog:
-            for handler in self.childlog.handlers:
-                handler.reopen()
+        for log in (self.mainlog, self.eventlog):
+            if log is not None:
+                for handler in log.handlers:
+                    handler.reopen()
+
+    def toggle_eventmode(self):
+        options = self.options
+        self.eventmode = not self.eventmode
+
+        if self.config.eventlogfile:
+            if self.eventmode:
+                self.childlog = self.eventlog
+            else:
+                eventlogfile = self.config.eventlogfile
+                for handler in self.eventlog.handlers:
+                    handler.flush()
+                data = ''
+                f = open(eventlogfile, 'r')
+                while 1:
+                    new = f.read(1<<20) # 1MB
+                    data += new
+                    if not new:
+                        break
+                    if len(data) > (1 << 21): #2MB
+                        data = data[:1<<21]
+                        # DWIM: don't overrun memory
+                        self.options.logger.info(
+                            'Truncated oversized EVENT mode log to 2MB')
+                        break 
+                    
+                notify(ProcessCommunicationEvent(self.config.name, data))
+                                        
+                msg = "Process '%s' emitted a comm event" % self.config.name
+                self.options.logger.info(msg)
+                                        
+                for handler in self.eventlog.handlers:
+                    handler.remove()
+                    handler.reopen()
+                self.childlog = self.mainlog
 
     def log_output(self):
-        if self.logbuffer:
-            data, self.logbuffer = self.logbuffer, ''
-            if self.childlog:
-                if self.options.strip_ansi:
-                    data = self.options.stripEscapes(data)
-                self.childlog.info(data)
-            msg = '%s output:\n%s' % (self.config.name, data)
-            self.options.logger.log(self.options.TRACE, msg)
+        if not self.logbuffer:
+            return
+        
+        if self.eventmode:
+            token = ProcessCommunicationEvent.END_TOKEN
+        else:
+            token = ProcessCommunicationEvent.BEGIN_TOKEN
+
+        data = self.logbuffer
+        self.logbuffer = ''
+
+        if len(data) + len(self.logbuffer) <= len(token):
+            self.logbuffer = data
+            return # not enough data
 
+        try:
+            before, after = data.split(token, 1)
+        except ValueError:
+            after = None
+            index = find_prefix_at_end(data, token)
+            if index:
+                self.logbuffer = self.logbuffer + data[-index:]
+                data = data[:-index]
+                # XXX log and trace data
+        else:
+            data = before
+            self.toggle_eventmode()
+            self.logbuffer = after
+
+        if self.childlog:
+            if self.options.strip_ansi:
+                data = self.options.stripEscapes(data)
+            self.childlog.info(data)
+
+        msg = '%s output:\n%s' % (self.config.name, data)
+        self.options.logger.log(self.options.TRACE, msg)
+
+        if after:
+            self.log_output()
+            
     def drain_stdout(self, *ignored):
         output = self.options.readfd(self.pipes['stdout'])
         if self.config.log_stdout:
@@ -433,6 +512,12 @@ class Subprocess:
             return ProcessStates.RUNNING
         return ProcessStates.UNKNOWN
 
+def find_prefix_at_end(haystack, needle):
+    l = len(needle) - 1
+    while l and not haystack.endswith(needle[:l]):
+        l -= 1
+    return l
+
 class Supervisor:
     mood = 1 # 1: up, 0: restarting, -1: suicidal
     stopping = False # set after we detect that we are handling a stop request

+ 74 - 7
src/supervisor/tests.py

@@ -1304,10 +1304,12 @@ class SubprocessTests(unittest.TestCase):
         options = DummyOptions()
         config = DummyPConfig('notthere', '/notthere', logfile='/tmp/foo')
         instance = self._makeOne(options, config)
-        instance.logbuffer = 'foo'
+        instance.logbuffer = 'this string is longer than a pcomm token'
         instance.log_output()
-        self.assertEqual(instance.childlog.data, ['foo'])
-        self.assertEqual(options.logger.data, [5, 'notthere output:\nfoo'])
+        self.assertEqual(instance.childlog.data,
+                         ['this string is longer than a pcomm token'])
+        self.assertEqual(options.logger.data,
+            [5, 'notthere output:\nthis string is longer than a pcomm token'])
 
     def test_drain_stdout(self):
         options = DummyOptions()
@@ -1783,6 +1785,71 @@ class SubprocessTests(unittest.TestCase):
         instance.laststart = 1
         self.assertEqual(instance.get_state(), ProcessStates.UNKNOWN)
 
+    def test_eventmode_switch(self):
+        from supervisor.events import ProcessCommunicationEvent
+        from supervisor.events import subscribe
+        events = []
+        def doit(event):
+            events.append(event)
+        subscribe(ProcessCommunicationEvent, doit)
+        import string
+        letters = string.letters
+        digits = string.digits * 4
+        BEGIN_TOKEN = ProcessCommunicationEvent.BEGIN_TOKEN
+        END_TOKEN = ProcessCommunicationEvent.END_TOKEN
+        data = (letters +  BEGIN_TOKEN + digits + END_TOKEN + letters)
+        # boundaries that split tokens
+        broken = data.split(':')
+        first = broken[0] + ':'
+        second = broken[1] + ':'
+        third = broken[2]
+
+        executable = '/bin/cat'
+        options = DummyOptions()
+        from options import getLogger
+        options.getLogger = getLogger
+        config = DummyPConfig('output', executable, logfile='/tmp/foo',
+                              eventlogfile='/tmp/bar')
+
+        try:
+            instance = self._makeOne(options, config)
+            logfile = instance.config.logfile
+            instance.logbuffer = first
+            instance.log_output()
+            [ x.flush() for x in instance.mainlog.handlers]
+            self.assertEqual(open(logfile, 'r').read(), letters)
+            self.assertEqual(instance.logbuffer, first[len(letters):])
+            self.assertEqual(len(events), 0)
+
+            instance.logbuffer += second
+            instance.log_output()
+            self.assertEqual(len(events), 0)
+            [ x.flush() for x in instance.mainlog.handlers]
+            self.assertEqual(open(logfile, 'r').read(), letters)
+            self.assertEqual(instance.logbuffer, first[len(letters):])
+            self.assertEqual(len(events), 0)
+
+            instance.logbuffer += third
+            instance.log_output()
+            [ x.flush() for x in instance.mainlog.handlers]
+            self.assertEqual(open(instance.config.logfile, 'r').read(),
+                             letters *2)
+            self.assertEqual(len(events), 1)
+            event = events[0]
+            self.assertEqual(event.__class__, ProcessCommunicationEvent)
+            self.assertEqual(event.process_name, 'output')
+            self.assertEqual(event.data, digits)
+
+        finally:
+            try:
+                os.remove(instance.config.logfile)
+            except (OSError, IOError):
+                pass
+            try:
+                os.remove(instance.config.eventlogfile)
+            except (OSError, IOError):
+                pass
+
     def test_strip_ansi(self):
         executable = '/bin/cat'
         options = DummyOptions()
@@ -1791,8 +1858,8 @@ class SubprocessTests(unittest.TestCase):
         options.strip_ansi = True
         config = DummyPConfig('output', executable, logfile='/tmp/foo')
 
-        ansi = '\x1b[34mHello world!\x1b[0m'
-        noansi = 'Hello world!'
+        ansi = '\x1b[34mHello world... this is longer than a token!\x1b[0m'
+        noansi = 'Hello world... this is longer than a token!'
 
         try:
             instance = self._makeOne(options, config)
@@ -2726,7 +2793,7 @@ class DummyProcess:
 class DummyPConfig:
     def __init__(self, name, command, priority=999, autostart=True,
                  autorestart=True, startsecs=10, startretries=999,
-                 uid=None, logfile=None, logfile_backups=0,
+                 uid=None, logfile=None, eventlogfile=None, logfile_backups=0,
                  logfile_maxbytes=0, log_stdout=True, log_stderr=False,
                  stopsignal=signal.SIGTERM, stopwaitsecs=10,
                  exitcodes=[0,2], environment=None):
@@ -2739,6 +2806,7 @@ class DummyPConfig:
         self.startretries = startretries
         self.uid = uid
         self.logfile = logfile
+        self.eventlogfile = eventlogfile
         self.logfile_backups = logfile_backups
         self.logfile_maxbytes = logfile_maxbytes
         self.log_stdout = log_stdout
@@ -2747,7 +2815,6 @@ class DummyPConfig:
         self.stopwaitsecs = stopwaitsecs
         self.exitcodes = exitcodes
         self.environment = environment
-        
 
 class DummyLogger:
     def __init__(self):