Procházet zdrojové kódy

Make multicall API work properly when calling stopProcess then startProcess by being lazier about when things get done.

Chris McDonough před 19 roky
rodič
revize
180dca2838
2 změnil soubory, kde provedl 71 přidání a 46 odebrání
  1. 40 26
      src/supervisor/tests.py
  2. 31 20
      src/supervisor/xmlrpc.py

+ 40 - 26
src/supervisor/tests.py

@@ -400,8 +400,9 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         process.pid = 10
         supervisord = DummySupervisor({'foo':process})
         interface = self._makeOne(supervisord)
+        callback = interface.startProcess('foo')
         self._assertRPCError(xmlrpc.Faults.ALREADY_STARTED,
-                             interface.startProcess,'foo')
+                             callback)
 
     def test_startProcess_badname(self):
         supervisord = DummySupervisor()
@@ -417,9 +418,8 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         process.spawnerr = 'abc'
         supervisord = DummySupervisor({'foo':process})
         interface = self._makeOne(supervisord)
-        self._assertRPCError(xmlrpc.Faults.SPAWN_ERROR,
-                             interface.startProcess,
-                             'foo')
+        callback = interface.startProcess('foo')
+        self._assertRPCError(xmlrpc.Faults.SPAWN_ERROR, callback)
 
     def test_startProcess(self):
         options = DummyOptions()
@@ -427,13 +427,12 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         process = DummyProcess(options, config, state=ProcessStates.STOPPED)
         supervisord = DummySupervisor({'foo':process})
         interface = self._makeOne(supervisord)
-        callback = interface.startProcess('foo')
+        callback = interface.startProcess('foo', 100) # milliseconds
+        self.assertEqual(callback(), http.NOT_DONE_YET)
         self.assertEqual(process.spawned, True)
         self.assertEqual(interface.update_text, 'startProcess')
-        self.assertEqual(callback(), http.NOT_DONE_YET)
-
-        process.pid = 1234
-        self.assertEqual(callback(done=True), True)
+        time.sleep(.1) # 100 milliseconds
+        self.assertEqual(callback(), True)
 
     def test_startProcess_abnormal_term(self):
         options = DummyOptions()
@@ -441,12 +440,14 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         process = DummyProcess(options, config, ProcessStates.STOPPED)
         supervisord = DummySupervisor({'foo':process})
         interface = self._makeOne(supervisord)
-        callback = interface.startProcess('foo')
+        callback = interface.startProcess('foo', 100) # milliseconds
+        result = callback()
+        time.sleep(.1)
         self.assertEqual(process.spawned, True)
         self.assertEqual(interface.update_text, 'startProcess')
         process.state = ProcessStates.BACKOFF
         self._assertRPCError(xmlrpc.Faults.ABNORMAL_TERMINATION,
-                             callback, True)
+                             callback)
     
     def test_startProcess_badtimeout(self):
         options = DummyOptions()
@@ -465,20 +466,34 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         process2 = DummyProcess(options, config2, ProcessStates.STOPPED)
         supervisord = DummySupervisor({'foo':process, 'foo2':process2})
         interface = self._makeOne(supervisord)
-        callback = interface.startAllProcesses()
-        #process.pid = 1234
-        #process2.pid = 12345
-        # first process
+        callback = interface.startAllProcesses(200) # milliseconds
         from http import NOT_DONE_YET
-        self.assertEqual(callback(done=True), NOT_DONE_YET)
-        # second process
-        self.assertEqual(
-            callback(done=True),
-            [
-            {'name':'foo', 'status': 80, 'description': 'OK'},
-            {'name':'foo2', 'status': 80, 'description': 'OK'},
-            ]
-            )
+        from xmlrpc import Faults
+
+        # create callbacks in startall()
+        self.assertEqual(callback(), NOT_DONE_YET)
+        # start first process
+        self.assertEqual(callback(), NOT_DONE_YET)
+        # start second process
+        self.assertEqual(callback(), NOT_DONE_YET)
+
+        # wait for timeout 1
+        time.sleep(.2)
+        result = callback()
+        # wait for timeout 2
+        time.sleep(.2)
+
+        result = callback()
+
+        self.assertEqual(len(result), 2)
+        self.assertEqual(result[0]['name'], 'foo2')
+        self.assertEqual(result[0]['status'],  Faults.SUCCESS)
+        self.assertEqual(result[0]['description'], 'OK')
+
+        self.assertEqual(result[1]['name'], 'foo')
+        self.assertEqual(result[1]['status'],  Faults.SUCCESS)
+        self.assertEqual(result[1]['description'], 'OK')
+
         self.assertEqual(interface.update_text, 'startProcess')
 
         self.assertEqual(process.spawned, True)
@@ -503,9 +518,8 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         self.assertEqual(process.backoff, 0)
         self.assertEqual(process.delay, 0)
         self.assertEqual(process.killing, 0)
-        process.state = ProcessStates.STOPPING
         self.assertEqual(callback(), http.NOT_DONE_YET)
-        process.state = ProcessStates.STOPPED
+        self.assertEqual(process.state, ProcessStates.STOPPED)
         self.assertEqual(callback(), True)
         self.assertEqual(len(supervisord.processes), 1)
         self.assertEqual(interface.update_text, 'stopProcess')

+ 31 - 20
src/supervisor/xmlrpc.py

@@ -295,33 +295,39 @@ class SupervisorNamespaceRPCInterface:
         if process is None:
             raise RPCError(Faults.BAD_NAME, name)
 
-        if process.get_state() == ProcessStates.RUNNING:
-            raise RPCError(Faults.ALREADY_STARTED, name)
+        started = []
 
-        process.spawn()
+        def startit():
+            if not started:
 
-        if process.spawnerr:
-            raise RPCError(Faults.SPAWN_ERROR, name)
+                if process.get_state() == ProcessStates.RUNNING:
+                    raise RPCError(Faults.ALREADY_STARTED, name)
 
-        if not timeout:
-            timeout = 0
-        
-        milliseconds = timeout / 1000.0
-        start = time.time()
+                process.spawn()
+
+                if process.spawnerr:
+                    raise RPCError(Faults.SPAWN_ERROR, name)
 
-        def check_still_running(done=False): # done arg is only for unit testing
+                # we use a list here to fake out lexical scoping;
+                # using a direct assignment to 'started' in the
+                # function appears to not work (symptom: 2nd or 3rd
+                # call through, it forgets about 'started', claiming
+                # it's undeclared).
+                started.append(time.time())
+                
             t = time.time()
-            runtime = (t - start)
-            if not done and runtime < milliseconds:
+            runtime = (t - started[0])
+            milliseconds = timeout / 1000.0
+            if runtime < milliseconds:
                 return NOT_DONE_YET
             state = process.get_state()
             if state == ProcessStates.RUNNING:
                 return True
             raise RPCError(Faults.ABNORMAL_TERMINATION, name)
 
-        check_still_running.delay = milliseconds
-        check_still_running.rpcinterface = self
-        return check_still_running # deferred
+        startit.delay = 0.05
+        startit.rpcinterface = self
+        return startit # deferred
 
     def startAllProcesses(self, timeout=500):
         """ Start all processes listed in the configuration file
@@ -342,7 +348,7 @@ class SupervisorNamespaceRPCInterface:
         results = []
         callbacks = []
 
-        def startall(done=False): # done arg is for unit testing
+        def startall():
             if not callbacks:
 
                 for process in processes:
@@ -366,7 +372,7 @@ class SupervisorNamespaceRPCInterface:
             name, callback = callbacks.pop(0)
 
             try:
-                value = callback(done)
+                value = callback()
             except RPCError, e:
                 results.append({'name':name, 'status':e.code,
                                 'description':e.text})
@@ -407,10 +413,15 @@ class SupervisorNamespaceRPCInterface:
         if process is None:
             raise RPCError(Faults.BAD_NAME, name)
 
-        if process.get_state() != ProcessStates.RUNNING:
-            raise RPCError(Faults.NOT_RUNNING)
+        stopped = []
 
         def killit():
+            if not stopped:
+                if process.get_state() != ProcessStates.RUNNING:
+                    raise RPCError(Faults.NOT_RUNNING)
+                # use a mutable for lexical scoping; see startProcess
+                stopped.append(1)
+            
             if process.get_state() == ProcessStates.RUNNING:
                 msg = process.stop()
                 if msg is not None: