Jelajahi Sumber

full coverage for signalProcess

Chris McDonough 10 tahun lalu
induk
melakukan
a995fbe719
1 mengubah file dengan 42 tambahan dan 0 penghapusan
  1. 42 0
      supervisor/tests/test_rpcinterfaces.py

+ 42 - 0
supervisor/tests/test_rpcinterfaces.py

@@ -853,6 +853,48 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         p = supervisord.process_groups[supervisord.group_name].processes['foo']
         self.assertEqual(p.sent_signal, 10 )
 
+    def test_signalProcess_badsignal(self):
+        options = DummyOptions()
+        pconfig = DummyPConfig(options, 'foo', '/bin/foo')
+        from supervisor.process import ProcessStates
+        from supervisor import xmlrpc
+        supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
+        supervisord.set_procattr('foo', 'state', ProcessStates.RUNNING)
+        interface = self._makeOne(supervisord)
+        self._assertRPCError(
+            xmlrpc.Faults.BAD_SIGNAL, interface.signalProcess, 'foo', 155
+            )
+
+    def test_signalProcess_notrunning(self):
+        options = DummyOptions()
+        pconfig = DummyPConfig(options, 'foo', '/bin/foo')
+        from supervisor.process import ProcessStates
+        from supervisor import xmlrpc
+        supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
+        supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED)
+        interface = self._makeOne(supervisord)
+        self._assertRPCError(
+            xmlrpc.Faults.NOT_RUNNING, interface.signalProcess, 'foo', 10
+            )
+
+    def test_signalProcess_signal_returns_message(self):
+        options = DummyOptions()
+        pconfig = DummyPConfig(options, 'foo', '/bin/foo')
+        from supervisor.process import ProcessStates
+        from supervisor import xmlrpc
+        supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
+        supervisord.set_procattr('foo', 'state', ProcessStates.RUNNING)
+        def signalreturn(sig):
+            return 'msg'
+        pgroup = supervisord.process_groups[supervisord.group_name]
+        proc = pgroup.processes['foo']
+        proc.signal = signalreturn
+        interface = self._makeOne(supervisord)
+        self._assertRPCError(
+            xmlrpc.Faults.FAILED, interface.signalProcess, 'foo', 10
+            )
+
+
     def test_signalProcess_withgroup(self):
         """ Test that sending foo:* works """
         options = DummyOptions()