Browse Source

work towards faster start and stop (#131)

Conflicts:
	supervisor/rpcinterface.py
Chris McDonough 10 years ago
parent
commit
c5d5cad04a
3 changed files with 118 additions and 88 deletions
  1. 105 78
      supervisor/rpcinterface.py
  2. 3 0
      supervisor/tests/base.py
  3. 10 10
      supervisor/tests/test_rpcinterfaces.py

+ 105 - 78
supervisor/rpcinterface.py

@@ -2,6 +2,7 @@ import os
 import time
 import datetime
 import errno
+import types
 
 from supervisor.options import readFile
 from supervisor.options import tailFile
@@ -23,7 +24,10 @@ from supervisor.states import SupervisorStates
 from supervisor.states import getSupervisorStateDescription
 from supervisor.states import ProcessStates
 from supervisor.states import getProcessStateDescription
-from supervisor.states import RUNNING_STATES
+from supervisor.states import (
+    RUNNING_STATES,
+    STOPPED_STATES,
+    )
 
 API_VERSION  = '3.0'
 
@@ -269,49 +273,56 @@ class SupervisorNamespaceRPCInterface:
         except (NotExecutable, NoPermission), why:
             raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])
 
-        started = []
+        if process.get_state() in RUNNING_STATES:
+            raise RPCError(Faults.ALREADY_STARTED, name)
 
-        startsecs = process.config.startsecs
+        process.spawn()
 
-        def startit():
-            if not started:
+        # We call reap() in order to more quickly obtain the side effects of
+        # process.finish(), which reap() eventually ends up calling.  This
+        # might be the case if the spawn() was successful but then the process
+        # died before its startsecs elapsed or it exited with an unexpected
+        # exit code. In particular, finish() may set spawnerr, which we can
+        # check and immediately raise an RPCError, avoiding the need to
+        # defer by returning a callback.
 
-                if process.get_state() in RUNNING_STATES:
-                    raise RPCError(Faults.ALREADY_STARTED, name)
+        self.supervisord.reap()
 
-                process.spawn()
+        if process.spawnerr:
+            raise RPCError(Faults.SPAWN_ERROR, name)
 
+        # We call process.transition() in order to more quickly obtain its
+        # side effects.  In particular, it might set the process' state from
+        # STARTING->RUNNING if the process has a startsecs==0.
+        process.transition()
+
+        if wait and process.get_state() != ProcessStates.RUNNING:
+            # by default, this branch will almost always be hit for processes
+            # with default startsecs configurations, because the default number
+            # of startsecs for a process is "1", and the process will not have
+            # entered the RUNNING state yet even though we've called
+            # transition() on it.  This is because a process is not considered
+            # RUNNING until it has stayed up > startsecs.
+
+            def onwait():
                 if process.spawnerr:
                     raise RPCError(Faults.SPAWN_ERROR, name)
 
-                # we use a list here to fake out lexical scoping;
-                # using a direct assignment to 'started' in the
-                # function appears to not work (symptom: 2nd or 3rd
-                # call through, it forgets about 'started', claiming
-                # it's undeclared).
-                started.append(time.time())
+                state = process.get_state()
 
-            if not wait or not startsecs:
-                return True
+                if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
+                    raise RPCError(Faults.ABNORMAL_TERMINATION, name)
 
-            t = time.time()
-            runtime = (t - started[0])
-            state = process.get_state()
-
-            if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
-                raise RPCError(Faults.ABNORMAL_TERMINATION, name)
+                if state == ProcessStates.RUNNING:
+                    return True
 
-            if runtime < startsecs:
                 return NOT_DONE_YET
 
-            if state == ProcessStates.RUNNING:
-                return True
-
-            raise RPCError(Faults.ABNORMAL_TERMINATION, name)
+            onwait.delay = 0.05
+            onwait.rpcinterface = self
+            return onwait # deferred
 
-        startit.delay = 0.05
-        startit.rpcinterface = self
-        return startit # deferred
+        return True
 
     def startProcessGroup(self, name, wait=True):
         """ Start all processes in the group named 'name'
@@ -369,36 +380,43 @@ class SupervisorNamespaceRPCInterface:
             group_name, process_name = split_namespec(name)
             return self.stopProcessGroup(group_name, wait)
 
-        stopped = []
-        called  = []
-
-        def killit():
-            if not called:
-                if process.get_state() not in RUNNING_STATES:
-                    raise RPCError(Faults.NOT_RUNNING)
-                # use a mutable for lexical scoping; see startProcess
-                called.append(1)
-
-            if not stopped:
-                msg = process.stop()
-                if msg is not None:
-                    raise RPCError(Faults.FAILED, msg)
-                stopped.append(1)
-
-                if wait:
+        if process.get_state() not in RUNNING_STATES:
+            raise RPCError(Faults.NOT_RUNNING)
+
+        msg = process.stop()
+        if msg is not None:
+            raise RPCError(Faults.FAILED, msg)
+
+        # We'll try to reap any killed child. FWIW, reap calls waitpid, and
+        # then, if waitpid returns a pid, calls finish() on the process with
+        # that pid, which drains any I/O from the process' dispatchers and
+        # changes the process' state.  I chose to call reap without once=True
+        # because we don't really care if we reap more than one child.  Even if
+        # we only reap one child. we may not even be reaping the child that we
+        # just stopped (this is all async, and process.stop() may not work, and
+        # we'll need to wait for SIGKILL during process.transition() as the
+        # result of normal select looping).
+
+        self.supervisord.reap()
+
+        if wait and process.get_state() not in STOPPED_STATES:
+
+            def onwait():
+                # process will eventually enter a stopped state by
+                # virtue of the supervisord.reap() method being called
+                # during normal operations
+                self.supervisord.options.logger.info(
+                    'waiting for %s to stop' % process.config.name
+                    )
+                if process.get_state() not in STOPPED_STATES:
                     return NOT_DONE_YET
-                else:
-                    return True
-
-            if process.get_state() not in (ProcessStates.STOPPED,
-                                           ProcessStates.EXITED):
-                return NOT_DONE_YET
-            else:
                 return True
 
-        killit.delay = 0.2
-        killit.rpcinterface = self
-        return killit # deferred
+            onwait.delay = 0
+            onwait.rpcinterface = self
+            return onwait # deferred
+
+        return True
 
     def stopProcessGroup(self, name, wait=True):
         """ Stop all processes in the process group named 'name'
@@ -794,6 +812,7 @@ def make_allfunc(processes, predicate, func, **extra_kwargs):
         callbacks=callbacks, # used only to fool scoping, never passed by caller
         results=results, # used only to fool scoping, never passed by caller
         ):
+
         if not callbacks:
 
             for group, process in processes:
@@ -801,39 +820,47 @@ def make_allfunc(processes, predicate, func, **extra_kwargs):
                 if predicate(process):
                     try:
                         callback = func(name, **extra_kwargs)
-                        callbacks.append((group, process, callback))
                     except RPCError, e:
                         results.append({'name':process.config.name,
                                         'group':group.config.name,
                                         'status':e.code,
                                         'description':e.text})
                         continue
+                    if isinstance(callback, types.FunctionType):
+                        callbacks.append((group, process, callback))
+                    else:
+                        results.append(
+                            {'name':process.config.name,
+                             'group':group.config.name,
+                             'status':Faults.SUCCESS,
+                             'description':'OK'}
+                            )
 
         if not callbacks:
             return results
 
-        group, process, callback = callbacks.pop(0)
+        for struct in callbacks[:]:
 
-        try:
-            value = callback()
-        except RPCError, e:
-            results.append(
-                {'name':process.config.name,
-                 'group':group.config.name,
-                 'status':e.code,
-                 'description':e.text})
-            return NOT_DONE_YET
+            group, process, cb = struct
 
-        if value is NOT_DONE_YET:
-            # push it back into the queue; it will finish eventually
-            callbacks.append((group, process, callback))
-        else:
-            results.append(
-                {'name':process.config.name,
-                 'group':group.config.name,
-                 'status':Faults.SUCCESS,
-                 'description':'OK'}
-                )
+            try:
+                value = cb()
+            except RPCError, e:
+                results.append(
+                    {'name':process.config.name,
+                     'group':group.config.name,
+                     'status':e.code,
+                     'description':e.text})
+                value = None
+
+            if value is not NOT_DONE_YET:
+                results.append(
+                    {'name':process.config.name,
+                     'group':group.config.name,
+                     'status':Faults.SUCCESS,
+                     'description':'OK'}
+                    )
+                callbacks.remove(struct)
 
         if callbacks:
             return NOT_DONE_YET

+ 3 - 0
supervisor/tests/base.py

@@ -983,6 +983,9 @@ class PopulatedDummySupervisor(DummySupervisor):
         process = self.process_groups[group_name].processes[process_name]
         setattr(process, attr_name, val)
 
+    def reap(self):
+        self.reaped = True
+
 class DummyDispatcher:
     write_event_handled = False
     read_event_handled = False

+ 10 - 10
supervisor/tests/test_rpcinterfaces.py

@@ -315,9 +315,10 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
         supervisord.set_procattr('foo', 'pid', 10)
         interface = self._makeOne(supervisord)
-        callback = interface.startProcess('foo')
-        self._assertRPCError(xmlrpc.Faults.ALREADY_STARTED,
-                             callback)
+        self._assertRPCError(
+            xmlrpc.Faults.ALREADY_STARTED,
+            interface.startProcess, 'foo'
+            )
 
     def test_startProcess_bad_group_name(self):
         options = DummyOptions()
@@ -371,11 +372,13 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         process = supervisord.process_groups['foo'].processes['foo']
         process.spawnerr = 'abc'
         interface = self._makeOne(supervisord)
-        callback = interface.startProcess('foo')
-        self._assertRPCError(xmlrpc.Faults.SPAWN_ERROR, callback)
+        self._assertRPCError(
+            xmlrpc.Faults.SPAWN_ERROR,
+            interface.startProcess,
+            'foo'
+            )
 
     def test_startProcess(self):
-        from supervisor import http
         options = DummyOptions()
         pconfig = DummyPConfig(options, 'foo', __file__, autostart=False,
                                startsecs=.01)
@@ -383,14 +386,11 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
         supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED)
         interface = self._makeOne(supervisord)
-        callback = interface.startProcess('foo')
-        self.assertEqual(callback(), http.NOT_DONE_YET)
+        result = interface.startProcess('foo')
         process = supervisord.process_groups['foo'].processes['foo']
         self.assertEqual(process.spawned, True)
         self.assertEqual(interface.update_text, 'startProcess')
         process.state = ProcessStates.RUNNING
-        time.sleep(.02)
-        result = callback()
         self.assertEqual(result, True)
 
     def test_startProcess_nowait(self):