Przeglądaj źródła

full coverage for process module

Conflicts:
	supervisor/tests/test_process.py
Chris McDonough 10 lat temu
rodzic
commit
5a7bb403dd
1 zmienionych plików z 141 dodań i 7 usunięć
  1. 141 7
      supervisor/tests/test_process.py

+ 141 - 7
supervisor/tests/test_process.py

@@ -1466,7 +1466,7 @@ class ProcessGroupBaseTests(unittest.TestCase):
         group.removelogs()
         self.assertEqual(process1.logsremoved, True)
 
-    def test_cmp(self):
+    def test_ordering_and_comparison(self):
         options = DummyOptions()
         gconfig1 = DummyPGroupConfig(options)
         group1 = self._makeOne(gconfig1)
@@ -1474,13 +1474,19 @@ class ProcessGroupBaseTests(unittest.TestCase):
         gconfig2 = DummyPGroupConfig(options)
         group2 = self._makeOne(gconfig2)
 
-        group1.priority = 5
-        group2.priority = 1
+        config3 = DummyPGroupConfig(options)
+        group3 = self._makeOne(config3)
+
+        group1.config.priority = 5
+        group2.config.priority = 1
+        group3.config.priority = 5
 
         L = [group1, group2]
         L.sort()
 
         self.assertEqual(L, [group2, group1])
+        self.assertNotEqual(group1, group2)
+        self.assertEqual(group1, group3)
 
 class ProcessGroupTests(ProcessGroupBaseTests):
     def _getTargetClass(self):
@@ -1507,6 +1513,40 @@ class ProcessGroupTests(ProcessGroupBaseTests):
         group.transition()
         self.assertEqual(process1.transitioned, True)
 
+class FastCGIProcessGroupTests(unittest.TestCase):
+    def _getTargetClass(self):
+        from supervisor.process import FastCGIProcessGroup
+        return FastCGIProcessGroup
+
+    def _makeOne(self, config, **kwargs):
+        cls = self._getTargetClass()
+        return cls(config, **kwargs)
+
+    def test___init__without_socket_error(self):
+        options = DummyOptions()
+        gconfig = DummyPGroupConfig(options)
+        gconfig.socket_config = None
+        class DummySocketManager(object):
+            def __init__(self, config, logger): pass
+            def get_socket(self): pass
+        self._makeOne(gconfig, socketManager=DummySocketManager)
+        # doesn't fail with exception
+
+    def test___init__with_socket_error(self):
+        options = DummyOptions()
+        gconfig = DummyPGroupConfig(options)
+        gconfig.socket_config = None
+        class DummySocketManager(object):
+            def __init__(self, config, logger): pass
+            def get_socket(self):
+                raise KeyError(5)
+            def config(self):
+                return 'config'
+        self.assertRaises(
+            ValueError,
+            self._makeOne, gconfig, socketManager=DummySocketManager
+            )
+
 class EventListenerPoolTests(ProcessGroupBaseTests):
     def setUp(self):
         from supervisor.events import clear
@@ -1703,9 +1743,103 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         self.assertEqual(process1.listener_state, EventListenerStates.BUSY)
         self.assertEqual(process1.event, event)
 
-def test_suite():
-    return unittest.findTestCases(sys.modules[__name__])
+    def test_transition_event_proc_running_with_dispatch_throttle_notyet(self):
+        options = DummyOptions()
+        from supervisor.states import ProcessStates
+        pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
+        process1 = DummyProcess(pconfig1, state=ProcessStates.RUNNING)
+        gconfig = DummyPGroupConfig(options, pconfigs=[pconfig1])
+        pool = self._makeOne(gconfig)
+        pool.dispatch_throttle = 5
+        pool.last_dispatch = time.time()
+        pool.processes = {'process1': process1}
+        event = DummyEvent()
+        from supervisor.states import EventListenerStates
+        process1.listener_state = EventListenerStates.READY
+        class DummyGroup:
+            config = gconfig
+        process1.group = DummyGroup
+        pool._acceptEvent(event)
+        pool.transition()
+        self.assertEqual(process1.transitioned, True)
+        self.assertEqual(pool.event_buffer, [event]) # not popped
 
-if __name__ == '__main__':
-    unittest.main(defaultTest='test_suite')
+    def test_transition_event_proc_running_with_dispatch_throttle_ready(self):
+        options = DummyOptions()
+        from supervisor.states import ProcessStates
+        pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
+        process1 = DummyProcess(pconfig1, state=ProcessStates.RUNNING)
+        gconfig = DummyPGroupConfig(options, pconfigs=[pconfig1])
+        pool = self._makeOne(gconfig)
+        pool.dispatch_throttle = 5
+        pool.last_dispatch = time.time() - 1000
+        pool.processes = {'process1': process1}
+        event = DummyEvent()
+        from supervisor.states import EventListenerStates
+        process1.listener_state = EventListenerStates.READY
+        class DummyGroup:
+            config = gconfig
+        process1.group = DummyGroup
+        pool._acceptEvent(event)
+        pool.transition()
+        self.assertEqual(process1.transitioned, True)
+        self.assertEqual(pool.event_buffer, [])
+        header, payload = process1.stdin_buffer.split('\n', 1)
+        self.assertEqual(payload, 'dummy event', payload)
+        self.assertEqual(process1.listener_state, EventListenerStates.BUSY)
+        self.assertEqual(process1.event, event)
 
+    def test__dispatchEvent_notready(self):
+        options = DummyOptions()
+        from supervisor.states import ProcessStates
+        pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
+        process1 = DummyProcess(pconfig1, state=ProcessStates.STOPPED)
+        gconfig = DummyPGroupConfig(options, pconfigs=[pconfig1])
+        pool = self._makeOne(gconfig)
+        pool.processes = {'process1': process1}
+        event = DummyEvent()
+        pool._acceptEvent(event)
+        self.assertEqual(pool._dispatchEvent(event), False)
+
+    def test__dispatchEvent_proc_write_raises_non_EPIPE_OSError(self):
+        options = DummyOptions()
+        from supervisor.states import ProcessStates
+        pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
+        process1 = DummyProcess(pconfig1, state=ProcessStates.RUNNING)
+        def raise_epipe(envelope):
+            raise OSError(errno.EAGAIN)
+        process1.write = raise_epipe
+        gconfig = DummyPGroupConfig(options, pconfigs=[pconfig1])
+        pool = self._makeOne(gconfig)
+        pool.processes = {'process1': process1}
+        event = DummyEvent()
+        from supervisor.states import EventListenerStates
+        process1.listener_state = EventListenerStates.READY
+        class DummyGroup:
+            config = gconfig
+        process1.group = DummyGroup
+        pool._acceptEvent(event)
+        self.assertRaises(OSError, pool._dispatchEvent, event)
+
+class test_new_serial(unittest.TestCase):
+    def _callFUT(self, inst):
+        from supervisor.process import new_serial
+        return new_serial(inst)
+
+    def test_inst_serial_is_maxint(self):
+        class Inst(object):
+            def __init__(self):
+                self.serial = sys.maxint
+        inst = Inst()
+        result = self._callFUT(inst)
+        self.assertEqual(inst.serial, 0)
+        self.assertEqual(result, 0)
+
+    def test_inst_serial_is_not_maxint(self):
+        class Inst(object):
+            def __init__(self):
+                self.serial = 1
+        inst = Inst()
+        result = self._callFUT(inst)
+        self.assertEqual(inst.serial, 2)
+        self.assertEqual(result, 2)