浏览代码

Close pipes properly when a child dies.

Chris McDonough 18 年之前
父节点
当前提交
6968b9c2c9
共有 5 个文件被更改,包括 70 次插入 和 85 次删除
  1. 8 0
      CHANGES.txt
  2. 11 6
      src/supervisor/options.py
  3. 1 1
      src/supervisor/supervisorctl.py
  4. 26 21
      src/supervisor/supervisord.py
  5. 24 57
      src/supervisor/tests.py

+ 8 - 0
CHANGES.txt

@@ -7,6 +7,11 @@
     socket file, so the process could not be controlled (it and all of
     its subprocesses would need to be killed by hand).
 
+  - Close subprocess file descriptors properly when a subprocess exits
+    or otherwise dies.  This should result in fewer "too many open
+    files to spawn foo" messages when supervisor is left up for long
+    periods of time.
+
   - When a process was not killable with a "normal" signal at shutdown
     time, too many "INFO: waiting for x to die" messages would be sent
     to the log until we ended up killing the process with a SIGKILL.
@@ -29,6 +34,9 @@
   - Better supervisorctl reporting on stop requests that have a FAILED
     status.
 
+  - Removed duplicated code (readLog/readMainLog), thanks to Mike
+    Naberezny.
+
 2.1b2
 
   - Added new tailProcessLog() command to the XML-RPC API that

+ 11 - 6
src/supervisor/options.py

@@ -888,7 +888,7 @@ class ServerOptions(Options):
         for x in range(start, self.minfds):
             try:
                 os.close(x)
-            except:
+            except os.error:
                 pass
 
     def kill(self, pid, signal):
@@ -1057,12 +1057,17 @@ class ServerOptions(Options):
                 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NDELAY)
             return pipes
         except OSError:
-            self.close_pipes(pipes)
-            raise
+            for fd in pipes.values():
+                self.close_fd(fd)
+            subprocess.pipes = {}
+
+    def close_parent_pipes(self, pipes):
+        for fdname in ('stdin', 'stdout', 'stderr'):
+            self.close_fd(pipes[fdname])
 
-    def close_pipes(self, pipes):
-        for fd in pipes.values():
-            self.close_fd(fd)
+    def close_child_pipes(self, pipes):
+        for fdname in ('child_stdin', 'child_stdout', 'child_stderr'):
+            self.close_fd(pipes[fdname])
 
     def close_fd(self, fd):
         try:

+ 1 - 1
src/supervisor/supervisorctl.py

@@ -259,7 +259,7 @@ class Controller(cmd.Cmd):
         supervisor = self._get_supervisor()
 
         try:
-            output = supervisor.readMainLog(-bytes, 0)
+            output = supervisor.readLog(-bytes, 0)
         except xmlrpclib.Fault, e:
             template = '%s: ERROR (%s)'
             if e.faultCode == xmlrpc.Faults.NO_FILE:

+ 26 - 21
src/supervisor/supervisord.py

@@ -246,14 +246,14 @@ class Subprocess:
                 msg = 'unknown error: %s' % errno.errorcode.get(code, code)
 
             self.record_spawnerr(msg)
-            self.options.close_pipes(self.pipes)
+            self.options.close_parent_pipes(self.pipes)
+            self.options.close_child_pipes(self.pipes)
             return
 
         if pid != 0:
             # Parent
             self.pid = pid
-            for fdname in ('child_stdin', 'child_stdout', 'child_stderr'):
-                self.options.close_fd(self.pipes[fdname])
+            self.options.close_child_pipes(self.pipes)
             self.options.logger.info('spawned: %r with pid %s' % (pname, pid))
             self.spawnerr = None
             # we use self.delay here as a mechanism to indicate that we're in
@@ -388,6 +388,7 @@ class Subprocess:
         self.options.logger.info(msg)
 
         self.pid = 0
+        self.options.close_parent_pipes(self.pipes)
         self.pipes = {}
 
     def set_uid(self):
@@ -490,23 +491,6 @@ class Supervisor:
 
             r, w, x = [], [], []
 
-            process_map = {}
-
-            # process output fds
-            for proc in self.processes.values():
-                proc.log_output()
-                drains = proc.get_pipe_drains()
-                for fd, drain in drains:
-                    r.append(fd)
-                    process_map[fd] = drain
-
-            # medusa i/o fds
-            for fd, dispatcher in socket_map.items():
-                if dispatcher.readable():
-                    r.append(fd)
-                if dispatcher.writable():
-                    w.append(fd)
-
             if self.mood < 1:
                 if not self.stopping:
                     self.stop_all()
@@ -526,15 +510,33 @@ class Supervisor:
                 else:
                     break
 
+            process_map = {}
+
+            # process output fds
+            for proc in self.processes.values():
+                proc.log_output()
+                drains = proc.get_pipe_drains()
+                for fd, drain in drains:
+                    r.append(fd)
+                    process_map[fd] = drain
+
+            # medusa i/o fds
+            for fd, dispatcher in socket_map.items():
+                if dispatcher.readable():
+                    r.append(fd)
+                if dispatcher.writable():
+                    w.append(fd)
+
             try:
                 r, w, x = select.select(r, w, x, timeout)
             except select.error, err:
+                r = w = x = []
                 if err[0] == errno.EINTR:
                     self.options.logger.log(self.options.TRACE,
                                             'EINTR encountered in select')
+                    
                 else:
                     raise
-                r = w = x = []
 
             for fd in r:
                 if process_map.has_key(fd):
@@ -692,6 +694,9 @@ class Supervisor:
                 self.options.logger.critical(
                     'received %s indicating restart request' % signame(sig))
                 self.mood = 0
+            elif sig == signal.SIGCHLD:
+                self.options.logger.info(
+                    'received %s indicating a child quit' % signame(sig))
             elif sig == signal.SIGUSR2:
                 self.options.logger.info(
                     'received %s indicating log reopen request' % signame(sig))

+ 24 - 57
src/supervisor/tests.py

@@ -985,51 +985,6 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         self.assertEqual(offset, 0)
         self.assertEqual(data, '')
 
-    def test_readMainLog_unreadable(self):
-        supervisord = DummySupervisor()
-        interface = self._makeOne(supervisord)
-        self._assertRPCError(xmlrpc.Faults.NO_FILE,
-                             interface.readMainLog, offset=0, length=1)
-
-    def test_readMainLog_badargs(self):
-        supervisord = DummySupervisor()
-        interface = self._makeOne(supervisord)
-
-        try:
-            logfile = supervisord.options.logfile
-            f = open(logfile, 'w+')
-            f.write('x' * 2048)
-            f.close()
-            self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS,
-                                 interface.readMainLog,
-                                 offset=-1, length=1)
-            self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS,
-                                 interface.readMainLog,
-                                 offset=-1, length=-1)
-        finally:
-            os.remove(logfile)
-
-    def test_readMainLog(self):
-        supervisord = DummySupervisor()
-        interface = self._makeOne(supervisord)
-        logfile = supervisord.options.logfile
-        try:
-            f = open(logfile, 'w+')
-            f.write('x' * 2048)
-            f.write('y' * 2048)
-            f.close()
-            data = interface.readMainLog(offset=0, length=0)
-            self.assertEqual(interface.update_text, 'readMainLog')
-            self.assertEqual(data, ('x' * 2048) + ('y' * 2048))
-            data = interface.readMainLog(offset=2048, length=0)
-            self.assertEqual(data, 'y' * 2048)
-            data = interface.readMainLog(offset=0, length=2048)
-            self.assertEqual(data, 'x' * 2048)
-            data = interface.readMainLog(offset=-4, length=0)
-            self.assertEqual(data, 'y' * 4)
-        finally:
-            os.remove(logfile)
-
     def test_clearProcessLog_bad_name(self):
         supervisord = DummySupervisor()
         interface = self._makeOne(supervisord)
@@ -1475,7 +1430,8 @@ class SubprocessTests(unittest.TestCase):
                          "Too many processes in process table to spawn 'good'")
         self.assertEqual(options.logger.data[0],
              "spawnerr: Too many processes in process table to spawn 'good'")
-        self.assertEqual(len(options.pipes_closed), 6)
+        self.assertEqual(len(options.parent_pipes_closed), 6)
+        self.assertEqual(len(options.child_pipes_closed), 6)
         self.failUnless(instance.delay)
         self.failUnless(instance.backoff)
 
@@ -1489,7 +1445,8 @@ class SubprocessTests(unittest.TestCase):
         self.assertEqual(instance.spawnerr, 'unknown error: EPERM')
         self.assertEqual(options.logger.data[0],
                          "spawnerr: unknown error: EPERM")
-        self.assertEqual(len(options.pipes_closed), 6)
+        self.assertEqual(len(options.parent_pipes_closed), 6)
+        self.assertEqual(len(options.child_pipes_closed), 6)
         self.failUnless(instance.delay)
         self.failUnless(instance.backoff)
 
@@ -1500,7 +1457,8 @@ class SubprocessTests(unittest.TestCase):
         instance = self._makeOne(options, config)
         result = instance.spawn()
         self.assertEqual(result, None)
-        self.assertEqual(options.pipes_closed, None)
+        self.assertEqual(options.parent_pipes_closed, None)
+        self.assertEqual(options.child_pipes_closed, None)
         self.assertEqual(options.pgrp_set, True)
         self.assertEqual(len(options.duped), 3)
         self.assertEqual(len(options.fds_closed), options.minfds - 3)
@@ -1518,7 +1476,8 @@ class SubprocessTests(unittest.TestCase):
         instance = self._makeOne(options, config)
         result = instance.spawn()
         self.assertEqual(result, None)
-        self.assertEqual(options.pipes_closed, None)
+        self.assertEqual(options.parent_pipes_closed, None)
+        self.assertEqual(options.child_pipes_closed, None)
         self.assertEqual(options.pgrp_set, True)
         self.assertEqual(len(options.duped), 3)
         self.assertEqual(len(options.fds_closed), options.minfds - 3)
@@ -1537,7 +1496,8 @@ class SubprocessTests(unittest.TestCase):
         instance = self._makeOne(options, config)
         result = instance.spawn()
         self.assertEqual(result, None)
-        self.assertEqual(options.pipes_closed, None)
+        self.assertEqual(options.parent_pipes_closed, None)
+        self.assertEqual(options.child_pipes_closed, None)
         self.assertEqual(options.pgrp_set, True)
         self.assertEqual(len(options.duped), 3)
         self.assertEqual(len(options.fds_closed), options.minfds - 3)
@@ -1554,7 +1514,8 @@ class SubprocessTests(unittest.TestCase):
         instance = self._makeOne(options, config)
         result = instance.spawn()
         self.assertEqual(result, None)
-        self.assertEqual(options.pipes_closed, None)
+        self.assertEqual(options.parent_pipes_closed, None)
+        self.assertEqual(options.child_pipes_closed, None)
         self.assertEqual(options.pgrp_set, True)
         self.assertEqual(len(options.duped), 3)
         self.assertEqual(len(options.fds_closed), options.minfds - 3)
@@ -1572,8 +1533,8 @@ class SubprocessTests(unittest.TestCase):
         instance = self._makeOne(options, config)
         result = instance.spawn()
         self.assertEqual(result, 10)
-        self.assertEqual(options.pipes_closed, None)
-        self.assertEqual(len(options.fds_closed), 3)
+        self.assertEqual(options.parent_pipes_closed, None)
+        self.assertEqual(len(options.child_pipes_closed), 6)
         self.assertEqual(options.logger.data[0], "spawned: 'good' with pid 10")
         self.assertEqual(instance.spawnerr, None)
         self.failUnless(instance.delay)
@@ -1673,10 +1634,12 @@ class SubprocessTests(unittest.TestCase):
         instance.waitstatus = (123, 1) # pid, waitstatus
         instance.options.pidhistory[123] = instance
         instance.killing = 1
-        instance.pipes = {'stdout':'','stderr':''}
+        pipes = {'stdout':'','stderr':''}
+        instance.pipes = pipes
         instance.finish(123, 1)
         self.assertEqual(instance.killing, 0)
         self.assertEqual(instance.pid, 0)
+        self.assertEqual(options.parent_pipes_closed, pipes)
         self.assertEqual(instance.pipes, {})
         self.assertEqual(options.logger.data[0], 'stopped: notthere '
                          '(terminated by SIGHUP)')
@@ -2722,7 +2685,8 @@ class DummyOptions:
         self.waitpid_return = None, None
         self.kills = {}
         self.signal = None
-        self.pipes_closed = None
+        self.parent_pipes_closed = None
+        self.child_pipes_closed = None
         self.forkpid = 0
         self.pgrp_set = None
         self.duped = {}
@@ -2821,8 +2785,11 @@ class DummyOptions:
     def close_fd(self, fd):
         self.fds_closed.append(fd)
 
-    def close_pipes(self, pipes):
-        self.pipes_closed = pipes
+    def close_parent_pipes(self, pipes):
+        self.parent_pipes_closed = pipes
+
+    def close_child_pipes(self, pipes):
+        self.child_pipes_closed = pipes
 
     def setpgrp(self):
         self.pgrp_set = True