Bläddra i källkod

If the stdin pipe of a subprocess is closed when an XMLRPC consumer
calls sendProcessStdin, return a NO_FILE fault.

Fix an inversion of OSError/IOError in the error raising/catching
protocol.

Chris McDonough 18 år sedan
förälder
incheckning
4c7bc60414

+ 10 - 3
src/supervisor/dispatchers.py

@@ -55,6 +55,9 @@ class PDispatcher:
                 'fd %s closed, stopped monitoring %s' % (self.fd, self))
             self.closed = True
 
+    def flush(self):
+        pass
+
 class POutputDispatcher(PDispatcher):
     """ Output (stdout/stderr) dispatcher, capture output sent within
     <!--XSUPERVISOR:BEGIN--><!--XSUPERVISOR:END--> tags and notify
@@ -387,13 +390,17 @@ class PInputDispatcher(PDispatcher):
 
     def readable(self):
         return False
+
+    def flush(self):
+        # other code depends on this raising EPIPE if the pipe is closed
+        sent = self.process.config.options.write(self.fd,
+                                                 self.input_buffer)
+        self.input_buffer = self.input_buffer[sent:]
     
     def handle_write_event(self):
         if self.input_buffer:
             try:
-                sent = self.process.config.options.write(self.fd,
-                                                         self.input_buffer)
-                self.input_buffer = self.input_buffer[sent:]
+                self.flush()
             except OSError, why:
                 if why[0] == errno.EPIPE:
                     self.input_buffer = ''

+ 11 - 4
src/supervisor/process.py

@@ -90,11 +90,18 @@ class Subprocess:
                 
     def write(self, chars):
         if not self.pid or self.killing:
-            raise IOError(errno.EPIPE, "Process already closed")
+            raise OSError(errno.EPIPE, "Process already closed")
+
         stdin_fd = self.pipes['stdin']
-        if stdin_fd is not None:
-            dispatcher = self.dispatchers[stdin_fd]
-            dispatcher.input_buffer += chars
+        if stdin_fd is None:
+            raise OSError(errno.EPIPE, "Process has no stdin channel")
+
+        dispatcher = self.dispatchers[stdin_fd]
+        if dispatcher.closed:
+            raise OSError(errno.EPIPE, "Process' stdin channel is closed")
+            
+        dispatcher.input_buffer += chars
+        dispatcher.flush() # this must raise EPIPE if the pipe is closed
 
     def get_execv_args(self):
         """Internal: turn a program name into a file name, using $PATH,

+ 16 - 6
src/supervisor/rpcinterface.py

@@ -1,6 +1,7 @@
 import os
 import time
 import datetime
+import errno
 
 from supervisor.datatypes import Automatic
 
@@ -116,7 +117,7 @@ class SupervisorNamespaceRPCInterface:
         # there is a race condition here, but ignore it.
         try:
             self.supervisord.options.remove(logfile)
-        except (os.error, IOError):
+        except (OSError, IOError):
             raise RPCError(Faults.FAILED)
 
         for handler in self.supervisord.options.logger.handlers:
@@ -797,7 +798,7 @@ class SupervisorNamespaceRPCInterface:
         try:
             # implies a reopen
             process.removelogs()
-        except (IOError, os.error):
+        except (IOError, OSError):
             raise RPCError(Faults.FAILED, name)
 
         return True
@@ -851,9 +852,12 @@ class SupervisorNamespaceRPCInterface:
 
     def sendProcessStdin(self, name, chars):
         """ Send a string of chars to the stdin of the process name.
-        If non-7-bit data is sent (unicode), it is encoded to utf-8 before
-        being sent to the process' stdin.  If chars is not a string
-        or is not unicode, raise INCORRECT_PARAMETERS.
+        If non-7-bit data is sent (unicode), it is encoded to utf-8
+        before being sent to the process' stdin.  If chars is not a
+        string or is not unicode, raise INCORRECT_PARAMETERS.  If the
+        process has already been terminated, raise ALREADY_TERMINATED.
+        If the process' stdin cannot accept input (e.g. it was closed
+        by the child process), raise NO_FILE.
 
         @param string name        The process name to send to (or 'group:name')
         @param string chars       The character data to send to the process
@@ -875,7 +879,13 @@ class SupervisorNamespaceRPCInterface:
         if not process.pid or process.killing:
             raise RPCError(Faults.ALREADY_TERMINATED, name)
 
-        process.write(chars)
+        try:
+            process.write(chars)
+        except OSError, why:
+            if why[0] == errno.EPIPE:
+                raise RPCError(Faults.NO_FILE, name)
+            else:
+                raise
 
         return True
 

+ 10 - 1
src/supervisor/tests/base.py

@@ -386,7 +386,7 @@ class DummyProcess:
 
     def write(self, chars):
         if self.write_error:
-            raise IOError(self.write_error)
+            raise OSError(self.write_error)
         self.stdin_buffer += chars
 
     def transition(self):
@@ -756,6 +756,9 @@ class DummyDispatcher:
     error_handled = False
     logs_reopened = False
     logs_removed = False
+    closed = False
+    flush_error = None
+    flushed = False
     def __init__(self, readable=False, writable=False, error=False):
         self._readable = readable
         self._writable = writable
@@ -779,6 +782,12 @@ class DummyDispatcher:
         self.logs_reopened = True
     def removelogs(self):
         self.logs_removed = True
+    def close(self):
+        self.closed = True
+    def flush(self):
+        if self.flush_error:
+            raise OSError(self.flush_error)
+        self.flushed = True
                 
         
 def lstrip(s):

+ 30 - 3
src/supervisor/tests/test_process.py

@@ -3,6 +3,7 @@ import signal
 import time
 import unittest
 import sys
+import errno
 
 from supervisor.tests.base import DummyOptions
 from supervisor.tests.base import DummyPConfig
@@ -439,14 +440,40 @@ class SubprocessTests(unittest.TestCase):
         config = DummyPConfig(options, 'output', executable)
         instance = self._makeOne(config)
         sent = 'a' * (1 << 13)
-        self.assertRaises(IOError, instance.write, sent)
+        self.assertRaises(OSError, instance.write, sent)
         options.forkpid = 1
         result = instance.spawn()
         instance.write(sent)
         stdin_fd = instance.pipes['stdin']
         self.assertEqual(sent, instance.dispatchers[stdin_fd].input_buffer)
         instance.killing = True
-        self.assertRaises(IOError, instance.write, sent)
+        self.assertRaises(OSError, instance.write, sent)
+
+    def test_write_dispatcher_closed(self):
+        executable = '/bin/cat'
+        options = DummyOptions()
+        config = DummyPConfig(options, 'output', executable)
+        instance = self._makeOne(config)
+        sent = 'a' * (1 << 13)
+        self.assertRaises(OSError, instance.write, sent)
+        options.forkpid = 1
+        result = instance.spawn()
+        stdin_fd = instance.pipes['stdin']
+        instance.dispatchers[stdin_fd].close()
+        self.assertRaises(OSError, instance.write, sent)
+
+    def test_write_dispatcher_flush_raises_epipe(self):
+        executable = '/bin/cat'
+        options = DummyOptions()
+        config = DummyPConfig(options, 'output', executable)
+        instance = self._makeOne(config)
+        sent = 'a' * (1 << 13)
+        self.assertRaises(OSError, instance.write, sent)
+        options.forkpid = 1
+        result = instance.spawn()
+        stdin_fd = instance.pipes['stdin']
+        instance.dispatchers[stdin_fd].flush_error = errno.EPIPE
+        self.assertRaises(OSError, instance.write, sent)
 
     def dont_test_spawn_and_kill(self):
         # this is a functional test
@@ -1088,7 +1115,7 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         process1.listener_state = EventListenerStates.READY
         from supervisor.events import StartingFromStoppedEvent
         event = StartingFromStoppedEvent(process1)
-        pool._dispatchEvent(event)
+        self.assertRaises(OSError, pool._dispatchEvent, event)
         self.assertEqual(process1.stdin_buffer, '')
         self.assertEqual(process1.listener_state, EventListenerStates.READY)
 

+ 14 - 0
src/supervisor/tests/test_rpcinterfaces.py

@@ -2,6 +2,7 @@ import unittest
 import sys
 import os
 import time
+import errno
 
 from supervisor.tests.base import DummyOptions
 from supervisor.tests.base import DummySupervisor
@@ -1317,6 +1318,19 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
                              interface.sendProcessStdin,
                              'process1', 'chars for stdin')
         
+    def test_sendProcessStdin_raises_no_file_when_write_raises_epipe(self):
+        options = DummyOptions()
+        pconfig1 = DummyPConfig(options, 'process1', 'foo')
+        supervisord = PopulatedDummySupervisor(options, 'process1', pconfig1)
+        supervisord.set_procattr('process1', 'pid', 42)
+        supervisord.set_procattr('process1', 'killing', False)
+        supervisord.set_procattr('process1', 'write_error', errno.EPIPE)
+        interface   = self._makeOne(supervisord)
+        from supervisor import xmlrpc
+        self._assertRPCError(xmlrpc.Faults.NO_FILE,
+                             interface.sendProcessStdin,
+                             'process1', 'chars for stdin')
+
     def test_sendProcessStdin_writes_chars_and_returns_true(self):
         options = DummyOptions()
         pconfig1 = DummyPConfig(options, 'process1', 'foo')