|
@@ -101,19 +101,27 @@ class TestSerializations(unittest.TestCase):
|
|
|
pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
|
|
|
process1 = DummyProcess(pconfig1)
|
|
|
from supervisor.events import ProcessCommunicationStdoutEvent
|
|
|
+ class DummyGroup:
|
|
|
+ config = pconfig1
|
|
|
+ process1.group = DummyGroup
|
|
|
event = ProcessCommunicationStdoutEvent(process1, 'yo')
|
|
|
self.assertEqual(str(event),
|
|
|
- 'process_name: process1\nchannel: stdout\nyo'
|
|
|
+ 'process_name: process1\ngroup_name: process1\n'
|
|
|
+ 'channel: stdout\nyo'
|
|
|
)
|
|
|
|
|
|
def test_pcomm_stderr_event(self):
|
|
|
options = DummyOptions()
|
|
|
pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
|
|
|
process1 = DummyProcess(pconfig1)
|
|
|
+ class DummyGroup:
|
|
|
+ config = pconfig1
|
|
|
+ process1.group = DummyGroup
|
|
|
from supervisor.events import ProcessCommunicationStderrEvent
|
|
|
event = ProcessCommunicationStderrEvent(process1, 'yo')
|
|
|
self.assertEqual(str(event),
|
|
|
- 'process_name: process1\nchannel: stderr\nyo'
|
|
|
+ 'process_name: process1\ngroup_name: process1\n'
|
|
|
+ 'channel: stderr\nyo'
|
|
|
)
|
|
|
|
|
|
def test_overflow_event(self):
|
|
@@ -121,6 +129,9 @@ class TestSerializations(unittest.TestCase):
|
|
|
options = DummyOptions()
|
|
|
pconfig1 = DummyPConfig(options, 'foo', 'process1','/bin/process1')
|
|
|
process1 = DummyProcess(pconfig1)
|
|
|
+ class DummyGroup:
|
|
|
+ config = pconfig1
|
|
|
+ process1.group = DummyGroup
|
|
|
wrapped = events.ProcessCommunicationStderrEvent(process1, 'yo')
|
|
|
event = events.EventBufferOverflowEvent(process1, wrapped)
|
|
|
self.assertEqual(str(event), 'group_name: foo\nevent_type: None')
|
|
@@ -130,8 +141,12 @@ class TestSerializations(unittest.TestCase):
|
|
|
options = DummyOptions()
|
|
|
pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
|
|
|
process1 = DummyProcess(pconfig1)
|
|
|
+ class DummyGroup:
|
|
|
+ config = pconfig1
|
|
|
+ process1.group = DummyGroup
|
|
|
event = events.StartingFromStoppedEvent(process1)
|
|
|
- self.assertEqual(str(event), 'process_name: process1')
|
|
|
+ self.assertEqual(str(event),
|
|
|
+ 'process_name: process1\ngroup_name: process1')
|
|
|
|
|
|
def test_supervisor_sc_event(self):
|
|
|
from supervisor import events
|