Prechádzať zdrojové kódy

Added a new event type, REMOTE_COMMUNICATION, that is emitted by a new
RPC method, supervisor.sendRemoteCommEvent().

Mike Naberezny 16 rokov pred
rodič
commit
d9248ac010

+ 3 - 0
CHANGES.txt

@@ -1,5 +1,8 @@
 Next Release
 
+  - Added a new event type, REMOTE_COMMUNICATION, that is emitted by a new
+    RPC method, supervisor.sendRemoteCommEvent().
+
   - Patch for bug #268 (KeyError on 'here' expansion for stdout/stderr_logfile)
     from David E. Kindred.
 

+ 9 - 0
src/supervisor/events.py

@@ -58,6 +58,14 @@ class ProcessCommunicationStdoutEvent(ProcessCommunicationEvent):
 class ProcessCommunicationStderrEvent(ProcessCommunicationEvent):
     channel = 'stderr'
 
+class RemoteCommunicationEvent(Event):
+    def __init__(self, type, data):
+        self.type = type
+        self.data = data
+
+    def __str__(self):
+        return 'type:%s\n%s' % (self.type, self.data)
+
 class SupervisorStateChangeEvent(Event):
     """ Abstract class """
     def __str__(self):
@@ -167,6 +175,7 @@ class EventTypes:
     PROCESS_COMMUNICATION = ProcessCommunicationEvent # abstract
     PROCESS_COMMUNICATION_STDOUT = ProcessCommunicationStdoutEvent
     PROCESS_COMMUNICATION_STDERR = ProcessCommunicationStderrEvent
+    REMOTE_COMMUNICATION = RemoteCommunicationEvent
     SUPERVISOR_STATE_CHANGE = SupervisorStateChangeEvent # abstract
     SUPERVISOR_STATE_CHANGE_RUNNING = SupervisorRunningEvent
     SUPERVISOR_STATE_CHANGE_STOPPING = SupervisorStoppingEvent

+ 19 - 0
src/supervisor/rpcinterface.py

@@ -758,6 +758,25 @@ class SupervisorNamespaceRPCInterface:
 
         return True
 
+    def sendRemoteCommEvent(self, type, data):
+        """ Send an event that will be received by event listener 
+        subprocesses subscribing to the RemoteCommunicationEvent.
+        
+        @param  string  type  String for the "type" key in the event header
+        @param  string  data  Data for the event body
+        @return boolean       Always return True unless error
+        """
+        if isinstance(type, unicode):
+            type = type.encode('utf-8')
+        if isinstance(data, unicode):
+            data = data.encode('utf-8')
+
+        import events
+        events.notify(
+            events.RemoteCommunicationEvent(type, data)
+        )
+        
+
 def make_allfunc(processes, predicate, func, **extra_kwargs):
     """ Return a closure representing a function that calls a
     function for every process, and returns a result """

+ 13 - 0
src/supervisor/tests/test_events.py

@@ -88,6 +88,12 @@ class TestEventTypes(unittest.TestCase):
         self.assertEqual(inst.data, 3)
         self.assertEqual(inst.channel, 'stderr')
 
+    def test_RemoteCommunicationEvent(self):
+        from supervisor.events import RemoteCommunicationEvent
+        inst = RemoteCommunicationEvent(1, 2)
+        self.assertEqual(inst.type, 1)
+        self.assertEqual(inst.data, 2)
+
     # nothing to test for SupervisorStateChangeEvent and subtypes
 
     def test_EventRejectedEvent(self):
@@ -185,6 +191,13 @@ class TestSerializations(unittest.TestCase):
         self.assertEqual(headers['pid'], '1', headers)
         self.assertEqual(payload, 'yo')
 
+    def test_remote_comm_event(self):
+        from supervisor.events import RemoteCommunicationEvent
+        event = RemoteCommunicationEvent('foo', 'bar')
+        headers, payload = self._deserialize(str(event))
+        self.assertEqual(headers['type'], 'foo', headers)
+        self.assertEqual(payload, 'bar')
+
     def test_process_state_events_without_extra_values(self):
         from supervisor.states import ProcessStates
         from supervisor import events

+ 45 - 0
src/supervisor/tests/test_rpcinterfaces.py

@@ -1580,6 +1580,51 @@ class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
         process1 = supervisord.process_groups['process1'].processes['process1']
         self.assertEqual(process1.stdin_buffer, 'fi\xc3\xad')
 
+    def test_sendRemoteCommEvent_notifies_subscribers(self):
+        options = DummyOptions()
+        supervisord = DummySupervisor(options)
+        interface = self._makeOne(supervisord)
+
+        from supervisor import events
+        L = []
+        def callback(event):
+            L.append(event)
+        
+        try:
+            events.callbacks[:] = [(events.RemoteCommunicationEvent, callback)]
+            interface.sendRemoteCommEvent('foo', 'bar')
+        finally:
+            events.callbacks[:] = []
+            events.clear()
+
+        self.assertEqual(len(L), 1)
+        event = L[0]                                     
+        self.assertEqual(event.type, 'foo')
+        self.assertEqual(event.data, 'bar')
+
+    def test_sendRemoteCommEvent_unicode_encoded_to_utf8(self):
+        options = DummyOptions()
+        supervisord = DummySupervisor(options)
+        interface = self._makeOne(supervisord)
+
+        from supervisor import events
+        L = []
+        def callback(event):
+            L.append(event)
+        
+        try:
+            events.callbacks[:] = [(events.RemoteCommunicationEvent, callback)]
+            interface.sendRemoteCommEvent(u'fi\xed once', u'fi\xed twice')
+        finally:
+            events.callbacks[:] = []
+            events.clear()
+
+        self.assertEqual(len(L), 1)
+        event = L[0]                                     
+        self.assertEqual(event.type, 'fi\xc3\xad once')
+        self.assertEqual(event.data, 'fi\xc3\xad twice')
+        
+
 class SystemNamespaceXMLRPCInterfaceTests(TestBase):
     def _getTargetClass(self):
         from supervisor import xmlrpc