test_supervisord.py 8.2 KB


  1. import unittest
  2. import time
  3. import signal
  4. import sys
  5. from supervisor.tests.base import DummyOptions
  6. from supervisor.tests.base import DummyPConfig
  7. from supervisor.tests.base import DummyPGroupConfig
  8. from supervisor.tests.base import DummyProcess
  9. from supervisor.tests.base import DummyProcessGroup
  10. class SupervisordTests(unittest.TestCase):
  11. def _getTargetClass(self):
  12. from supervisor.supervisord import Supervisor
  13. return Supervisor
  14. def _makeOne(self, options):
  15. return self._getTargetClass()(options)
  16. def test_main(self):
  17. options = DummyOptions()
  18. pconfig = DummyPConfig(options, 'foo', 'foo', '/bin/foo')
  19. gconfigs = [DummyPGroupConfig(options,'foo', pconfigs=[pconfig])]
  20. options.process_group_configs = gconfigs
  21. supervisord = self._makeOne(options)
  22. supervisord.main(args='abc', test=True, first=True)
  23. self.assertEqual(options.realizeargs, 'abc')
  24. self.assertEqual(options.environment_processed, True)
  25. self.assertEqual(options.fds_cleaned_up, True)
  26. self.assertEqual(options.rlimits_set, True)
  27. self.assertEqual(options.make_logger_messages,
  28. (['setuid_called'], ['rlimits_set']))
  29. self.assertEqual(options.autochildlogdir_cleared, True)
  30. self.assertEqual(len(supervisord.process_groups), 1)
  31. self.assertEqual(supervisord.process_groups['foo'].config.options,
  32. options)
  33. self.assertEqual(options.environment_processed, True)
  34. self.assertEqual(options.httpserver_opened, True)
  35. self.assertEqual(options.signals_set, True)
  36. self.assertEqual(options.daemonized, True)
  37. self.assertEqual(options.pidfile_written, True)
  38. self.assertEqual(options.cleaned_up, True)
  39. def test_get_state(self):
  40. from supervisor.supervisord import SupervisorStates
  41. options = DummyOptions()
  42. supervisord = self._makeOne(options)
  43. self.assertEqual(supervisord.get_state(), SupervisorStates.ACTIVE)
  44. supervisord.mood = -1
  45. self.assertEqual(supervisord.get_state(), SupervisorStates.SHUTDOWN)
  46. def test_reap(self):
  47. options = DummyOptions()
  48. options.waitpid_return = 1, 1
  49. pconfig = DummyPConfig(options, 'process', 'process', '/bin/process1')
  50. process = DummyProcess(pconfig)
  51. process.drained = False
  52. process.killing = 1
  53. process.laststop = None
  54. process.waitstatus = None, None
  55. options.pidhistory = {1:process}
  56. supervisord = self._makeOne(options)
  57. supervisord.reap(once=True)
  58. self.assertEqual(process.finished, (1,1))
  59. def test_handle_sigterm(self):
  60. options = DummyOptions()
  61. options.signal = signal.SIGTERM
  62. supervisord = self._makeOne(options)
  63. supervisord.handle_signal()
  64. self.assertEqual(supervisord.mood, -1)
  65. self.assertEqual(options.logger.data[0],
  66. 'received SIGTERM indicating exit request')
  67. def test_handle_sigint(self):
  68. options = DummyOptions()
  69. options.signal = signal.SIGINT
  70. supervisord = self._makeOne(options)
  71. supervisord.handle_signal()
  72. self.assertEqual(supervisord.mood, -1)
  73. self.assertEqual(options.logger.data[0],
  74. 'received SIGINT indicating exit request')
  75. def test_handle_sigquit(self):
  76. options = DummyOptions()
  77. options.signal = signal.SIGQUIT
  78. supervisord = self._makeOne(options)
  79. supervisord.handle_signal()
  80. self.assertEqual(supervisord.mood, -1)
  81. self.assertEqual(options.logger.data[0],
  82. 'received SIGQUIT indicating exit request')
  83. def test_handle_sighup(self):
  84. options = DummyOptions()
  85. options.signal = signal.SIGHUP
  86. supervisord = self._makeOne(options)
  87. supervisord.handle_signal()
  88. self.assertEqual(supervisord.mood, 0)
  89. self.assertEqual(options.logger.data[0],
  90. 'received SIGHUP indicating restart request')
  91. def test_handle_sigusr2(self):
  92. options = DummyOptions()
  93. options.signal = signal.SIGUSR2
  94. pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
  95. from supervisor.process import ProcessStates
  96. process1 = DummyProcess(pconfig1, state=ProcessStates.STOPPING)
  97. process1.delay = time.time() - 1
  98. supervisord = self._makeOne(options)
  99. pconfigs = [DummyPConfig(options, 'foo', 'foo', '/bin/foo')]
  100. options.process_group_configs = DummyPGroupConfig(
  101. options, 'foo',
  102. pconfigs=pconfigs)
  103. supervisord.handle_signal()
  104. self.assertEqual(supervisord.mood, 1)
  105. self.assertEqual(options.logs_reopened, True)
  106. self.assertEqual(options.logger.data[0],
  107. 'received SIGUSR2 indicating log reopen request')
  108. def test_handle_unknown_signal(self):
  109. options = DummyOptions()
  110. options.signal = signal.SIGUSR1
  111. supervisord = self._makeOne(options)
  112. supervisord.handle_signal()
  113. self.assertEqual(supervisord.mood, 1)
  114. self.assertEqual(options.logger.data[0],
  115. 'received SIGUSR1 indicating nothing')
  116. def test_runforever_select_eintr(self):
  117. options = DummyOptions()
  118. import errno
  119. options.select_error = errno.EINTR
  120. supervisord = self._makeOne(options)
  121. supervisord.runforever(test=True)
  122. self.assertEqual(options.logger.data[0], 5)
  123. self.assertEqual(options.logger.data[1], 'EINTR encountered in select')
  124. def test_runforever_select_uncaught_exception(self):
  125. options = DummyOptions()
  126. import errno
  127. options.select_error = errno.EBADF
  128. supervisord = self._makeOne(options)
  129. import select
  130. self.assertRaises(select.error, supervisord.runforever, test=True)
  131. def test_one_process_group_select(self):
  132. options = DummyOptions()
  133. supervisord = self._makeOne(options)
  134. pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
  135. process = DummyProcess(pconfig)
  136. gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
  137. pgroup = DummyProcessGroup(gconfig)
  138. L = []
  139. def callback():
  140. L.append(1)
  141. pgroup.select_result = {6:callback, 7:callback}, [6], [7], []
  142. supervisord.process_groups = {'foo': pgroup}
  143. options.select_result = [6], [7], []
  144. supervisord.runforever(test=True)
  145. self.assertEqual(pgroup.necessary_started, True)
  146. self.assertEqual(pgroup.transitioned, True)
  147. self.assertEqual(L, [1, 1])
  148. def test_exit(self):
  149. options = DummyOptions()
  150. supervisord = self._makeOne(options)
  151. pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
  152. process = DummyProcess(pconfig)
  153. gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
  154. pgroup = DummyProcessGroup(gconfig)
  155. L = []
  156. def callback():
  157. L.append(1)
  158. supervisord.process_groups = {'foo': pgroup}
  159. supervisord.mood = 0
  160. import asyncore
  161. self.assertRaises(asyncore.ExitNow, supervisord.runforever, test=True)
  162. self.assertEqual(pgroup.all_stopped, True)
  163. def test_exit_delayed(self):
  164. options = DummyOptions()
  165. supervisord = self._makeOne(options)
  166. pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
  167. process = DummyProcess(pconfig)
  168. gconfig = DummyPGroupConfig(options, pconfigs=[pconfig])
  169. pgroup = DummyProcessGroup(gconfig)
  170. pgroup.delay_processes = [process]
  171. L = []
  172. def callback():
  173. L.append(1)
  174. supervisord.process_groups = {'foo': pgroup}
  175. supervisord.mood = 0
  176. import asyncore
  177. supervisord.runforever(test=True)
  178. self.assertNotEqual(supervisord.lastdelayreport, 0)
  179. def test_getSupervisorStateDescription(self):
  180. from supervisor.supervisord import getSupervisorStateDescription
  181. from supervisor.supervisord import SupervisorStates
  182. result = getSupervisorStateDescription(SupervisorStates.ACTIVE)
  183. self.assertEqual(result, 'ACTIVE')
  184. def test_suite():
  185. return unittest.findTestCases(sys.modules[__name__])
  186. if __name__ == '__main__':
  187. unittest.main(defaultTest='test_suite')