Quellcode durchsuchen

Add a knob for throttling dispatch calls.

Chris McDonough vor 18 Jahren
Ursprung
Commit
cbeb36f3ab
2 geänderte Dateien mit 18 neuen und 9 gelöschten Zeilen
  1. 17 3
      src/supervisor/process.py
  2. 1 6
      src/supervisor/tests/test_process.py

+ 17 - 3
src/supervisor/process.py

@@ -573,6 +573,8 @@ class EventListenerPool(ProcessGroupBase):
             events.subscribe(event_type, self._acceptEvent)
         events.subscribe(events.EventRejectedEvent, self.handle_rejected)
         self.serial = -1
+        self.last_dispatch = 0
+        self.dispatch_throttle = 0 # in seconds: .00195 is an interesting one
 
     def handle_rejected(self, event):
         process = event.process
@@ -583,9 +585,20 @@ class EventListenerPool(ProcessGroupBase):
 
     def transition(self):
         processes = self.processes.values()
-        for proc in processes:
-            proc.transition()
-        self.dispatch()
+        dispatch_capable = False
+        for process in processes:
+            process.transition()
+            # this is redundant, we do it in _dispatchEvent too, but we
+            # want to reduce function call overhead
+            if process.state == ProcessStates.RUNNING:
+                if process.listener_state == EventListenerStates.READY:
+                    dispatch_capable = True
+        if dispatch_capable:
+            if self.dispatch_throttle:
+                now = time.time()
+                if now - self.last_dispatch < self.dispatch_throttle:
+                    return
+            self.dispatch()
 
     def dispatch(self):
         while self.event_buffer:
@@ -597,6 +610,7 @@ class EventListenerPool(ProcessGroupBase):
                 # to process any further events in the buffer
                 self._acceptEvent(event, head=True)
                 break
+        self.last_dispatch = time.time()
 
     def _acceptEvent(self, event, head=False):
         # events are required to be instances

+ 1 - 6
src/supervisor/tests/test_process.py

@@ -1306,7 +1306,7 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
             '<supervisor.process.EventListenerPool instance at'))
         self.assertTrue(s.endswith('named whatever>'))
 
-    def test_transition_nobody_listenening(self):
+    def test_transition_nobody_ready(self):
         options = DummyOptions()
         from supervisor.states import ProcessStates
         pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
@@ -1324,8 +1324,6 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         self.assertEqual(process1.transitioned, True)
         self.assertEqual(pool.event_buffer, [event])
         data = pool.config.options.logger.data
-        self.assertEqual(data[0],
-                         'rebuffering event a for pool whatever (bufsize 0)')
     
     def test_transition_event_proc_not_running(self):
         options = DummyOptions()
@@ -1346,9 +1344,6 @@ class EventListenerPoolTests(ProcessGroupBaseTests):
         self.assertEqual(pool.event_buffer, [event])
         self.assertEqual(process1.stdin_buffer, '')
         self.assertEqual(process1.listener_state, EventListenerStates.READY)
-        data = pool.config.options.logger.data
-        self.assertEqual(data[0],
-                         'rebuffering event 1 for pool whatever (bufsize 0)')
 
     def test_transition_event_proc_running(self):
         options = DummyOptions()