浏览代码

Fix "busywait" bug. Remove dispatcher from dispatcher map if we
detect that the child end of the pipe has been closed. For output
dispatchers, if handle_read_event performs a read but gets no data
back, it means that the child has closed its end of the pipe (perhaps
by exiting). For input dispatchers, if we attempt to write but get an
EPIPE while doing so, it indicates the same thing. See also
http://mail.python.org/pipermail/python-dev/2004-August/046850.html

Chris McDonough 18 年之前
父节点
当前提交
9a4d7dc2c0

+ 32 - 4
src/supervisor/dispatchers.py

@@ -1,5 +1,6 @@
 import logging
 import errno
+from asyncore import compact_traceback
 
 from supervisor.events import notify
 from supervisor.events import EventRejectedEvent
@@ -15,6 +16,8 @@ class PDispatcher:
     """ Asyncore dispatcher for mainloop, representing a process channel
     (stdin, stdout, or stderr).  This class is abstract. """
 
+    closed = False # True if close() has been called
+
     def __repr__(self):
         return '<%s at %s for %s (%s)>' % (self.__class__.__name__,
                                            id(self),
@@ -34,7 +37,24 @@ class PDispatcher:
         raise NotImplementedError
 
     def handle_error(self):
-        raise NotImplementedError
+        nil, t, v, tbinfo = compact_traceback()
+
+        self.process.config.options.logger.critical(
+            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
+                repr(self),
+                t,
+                v,
+                tbinfo
+                )
+            )
+        self.close()
+
+    def close(self):
+        if not self.closed:
+            self.process.config.options.logger.debug(
+                'fd %s closed, stopped monitoring %s' % (self.fd, self))
+            self.process.remove_dispatcher(self.fd)
+            self.closed = True
 
 class POutputDispatcher(PDispatcher):
     """ Output (stdout/stderr) dispatcher, capture output sent within
@@ -186,6 +206,11 @@ class POutputDispatcher(PDispatcher):
         data = self.process.config.options.readfd(self.fd)
         self.output_buffer += data
         self.record_output()
+        if not data:
+            # if we get no data back from the pipe, it means that the
+            # child process has ended.  See
+            # mail.python.org/pipermail/python-dev/2004-August/046850.html
+            self.close()
 
 class PEventListenerDispatcher(PDispatcher):
     """ An output dispatcher that monitors and changes listener_states """
@@ -251,6 +276,11 @@ class PEventListenerDispatcher(PDispatcher):
                 if self.process.config.options.strip_ansi:
                     data = self.process.config.options.stripEscapes(data)
                 self.childlog.info(data)
+        else:
+            # if we get no data back from the pipe, it means that the
+            # child process has ended.  See
+            # mail.python.org/pipermail/python-dev/2004-August/046850.html
+            self.close()
         self.handle_listener_state_change()
 
     def _trace(self, msg):
@@ -363,10 +393,8 @@ class PInputDispatcher(PDispatcher):
                 self.input_buffer = self.input_buffer[sent:]
             except OSError, why:
                 if why[0] == errno.EPIPE:
-                    msg = ('failed write to process %r stdin' %
-                           self.process.config.name)
                     self.input_buffer = ''
-                    self.process.config.options.logger.info(msg)
+                    self.close()
                 else:
                     raise
 

+ 6 - 0
src/supervisor/process.py

@@ -454,6 +454,12 @@ class Subprocess:
                 logger.info('success: %s %s' % (self.config.name, msg))
                 self._assertInState(ProcessStates.STARTING)
                 self.change_state(ProcessStates.RUNNING)
+
+    def remove_dispatcher(self, fd):
+        # called by dispatcher itself to remove itself when it detects
+        # that the child end of the pipe has been closed
+        del self.dispatchers[fd]
+
         
 class ProcessGroupBase:
     def __init__(self, config):

+ 1 - 0
src/supervisor/supervisord.py

@@ -242,6 +242,7 @@ class Supervisor:
 
     def reap(self, once=False):
         pid, sts = self.options.waitpid()
+        self._trace('reap called, waitpid returned %s' % pid)
         if pid:
             process = self.options.pidhistory.get(pid, None)
             if process is None:

+ 4 - 0
src/supervisor/tests/base.py

@@ -321,6 +321,7 @@ class DummyProcess:
         self.output_fd_drained = None
         self.transitioned = False
         self.write_error = None
+        self.dispatchers_removed = []
 
     def reopenlogs(self):
         self.logs_reopened = True
@@ -392,6 +393,9 @@ class DummyProcess:
     def transition(self):
         self.transitioned = True
 
+    def remove_dispatcher(self, fd):
+        self.dispatchers_removed.append(fd)
+
 class DummyPConfig:
     def __init__(self, options, name, command, priority=999, autostart=True,
                  autorestart=True, startsecs=10, startretries=999,

+ 64 - 8
src/supervisor/tests/test_dispatchers.py

@@ -57,10 +57,16 @@ class POutputDispatcherTests(unittest.TestCase):
         
     def test_handle_error(self):
         options = DummyOptions()
-        config = DummyPConfig(options, 'process1', '/bin/process1')
+        config = DummyPConfig(options, 'test', '/test')
         process = DummyProcess(config)
         dispatcher = self._makeOne(process)
-        self.assertRaises(NotImplementedError, dispatcher.handle_error)
+        try:
+            raise ValueError('foo')
+        except:
+            dispatcher.handle_error()
+        result = options.logger.data[0]
+        self.assertTrue(result.startswith(
+            'uncaptured python exception, closing channel'),result)
 
     def test_toggle_capturemode_buffer_overrun(self):
         executable = '/bin/cat'
@@ -323,6 +329,19 @@ class POutputDispatcherTests(unittest.TestCase):
             drepr.find('<supervisor.tests.base.DummyProcess instance at'),
             -1)
         self.assertTrue(drepr.endswith('(stdout)>'), drepr)
+
+    def test_close(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        dispatcher.close()
+        self.assertEqual(process.dispatchers_removed, [0])
+        self.assertEqual(dispatcher.closed, True)
+        dispatcher.close() # make sure we don't error if we try to close twice
+        self.assertEqual(process.dispatchers_removed, [0])
+        self.assertEqual(dispatcher.closed, True)
+        
                         
 class PInputDispatcherTests(unittest.TestCase):
     def _getTargetClass(self):
@@ -379,8 +398,9 @@ class PInputDispatcherTests(unittest.TestCase):
         options.write_error = errno.EPIPE
         dispatcher.handle_write_event()
         self.assertEqual(dispatcher.input_buffer, '')
-        self.assertEqual(options.logger.data,
-            ["failed write to process 'test' stdin"])
+        self.assertTrue(options.logger.data[0].startswith(
+            'fd 0 closed, stopped monitoring'))
+        self.assertTrue(options.logger.data[0].endswith('(stdin)>'))
 
     def test_handle_write_event_uncaught_raised(self):
         options = DummyOptions()
@@ -409,9 +429,17 @@ class PInputDispatcherTests(unittest.TestCase):
         self.assertRaises(NotImplementedError, dispatcher.handle_read_event)
         
     def test_handle_error(self):
-        process = DummyProcess(None)
+        options = DummyOptions()
+        config = DummyPConfig(options, 'test', '/test')
+        process = DummyProcess(config)
         dispatcher = self._makeOne(process)
-        self.assertRaises(NotImplementedError, dispatcher.handle_error)
+        try:
+            raise ValueError('foo')
+        except:
+            dispatcher.handle_error()
+        result = options.logger.data[0]
+        self.assertTrue(result.startswith(
+            'uncaptured python exception, closing channel'),result)
 
     def test_repr(self):
         options = DummyOptions()
@@ -425,6 +453,17 @@ class PInputDispatcherTests(unittest.TestCase):
             -1)
         self.assertTrue(drepr.endswith('(stdin)>'), drepr)
 
+    def test_close(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        dispatcher.close()
+        self.assertEqual(process.dispatchers_removed, [0])
+        self.assertEqual(dispatcher.closed, True)
+        dispatcher.close() # make sure we don't error if we try to close twice
+        self.assertEqual(process.dispatchers_removed, [0])
+        self.assertEqual(dispatcher.closed, True)
 
 class PEventListenerDispatcherTests(unittest.TestCase):
     def _getTargetClass(self):
@@ -668,10 +707,16 @@ class PEventListenerDispatcherTests(unittest.TestCase):
 
     def test_handle_error(self):
         options = DummyOptions()
-        config = DummyPConfig(options, 'process1', '/bin/process1')
+        config = DummyPConfig(options, 'test', '/test')
         process = DummyProcess(config)
         dispatcher = self._makeOne(process)
-        self.assertRaises(NotImplementedError, dispatcher.handle_error)
+        try:
+            raise ValueError('foo')
+        except:
+            dispatcher.handle_error()
+        result = options.logger.data[0]
+        self.assertTrue(result.startswith(
+            'uncaptured python exception, closing channel'),result)
 
     def test_removelogs(self):
         options = DummyOptions()
@@ -747,6 +792,17 @@ class PEventListenerDispatcherTests(unittest.TestCase):
             -1)
         self.assertTrue(drepr.endswith('(stdout)>'), drepr)
     
+    def test_close(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'process1', '/bin/process1')
+        process = DummyProcess(config)
+        dispatcher = self._makeOne(process)
+        dispatcher.close()
+        self.assertEqual(process.dispatchers_removed, [0])
+        self.assertEqual(dispatcher.closed, True)
+        dispatcher.close() # make sure we don't error if we try to close twice
+        self.assertEqual(process.dispatchers_removed, [0])
+        self.assertEqual(dispatcher.closed, True)
 
 
 def test_suite():

+ 9 - 0
src/supervisor/tests/test_process.py

@@ -786,6 +786,15 @@ class SubprocessTests(unittest.TestCase):
         instance.state = 10
         self.assertEqual(instance.change_state(10), False)
 
+    def test_remove_dispatcher(self):
+        options = DummyOptions()
+        config = DummyPConfig(options, 'test', '/test')
+        instance = self._makeOne(config)
+        instance.dispatchers = {0:True}
+        instance.remove_dispatcher(0)
+        self.assertEqual(instance.dispatchers, {})
+        
+
 class ProcessGroupBaseTests(unittest.TestCase):
     def _getTargetClass(self):
         from supervisor.process import ProcessGroupBase