# supervisord.py
  1. #!/usr/bin/env python
  2. """supervisord -- run a set of applications as daemons.
  3. Usage: %s [options]
  4. Options:
  5. -c/--configuration FILENAME -- configuration file
  6. -n/--nodaemon -- run in the foreground (same as 'nodaemon true' in config file)
  7. -h/--help -- print this usage message and exit
  8. -v/--version -- print supervisord version number and exit
  9. -u/--user USER -- run supervisord as this user (or numeric uid)
  10. -m/--umask UMASK -- use this umask for daemon subprocess (default is 022)
  11. -d/--directory DIRECTORY -- directory to chdir to when daemonized
  12. -l/--logfile FILENAME -- use FILENAME as logfile path
  13. -y/--logfile_maxbytes BYTES -- use BYTES to limit the max size of logfile
  14. -z/--logfile_backups NUM -- number of backups to keep when max bytes reached
  15. -e/--loglevel LEVEL -- use LEVEL as log level (debug,info,warn,error,critical)
  16. -j/--pidfile FILENAME -- write a pid file for the daemon process to FILENAME
  17. -i/--identifier STR -- identifier used for this instance of supervisord
  18. -q/--childlogdir DIRECTORY -- the log directory for child process logs
  19. -k/--nocleanup -- prevent the process from performing cleanup (removal of
  20. old automatic child log files) at startup.
  21. -a/--minfds NUM -- the minimum number of file descriptors for start success
  22. -t/--strip_ansi -- strip ansi escape codes from process output
  23. --minprocs NUM -- the minimum number of processes available for start success
  24. --profile_options OPTIONS -- run supervisord under profiler and output
  25. results based on OPTIONS, which is a comma-sep'd
  26. list of 'cumulative', 'calls', and/or 'callers',
  27. e.g. 'cumulative,callers')
  28. """
import os
import signal
import time

from supervisor import events
from supervisor.medusa import asyncore_25 as asyncore
from supervisor.options import ServerOptions
from supervisor.options import signame
from supervisor.states import SupervisorStates
from supervisor.states import getProcessStateDescription
  38. class Supervisor:
  39. stopping = False # set after we detect that we are handling a stop request
  40. lastshutdownreport = 0 # throttle for delayed process error reports at stop
  41. process_groups = None # map of process group name to process group object
  42. stop_groups = None # list used for priority ordered shutdown
  43. def __init__(self, options):
  44. self.options = options
  45. self.process_groups = {}
  46. self.ticks = {}
  47. def main(self):
  48. if not self.options.first:
  49. # prevent crash on libdispatch-based systems, at least for the
  50. # first request
  51. self.options.cleanup_fds()
  52. info_messages = []
  53. critical_messages = []
  54. warn_messages = []
  55. setuid_msg = self.options.set_uid()
  56. if setuid_msg:
  57. critical_messages.append(setuid_msg)
  58. if self.options.first:
  59. rlimit_messages = self.options.set_rlimits()
  60. info_messages.extend(rlimit_messages)
  61. warn_messages.extend(self.options.parse_warnings)
  62. # this sets the options.logger object
  63. # delay logger instantiation until after setuid
  64. self.options.make_logger(critical_messages, warn_messages,
  65. info_messages)
  66. if not self.options.nocleanup:
  67. # clean up old automatic logs
  68. self.options.clear_autochildlogdir()
  69. self.run()
  70. def run(self):
  71. self.process_groups = {} # clear
  72. self.stop_groups = None # clear
  73. events.clear()
  74. try:
  75. for config in self.options.process_group_configs:
  76. self.add_process_group(config)
  77. self.options.process_environment()
  78. self.options.openhttpservers(self)
  79. self.options.setsignals()
  80. if (not self.options.nodaemon) and self.options.first:
  81. self.options.daemonize()
  82. # writing pid file needs to come *after* daemonizing or pid
  83. # will be wrong
  84. self.options.write_pidfile()
  85. self.runforever()
  86. finally:
  87. self.options.cleanup()
  88. def diff_to_active(self, new=None):
  89. if not new:
  90. new = self.options.process_group_configs
  91. cur = [group.config for group in self.process_groups.values()]
  92. curdict = dict(zip([cfg.name for cfg in cur], cur))
  93. newdict = dict(zip([cfg.name for cfg in new], new))
  94. added = [cand for cand in new if cand.name not in curdict]
  95. removed = [cand for cand in cur if cand.name not in newdict]
  96. changed = [cand for cand in new
  97. if cand != curdict.get(cand.name, cand)]
  98. return added, changed, removed
  99. def add_process_group(self, config):
  100. name = config.name
  101. if name not in self.process_groups:
  102. config.after_setuid()
  103. self.process_groups[name] = config.make_group()
  104. events.notify(events.ProcessGroupAddedEvent(name))
  105. return True
  106. return False
  107. def remove_process_group(self, name):
  108. if self.process_groups[name].get_unstopped_processes():
  109. return False
  110. del self.process_groups[name]
  111. events.notify(events.ProcessGroupRemovedEvent(name))
  112. return True
  113. def get_process_map(self):
  114. process_map = {}
  115. for group in self.process_groups.values():
  116. process_map.update(group.get_dispatchers())
  117. return process_map
  118. def shutdown_report(self):
  119. unstopped = []
  120. for group in self.process_groups.values():
  121. unstopped.extend(group.get_unstopped_processes())
  122. if unstopped:
  123. # throttle 'waiting for x to die' reports
  124. now = time.time()
  125. if now > (self.lastshutdownreport + 3): # every 3 secs
  126. names = [ p.config.name for p in unstopped ]
  127. namestr = ', '.join(names)
  128. self.options.logger.info('waiting for %s to die' % namestr)
  129. self.lastshutdownreport = now
  130. for proc in unstopped:
  131. state = getProcessStateDescription(proc.get_state())
  132. self.options.logger.blather(
  133. '%s state: %s' % (proc.config.name, state))
  134. return unstopped
  135. def ordered_stop_groups_phase_1(self):
  136. if self.stop_groups:
  137. # stop the last group (the one with the "highest" priority)
  138. self.stop_groups[-1].stop_all()
  139. def ordered_stop_groups_phase_2(self):
  140. # after phase 1 we've transitioned and reaped, let's see if we
  141. # can remove the group we stopped from the stop_groups queue.
  142. if self.stop_groups:
  143. # pop the last group (the one with the "highest" priority)
  144. group = self.stop_groups.pop()
  145. if group.get_unstopped_processes():
  146. # if any processes in the group aren't yet in a
  147. # stopped state, we're not yet done shutting this
  148. # group down, so push it back on to the end of the
  149. # stop group queue
  150. self.stop_groups.append(group)
  151. def runforever(self):
  152. events.notify(events.SupervisorRunningEvent())
  153. timeout = 1 # this cannot be fewer than the smallest TickEvent (5)
  154. socket_map = self.options.get_socket_map()
  155. while 1:
  156. combined_map = {}
  157. combined_map.update(socket_map)
  158. combined_map.update(self.get_process_map())
  159. pgroups = list(self.process_groups.values())
  160. pgroups.sort()
  161. if self.options.mood < SupervisorStates.RUNNING:
  162. if not self.stopping:
  163. # first time, set the stopping flag, do a
  164. # notification and set stop_groups
  165. self.stopping = True
  166. self.stop_groups = pgroups[:]
  167. events.notify(events.SupervisorStoppingEvent())
  168. self.ordered_stop_groups_phase_1()
  169. if not self.shutdown_report():
  170. # if there are no unstopped processes (we're done
  171. # killing everything), it's OK to swtop or reload
  172. raise asyncore.ExitNow
  173. for fd, dispatcher in combined_map.items():
  174. if dispatcher.readable():
  175. self.options.poller.register_readable(fd)
  176. if dispatcher.writable():
  177. self.options.poller.register_writable(fd)
  178. r, w = self.options.poller.poll(timeout)
  179. for fd in r:
  180. if fd in combined_map:
  181. try:
  182. dispatcher = combined_map[fd]
  183. self.options.logger.blather(
  184. 'read event caused by %(dispatcher)s',
  185. dispatcher=dispatcher)
  186. dispatcher.handle_read_event()
  187. except asyncore.ExitNow:
  188. raise
  189. except:
  190. combined_map[fd].handle_error()
  191. for fd in w:
  192. if fd in combined_map:
  193. try:
  194. dispatcher = combined_map[fd]
  195. self.options.logger.blather(
  196. 'write event caused by %(dispatcher)s',
  197. dispatcher=dispatcher)
  198. dispatcher.handle_write_event()
  199. except asyncore.ExitNow:
  200. raise
  201. except:
  202. combined_map[fd].handle_error()
  203. [ group.transition() for group in pgroups ]
  204. self.reap()
  205. self.handle_signal()
  206. self.tick()
  207. if self.options.mood < SupervisorStates.RUNNING:
  208. self.ordered_stop_groups_phase_2()
  209. if self.options.test:
  210. break
  211. def tick(self, now=None):
  212. """ Send one or more 'tick' events when the timeslice related to
  213. the period for the event type rolls over """
  214. if now is None:
  215. # now won't be None in unit tests
  216. now = time.time()
  217. for event in events.TICK_EVENTS:
  218. period = event.period
  219. last_tick = self.ticks.get(period)
  220. if last_tick is None:
  221. # we just started up
  222. last_tick = self.ticks[period] = timeslice(period, now)
  223. this_tick = timeslice(period, now)
  224. if this_tick != last_tick:
  225. self.ticks[period] = this_tick
  226. events.notify(event(this_tick, self))
  227. def reap(self, once=False, recursionguard=0):
  228. if recursionguard == 100:
  229. return
  230. pid, sts = self.options.waitpid()
  231. if pid:
  232. process = self.options.pidhistory.get(pid, None)
  233. if process is None:
  234. self.options.logger.info('reaped unknown pid %s' % pid)
  235. else:
  236. process.finish(pid, sts)
  237. del self.options.pidhistory[pid]
  238. if not once:
  239. # keep reaping until no more kids to reap, but don't recurse
  240. # infintely
  241. self.reap(once=False, recursionguard=recursionguard+1)
  242. def handle_signal(self):
  243. sig = self.options.get_signal()
  244. if sig:
  245. if sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
  246. self.options.logger.warn(
  247. 'received %s indicating exit request' % signame(sig))
  248. self.options.mood = SupervisorStates.SHUTDOWN
  249. elif sig == signal.SIGHUP:
  250. self.options.logger.warn(
  251. 'received %s indicating restart request' % signame(sig))
  252. self.options.mood = SupervisorStates.RESTARTING
  253. elif sig == signal.SIGCHLD:
  254. self.options.logger.debug(
  255. 'received %s indicating a child quit' % signame(sig))
  256. elif sig == signal.SIGUSR2:
  257. self.options.logger.info(
  258. 'received %s indicating log reopen request' % signame(sig))
  259. self.options.reopenlogs()
  260. for group in self.process_groups.values():
  261. group.reopenlogs()
  262. else:
  263. self.options.logger.blather(
  264. 'received %s indicating nothing' % signame(sig))
  265. def get_state(self):
  266. return self.options.mood
  267. def timeslice(period, when):
  268. return int(when - (when % period))
  269. # profile entry point
  270. def profile(cmd, globals, locals, sort_order, callers): # pragma: no cover
  271. try:
  272. import cProfile as profile
  273. except ImportError:
  274. import profile
  275. import pstats
  276. import tempfile
  277. fd, fn = tempfile.mkstemp()
  278. try:
  279. profile.runctx(cmd, globals, locals, fn)
  280. stats = pstats.Stats(fn)
  281. stats.strip_dirs()
  282. # calls,time,cumulative and cumulative,calls,time are useful
  283. stats.sort_stats(*sort_order or ('cumulative', 'calls', 'time'))
  284. if callers:
  285. stats.print_callers(.3)
  286. else:
  287. stats.print_stats(.3)
  288. finally:
  289. os.remove(fn)
  290. # Main program
  291. def main(args=None, test=False):
  292. assert os.name == "posix", "This code makes Unix-specific assumptions"
  293. # if we hup, restart by making a new Supervisor()
  294. first = True
  295. while 1:
  296. options = ServerOptions()
  297. options.realize(args, doc=__doc__)
  298. options.first = first
  299. options.test = test
  300. if options.profile_options:
  301. sort_order, callers = options.profile_options
  302. profile('go(options)', globals(), locals(), sort_order, callers)
  303. else:
  304. go(options)
  305. options.close_httpservers()
  306. options.close_logger()
  307. first = False
  308. if test or (options.mood < SupervisorStates.RESTARTING):
  309. break
  310. def go(options): # pragma: no cover
  311. d = Supervisor(options)
  312. try:
  313. d.main()
  314. except asyncore.ExitNow:
  315. pass
  316. if __name__ == "__main__": # pragma: no cover
  317. main()