Browse Source

Add 'startProcessGroup' and 'stopProcessGroup'

Chris McDonough 18 năm trước cách đây
mục cha
commit
576f19d21d
2 tập tin đã thay đổi với 304 bổ sung20 xóa
  1. 174 20
      src/supervisor/rpcinterface.py
  2. 130 0
      src/supervisor/tests/test_rpcinterfaces.py

+ 174 - 20
src/supervisor/rpcinterface.py

@@ -19,6 +19,10 @@ from supervisor.states import getSupervisorStateDescription
 from supervisor.states import ProcessStates
 from supervisor.states import getProcessStateDescription
 
+RUNNING_STATES = (ProcessStates.RUNNING,
+                  ProcessStates.BACKOFF,
+                  ProcessStates.STARTING)
+
 API_VERSION  = '2.0'
 
 class SupervisorNamespaceRPCInterface:
@@ -183,7 +187,6 @@ class SupervisorNamespaceRPCInterface:
         
         return group, process
         
-
     def startProcess(self, name, wait=True):
         """ Start a process
 
@@ -208,14 +211,10 @@ class SupervisorNamespaceRPCInterface:
 
         startsecs = process.config.startsecs
 
-        running_states = (ProcessStates.RUNNING,
-                          ProcessStates.BACKOFF,
-                          ProcessStates.STARTING)
-
         def startit():
             if not started:
 
-                if process.get_state() in running_states:
+                if process.get_state() in RUNNING_STATES:
                     raise RPCError(Faults.ALREADY_STARTED, name)
 
                 process.spawn()
@@ -252,6 +251,82 @@ class SupervisorNamespaceRPCInterface:
         startit.rpcinterface = self
         return startit # deferred
 
+    def startProcessGroup(self, name, wait=True):
+        """ Start all processes in the group named 'name'
+
+        @param string name        The group name
+        @param boolean wait       Wait for each process to be fully started
+        @return struct result     A structure containing start statuses
+        """
+        self._update('startProcessGroup')
+
+        group = self.supervisord.process_groups.get(name)
+
+        if group is None:
+            raise RPCError(Faults.BAD_NAME, name)
+
+        processes = group.processes.values()
+        processes.sort()
+            
+        results = []
+        callbacks = []
+
+        def startall(group=group):
+            if not callbacks:
+
+                for process in processes:
+                    name = make_namespec(group.config.name, process.config.name)
+                    if process.get_state() not in RUNNING_STATES:
+                        # only start nonrunning processes
+                        try:
+                            callback = self.startProcess(name, wait)
+                            callbacks.append((group, process, callback))
+                        except RPCError, e:
+                            results.append({'name':process.config.name,
+                                            'group':group.config.name,
+                                            'status':e.code,
+                                            'description':e.text})
+                            continue
+
+            if not callbacks:
+                return results
+
+            group, process, callback = callbacks.pop(0)
+
+            try:
+                value = callback()
+            except RPCError, e:
+                results.append({'name':process.config.name,
+                                'group':group.config.name,
+                                'status':e.code,
+                                'description':e.text})
+                return NOT_DONE_YET
+
+            if value is NOT_DONE_YET:
+                # push it back into the queue; it will finish eventually
+                callbacks.append((group, process, callback))
+            else:
+                results.append({'name':process.config.name,
+                                'group':group.config.name,
+                                'status':Faults.SUCCESS,
+                                'description':'OK'})
+
+            if callbacks:
+                return NOT_DONE_YET
+
+            return results
+
+        # XXX the above implementation has a weakness inasmuch as the
+        # first call into each individual process callback will always
+        # return NOT_DONE_YET, so they need to be called twice.  The
+        # symptom of this is that calling this method causes the
+        # client to block for much longer than it actually requires to
+        # start all of the nonrunning processes.  See stopAllProcesses
+
+        startall.delay = 0.05
+        startall.rpcinterface = self
+        return startall # deferred
+
     def startAllProcesses(self, wait=True):
         """ Start all processes listed in the configuration file
 
@@ -264,16 +339,13 @@ class SupervisorNamespaceRPCInterface:
 
         results = []
         callbacks = []
-        running_states = (ProcessStates.RUNNING,
-                          ProcessStates.BACKOFF,
-                          ProcessStates.STARTING)
 
         def startall():
             if not callbacks:
 
                 for group, process in all_processes:
                     name = make_namespec(group.config.name, process.config.name)
-                    if process.get_state() not in running_states:
+                    if process.get_state() not in RUNNING_STATES:
                         # only start nonrunning processes
                         try:
                             callback = self.startProcess(name, wait)
@@ -337,13 +409,9 @@ class SupervisorNamespaceRPCInterface:
         stopped = []
         called  = []
 
-        running_states = (ProcessStates.RUNNING,
-                          ProcessStates.STARTING,
-                          ProcessStates.BACKOFF)
-
         def killit():
             if not called:
-                if process.get_state() not in running_states:
+                if process.get_state() not in RUNNING_STATES:
                     raise RPCError(Faults.NOT_RUNNING)
                 # use a mutable for lexical scoping; see startProcess
                 called.append(1)
@@ -365,6 +433,96 @@ class SupervisorNamespaceRPCInterface:
         killit.rpcinterface = self
         return killit # deferred
 
+    def stopProcessGroup(self, name):
+        """ Stop all processes in the process group named 'name'
+
+        @param string name  The group name
+        @return boolean result Always return true unless error.
+        """
+        self._update('stopProcessGroup')
+        
+        group = self.supervisord.process_groups.get(name)
+
+        if group is None:
+            raise RPCError(Faults.BAD_NAME, name)
+
+        processes = group.processes.values()
+        processes.sort()
+            
+        callbacks = []
+        results = []
+
+        def killall(group=group):
+            if not callbacks:
+
+                for process in processes:
+                    name = make_namespec(group.config.name, process.config.name)
+                    if process.get_state() in RUNNING_STATES:
+                        # only stop running processes
+                        try:
+                            callback = self.stopProcess(name)
+                            callbacks.append((group, process, callback))
+                        except RPCError, e:
+                            results.append({'name':process.config.name,
+                                            'group':group.config.name,
+                                            'status':e.code,
+                                            'description':e.text})
+                            continue
+
+            if not callbacks:
+                return results
+
+            group, process, callback = callbacks.pop(0)
+
+            try:
+                value = callback()
+            except RPCError, e:
+                results.append(
+                    {'name':process.config.name,
+                     'group':group.config.name,
+                     'status':e.code,
+                     'description':e.text})
+                return NOT_DONE_YET
+            
+            if value is NOT_DONE_YET:
+                # push it back into the queue; it will finish eventually
+                callbacks.append((group, process, callback))
+            else:
+                results.append(
+                    {'name':process.config.name,
+                     'group':group.config.name,
+                     'status':Faults.SUCCESS,
+                     'description':'OK'}
+                    )
+
+            if callbacks:
+                return NOT_DONE_YET
+
+            return results
+
+        # XXX the above implementation has a weakness inasmuch as the
+        # first call into each individual process callback will always
+        # return NOT_DONE_YET, so they need to be called twice.  The
+        # symptom of this is that calling this method causes the
+        # client to block for much longer than it actually requires to
+        # kill all of the running processes.  After the first call to
+        # the killit callback, the process is actually dead, but the
+        # above killall method processes the callbacks one at a time
+        # during the select loop, which, because there is no output
+        # from child processes after stopAllProcesses is called, is
+        # not busy, so hits the timeout for each callback.  I
+        # attempted to make this better, but the only way to make it
+        # better assumes totally synchronous reaping of child
+        # processes, which requires infrastructure changes to
+        # supervisord that are scary at the moment as it could take a
+        # while to pin down all of the platform differences and might
+        # require a C extension to the Python signal module to allow
+        # the setting of ignore flags to signals.
+
+        killall.delay = 0.05
+        killall.rpcinterface = self
+        return killall # deferred
+
     def stopAllProcesses(self):
         """ Stop all processes in the process list
 
@@ -377,16 +535,12 @@ class SupervisorNamespaceRPCInterface:
         callbacks = []
         results = []
 
-        running_states = (ProcessStates.RUNNING,
-                          ProcessStates.STARTING,
-                          ProcessStates.BACKOFF)
-
         def killall():
             if not callbacks:
 
                 for group, process in all_processes:
                     name = make_namespec(group.config.name, process.config.name)
-                    if process.get_state() in running_states:
+                    if process.get_state() in RUNNING_STATES:
                         # only stop running processes
                         try:
                             callback = self.stopProcess(name)

+ 130 - 0
src/supervisor/tests/test_rpcinterfaces.py

@@ -367,6 +367,101 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         from supervisor import xmlrpc
         self._assertRPCError(xmlrpc.Faults.ABNORMAL_TERMINATION, callback)
 
+    def test_startProcessGroup(self):
+        options = DummyOptions()
+        pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1,
+                                startsecs=.01)
+        pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2,
+                                startsecs=.01)
+        supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1,
+                                               pconfig2)
+        from supervisor.process import ProcessStates
+        supervisord.set_procattr('process1', 'state', ProcessStates.STOPPED)
+        supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED)
+        interface = self._makeOne(supervisord)
+        callback = interface.startProcessGroup('foo')
+
+        from supervisor.http import NOT_DONE_YET
+
+        # create callbacks in startall()
+        self.assertEqual(callback(), NOT_DONE_YET)
+        # start first process
+        self.assertEqual(callback(), NOT_DONE_YET)
+        # start second process
+        self.assertEqual(callback(), NOT_DONE_YET)
+
+        # wait for timeout 1
+        time.sleep(.02)
+        self.assertEqual(callback(), NOT_DONE_YET)
+
+        # wait for timeout 2
+        time.sleep(.02)
+        result = callback()
+
+        self.assertEqual(len(result), 2)
+
+        from supervisor.xmlrpc import Faults
+
+        # XXX not sure about this ordering, I think process1 should
+        # probably show up first
+        self.assertEqual(result[0]['name'], 'process2')
+        self.assertEqual(result[0]['group'], 'foo')
+        self.assertEqual(result[0]['status'],  Faults.SUCCESS)
+        self.assertEqual(result[0]['description'], 'OK')
+
+        self.assertEqual(result[1]['name'], 'process1')
+        self.assertEqual(result[1]['group'], 'foo')
+        self.assertEqual(result[1]['status'],  Faults.SUCCESS)
+        self.assertEqual(result[1]['description'], 'OK')
+
+        self.assertEqual(interface.update_text, 'startProcess')
+
+        process1 = supervisord.process_groups['foo'].processes['process1']
+        self.assertEqual(process1.spawned, True)
+        process2 = supervisord.process_groups['foo'].processes['process2']
+        self.assertEqual(process2.spawned, True)
+
+    def test_startProcessGroup_nowait(self):
+        options = DummyOptions()
+        pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1,
+                                startsecs=.01)
+        pconfig2 = DummyPConfig(options, 'process2', __file__, priority=2,
+                                startsecs=.01)
+        supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1,
+                                               pconfig2)
+        from supervisor.process import ProcessStates
+        supervisord.set_procattr('process1', 'state', ProcessStates.STOPPED)
+        supervisord.set_procattr('process2', 'state', ProcessStates.STOPPED)
+        interface = self._makeOne(supervisord)
+        callback = interface.startProcessGroup('foo', wait=False)
+        from supervisor.http import NOT_DONE_YET
+        from supervisor.xmlrpc import Faults
+
+        # create callbacks in startall()
+        self.assertEqual(callback(), NOT_DONE_YET)
+
+        # get a result
+        result = callback()
+
+        self.assertEqual(len(result), 2)
+        self.assertEqual(result[0]['name'], 'process1')
+        self.assertEqual(result[0]['group'], 'foo')
+        self.assertEqual(result[0]['status'],  Faults.SUCCESS)
+        self.assertEqual(result[0]['description'], 'OK')
+
+        self.assertEqual(result[1]['name'], 'process2')
+        self.assertEqual(result[1]['group'], 'foo')
+        self.assertEqual(result[1]['status'],  Faults.SUCCESS)
+        self.assertEqual(result[1]['description'], 'OK')
+
+    def test_startProcessGroup_badname(self):
+        from supervisor import xmlrpc
+        supervisord = DummySupervisor()
+        interface = self._makeOne(supervisord)
+        self._assertRPCError(xmlrpc.Faults.BAD_NAME,
+                             interface.startProcessGroup, 'foo')
+
+
     def test_startAllProcesses(self):
         options = DummyOptions()
         pconfig1 = DummyPConfig(options, 'process1', __file__, priority=1,
@@ -482,6 +577,41 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         self.assertEqual(len(supervisord.process_groups['foo'].processes), 1)
         self.assertEqual(interface.update_text, 'stopProcess')
 
+    def test_stopProcessGroup(self):
+        options = DummyOptions()
+        pconfig1 = DummyPConfig(options, 'process1', '/bin/foo')
+        pconfig2 = DummyPConfig(options, 'process2', '/bin/foo2')
+        from supervisor.process import ProcessStates
+        supervisord = PopulatedDummySupervisor(options, 'foo', pconfig1,
+                                               pconfig2)
+        supervisord.set_procattr('process1', 'state', ProcessStates.RUNNING)
+        supervisord.set_procattr('process2', 'state', ProcessStates.RUNNING)
+        interface = self._makeOne(supervisord)
+        callback = interface.stopProcessGroup('foo')
+        self.assertEqual(interface.update_text, 'stopProcessGroup')
+        from supervisor import http
+        value = http.NOT_DONE_YET
+        while 1:
+            value = callback()
+            if value is not http.NOT_DONE_YET:
+                break
+
+        self.assertEqual(value, [
+            {'status':80,'group':'foo','name': 'process1','description': 'OK'},
+            {'status':80,'group':'foo','name': 'process2','description': 'OK'},
+            ] )
+        process1 = supervisord.process_groups['foo'].processes['process1']
+        self.assertEqual(process1.stop_called, True)
+        process2 = supervisord.process_groups['foo'].processes['process2']
+        self.assertEqual(process2.stop_called, True)
+
+    def test_stopProcessGroup_badname(self):
+        from supervisor import xmlrpc
+        supervisord = DummySupervisor()
+        interface = self._makeOne(supervisord)
+        self._assertRPCError(xmlrpc.Faults.BAD_NAME,
+                             interface.stopProcessGroup, 'foo')
+
     def test_stopAllProcesses(self):
         options = DummyOptions()
         pconfig1 = DummyPConfig(options, 'process1', '/bin/foo')