Browse Source

Add a test for make_dispatchers()

Mike Naberezny 10 years ago
parent
commit
c4fdada410
1 changed files with 48 additions and 0 deletions
  1. 48 0
      supervisor/tests/test_options.py

+ 48 - 0
supervisor/tests/test_options.py

@@ -2647,6 +2647,54 @@ class TestProcessConfig(unittest.TestCase):
             self.assertEqual(pipes['stdout'], 5)
             self.assertEqual(pipes['stderr'], None)
 
+class EventListenerConfigTests(unittest.TestCase):
+    def _getTargetClass(self):
+        from supervisor.options import EventListenerConfig
+        return EventListenerConfig
+
+    def _makeOne(self, *arg, **kw):
+        defaults = {}
+        for name in ('name', 'command', 'directory', 'umask',
+                     'priority', 'autostart', 'autorestart',
+                     'startsecs', 'startretries', 'uid',
+                     'stdout_logfile', 'stdout_capture_maxbytes',
+                     'stdout_events_enabled', 'stdout_syslog',
+                     'stderr_logfile', 'stderr_capture_maxbytes',
+                     'stderr_events_enabled', 'stderr_syslog',
+                     'stopsignal', 'stopwaitsecs', 'stopasgroup',
+                     'killasgroup', 'exitcodes', 'redirect_stderr',
+                     'environment'):
+            defaults[name] = name
+        for name in ('stdout_logfile_backups', 'stdout_logfile_maxbytes',
+                     'stderr_logfile_backups', 'stderr_logfile_maxbytes'):
+            defaults[name] = 10
+        defaults.update(kw)
+        return self._getTargetClass()(*arg, **defaults)
+
+    def test_make_dispatchers(self):
+        options = DummyOptions()
+        instance = self._makeOne(options)
+        with tempfile.NamedTemporaryFile() as stdout_logfile:
+            with tempfile.NamedTemporaryFile() as stderr_logfile:
+                instance.stdout_logfile = stdout_logfile.name
+                instance.stderr_logfile = stderr_logfile.name
+                instance.redirect_stderr = False
+                process1 = DummyProcess(instance)
+                dispatchers, pipes = instance.make_dispatchers(process1)
+                self.assertEqual(dispatchers[4].channel, 'stdin')
+                self.assertEqual(dispatchers[4].closed, False)
+                self.assertEqual(dispatchers[5].channel, 'stdout')
+                from supervisor.states import EventListenerStates
+                self.assertEqual(dispatchers[5].process.listener_state,
+                                 EventListenerStates.ACKNOWLEDGED)
+                self.assertEqual(pipes['stdout'], 5)
+                self.assertEqual(dispatchers[7].channel, 'stderr')
+                from supervisor.events import ProcessCommunicationStderrEvent
+                self.assertEqual(dispatchers[7].event_type,
+                                 ProcessCommunicationStderrEvent)
+                self.assertEqual(pipes['stderr'], 7)
+
+
 class FastCGIProcessConfigTest(unittest.TestCase):
     def _getTargetClass(self):
         from supervisor.options import FastCGIProcessConfig