Pārlūkot izejas kodu

Capture (but don't yet send) data to process stdin via process.write.

Chris McDonough 18 gadi atpakaļ
vecāks
revīzija
d8e32f4dd7
3 mainītis faili ar 131 papildinājumiem un 22 dzēšanām
  1. 30 7
      src/supervisor/process.py
  2. 2 2
      src/supervisor/supervisord.py
  3. 99 13
      src/supervisor/tests.py

+ 30 - 7
src/supervisor/process.py

@@ -49,6 +49,7 @@ class Subprocess:
     eventlog = None # the log file captured to when we're in eventmode
     childlog = None # the current logger (event or main)
     logbuffer = '' # buffer of characters read from child pipes
+    writebuffer = '' # buffer of characters to be sent to child's stdin
     exitstatus = None # status attached to dead process by finsh()
     spawnerr = None # error message attached by spawn() if any
     
@@ -166,30 +167,52 @@ class Subprocess:
 
         if after:
             self.log_output()
+
+    def write(self, chars):
+        if not self.pid or self.killing:
+            raise IOError(errno.EPIPE, "Process already closed")
+        self.writebuffer = self.writebuffer + chars
             
-    def drain_stdout(self, *ignored):
+    def drain_stdout(self):
         output = self.options.readfd(self.pipes['stdout'])
         if self.config.log_stdout:
             self.logbuffer += output
 
-    def drain_stderr(self, *ignored):
+    def drain_stderr(self):
         output = self.options.readfd(self.pipes['stderr'])
         if self.config.log_stderr:
             self.logbuffer += output
 
+    def drain_stdin(self):
+        if self.writebuffer:
+            to_send = self.writebuffer[:2<<16]
+            try:
+                sent = self.options.write(self.pipes['stdin'], to_send)
+                self.writebuffer = self.writebuffer[sent:]
+            except OSError, why:
+                msg = 'failed writing to process %r stdin' % self.config.name
+                if why[0] == errno.EPIPE:
+                    self.writebuffer = ''
+                    self.options.logger.info(msg)
+                else:
+                    raise
+
     def drain(self):
         self.drain_stdout()
         self.drain_stderr()
+        self.drain_stdin()
 
-    def get_pipe_drains(self):
+    def get_output_drains(self):
         if not self.pipes:
             return []
+        return ( [ self.pipes['stdout'], self.drain_stdout],
+                 [ self.pipes['stderr'], self.drain_stderr] )
 
-        drains = ( [ self.pipes['stdout'], self.drain_stdout],
-                   [ self.pipes['stderr'], self.drain_stderr] )
+    def get_input_drains(self):
+        if not self.pipes:
+            return []
+        return ( [ self.pipes['stdin'], self.drain_stdin ], )
 
-        return drains
-        
     def get_execv_args(self):
         """Internal: turn a program name into a file name, using $PATH,
         make sure it exists / is executable, raising a ProcessException

+ 2 - 2
src/supervisor/supervisord.py

@@ -149,7 +149,7 @@ class Supervisor:
             # process output fds
             for proc in self.processes.values():
                 proc.log_output()
-                drains = proc.get_pipe_drains()
+                drains = proc.get_output_drains()
                 for fd, drain in drains:
                     r.append(fd)
                     process_map[fd] = drain
@@ -176,7 +176,7 @@ class Supervisor:
                 if process_map.has_key(fd):
                     drain = process_map[fd]
                     # drain the file descriptor
-                    drain(fd)
+                    drain()
 
                 if socket_map.has_key(fd):
                     try:

+ 99 - 13
src/supervisor/tests.py

@@ -1359,6 +1359,68 @@ class SubprocessTests(unittest.TestCase):
         instance.drain_stderr()
         self.assertEqual(instance.logbuffer, 'abc')
 
+    def test_drain_stdin_nodata(self):
+        options = DummyOptions()
+        config = DummyPConfig('test', '/test')
+        instance = self._makeOne(options, config)
+        self.assertEqual(instance.writebuffer, '')
+        instance.drain_stdin()
+        self.assertEqual(instance.writebuffer, '')
+        self.assertEqual(options.written, {})
+
+    def test_drain_stdin_normal(self):
+        options = DummyOptions()
+        config = DummyPConfig('test', '/test')
+        instance = self._makeOne(options, config)
+        instance.spawn()
+        instance.writebuffer = 'foo'
+        instance.drain_stdin()
+        self.assertEqual(instance.writebuffer, '')
+        self.assertEqual(options.written[instance.pipes['stdin']], 'foo')
+
+    def test_drain_stdin_overhardcoded_limit(self):
+        options = DummyOptions()
+        config = DummyPConfig('test', '/test')
+        instance = self._makeOne(options, config)
+        instance.spawn()
+        instance.writebuffer = 'a' * (2 << 17)
+        instance.drain_stdin()
+        self.assertEqual(len(instance.writebuffer), (2<<17)-(2<<16))
+        self.assertEqual(options.written[instance.pipes['stdin']],
+                         ('a' * (2 << 16)))
+
+    def test_drain_stdin_over_os_limit(self):
+        options = DummyOptions()
+        config = DummyPConfig('test', '/test')
+        instance = self._makeOne(options, config)
+        options.write_accept = 1
+        instance.spawn()
+        instance.writebuffer = 'a' * (2 << 16)
+        instance.drain_stdin()
+        self.assertEqual(len(instance.writebuffer), (2<<16) - 1)
+        self.assertEqual(options.written[instance.pipes['stdin']], 'a')
+
+    def test_drain_stdin_epipe(self):
+        options = DummyOptions()
+        config = DummyPConfig('test', '/test')
+        instance = self._makeOne(options, config)
+        options.write_error = errno.EPIPE
+        instance.writebuffer = 'foo'
+        instance.spawn()
+        instance.drain_stdin()
+        self.assertEqual(instance.writebuffer, '')
+        self.assertEqual(options.logger.data,
+            ["failed writing to process 'test' stdin"])
+
+    def test_drain_stdin_uncaught_oserror(self):
+        options = DummyOptions()
+        config = DummyPConfig('test', '/test')
+        instance = self._makeOne(options, config)
+        options.write_error = errno.EBADF
+        instance.writebuffer = 'foo'
+        instance.spawn()
+        self.assertRaises(OSError, instance.drain_stdin)
+
     def test_drain(self):
         options = DummyOptions()
         config = DummyPConfig('test', '/test')
@@ -1374,7 +1436,7 @@ class SubprocessTests(unittest.TestCase):
         instance.drain()
         self.assertEqual(instance.logbuffer, 'abc')
         
-    def test_get_pipe_drains(self):
+    def test_get_output_drains(self):
         options = DummyOptions()
         config = DummyPConfig('test', '/test')
         instance = self._makeOne(options, config)
@@ -1382,13 +1444,13 @@ class SubprocessTests(unittest.TestCase):
         instance.pipes['stdout'] = 'abc'
         instance.pipes['stderr'] = 'def'
 
-        drains = instance.get_pipe_drains()
+        drains = instance.get_output_drains()
         self.assertEqual(len(drains), 2)
         self.assertEqual(drains[0], ['abc', instance.drain_stdout])
         self.assertEqual(drains[1], ['def', instance.drain_stderr])
 
         instance.pipes = {}
-        drains = instance.get_pipe_drains()
+        drains = instance.get_output_drains()
         self.assertEqual(drains, [])
         
 
@@ -1560,7 +1622,7 @@ class SubprocessTests(unittest.TestCase):
         self.assertEqual(len(options.duped), 3)
         self.assertEqual(len(options.fds_closed), options.minfds - 3)
         self.assertEqual(options.written,
-             {1: ['good: error trying to setuid to 1!\n', 'good: screwed\n']})
+             {1: 'good: error trying to setuid to 1!\ngood: screwed\n'})
         self.assertEqual(options.privsdropped, None)
         self.assertEqual(options.execv_args,
                          ('/good/filename', ['/good/filename']) )
@@ -1580,7 +1642,7 @@ class SubprocessTests(unittest.TestCase):
         self.assertEqual(len(options.duped), 3)
         self.assertEqual(len(options.fds_closed), options.minfds - 3)
         self.assertEqual(options.written,
-                         {1: ["couldn't exec /good/filename: EPERM\n"]})
+                         {1: "couldn't exec /good/filename: EPERM\n"})
         self.assertEqual(options.privsdropped, None)
         self.assertEqual(options._exitcode, 127)
 
@@ -1598,7 +1660,7 @@ class SubprocessTests(unittest.TestCase):
         self.assertEqual(len(options.duped), 3)
         self.assertEqual(len(options.fds_closed), options.minfds - 3)
         self.assertEqual(len(options.written), 1)
-        msg = options.written[1][0]
+        msg = options.written[1]
         self.failUnless(msg.startswith("couldn't exec /good/filename:"))
         self.failUnless("exceptions.RuntimeError" in msg)
         self.assertEqual(options.privsdropped, None)
@@ -1629,6 +1691,21 @@ class SubprocessTests(unittest.TestCase):
         self.failUnless(instance.delay)
         self.assertEqual(instance.options.pidhistory[10], instance)
 
+    def test_write(self):
+        executable = '/bin/cat'
+        options = DummyOptions()
+        config = DummyPConfig('output', executable)
+        instance = self._makeOne(options, config)
+        sent = 'a' * (1 << 13)
+        self.assertRaises(IOError, instance.write, sent)
+        options.forkpid = 1
+        result = instance.spawn()
+        instance.write(sent)
+        received = instance.writebuffer
+        self.assertEqual(sent, received)
+        instance.killing = True
+        self.assertRaises(IOError, instance.write, sent)
+
     def dont_test_spawn_and_kill(self):
         # this is a functional test
         try:
@@ -2755,8 +2832,9 @@ class DummyProcess:
     pipes = None
     childlog = None # the current logger 
     spawnerr = None
-    logbuffer = '' # buffer of characters to send to child process' stdin
-    
+    logbuffer = '' # buffer of characters from child output to log
+    writebuffer = '' # buffer of characters to send to child process' stdin
+
     def __init__(self, options, config, state=ProcessStates.RUNNING):
         self.options = options
         self.config = config
@@ -2802,7 +2880,7 @@ class DummyProcess:
     def drain(self):
         self.drained = True
 
-    def get_pipe_drains(self):
+    def get_output_drains(self):
         return []
 
     def __cmp__(self, other):
@@ -2925,6 +3003,8 @@ class DummyOptions:
         self.privsdropped = None
         self.logs_reopened = False
         self.environment_processed = False
+        self.write_accept = None
+        self.write_error = None
 
     def getLogger(self, *args, **kw):
         logger = DummyLogger()
@@ -3004,6 +3084,16 @@ class DummyOptions:
         pipes['stderr'], pipes['child_stderr'] = (7, 8)
         return pipes
 
+    def write(self, fd, chars):
+        if self.write_error:
+            raise OSError(self.write_error)
+        if self.write_accept:
+            chars = chars[self.write_accept]
+        data = self.written.setdefault(fd, '')
+        data += chars
+        self.written[fd] = data
+        return len(chars)
+
     def fork(self):
         if self.fork_error:
             raise OSError(self.fork_error)
@@ -3024,10 +3114,6 @@ class DummyOptions:
     def dup2(self, frm, to):
         self.duped[frm] = to
 
-    def write(self, fd, data):
-        old_data = self.written.setdefault(fd, [])
-        old_data.append(data)
-
     def _exit(self, code):
         self._exitcode = code