supervisord.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. #!/usr/bin/env python
  2. """supervisord -- run a set of applications as daemons.
  3. Usage: %s [options]
  4. Options:
  5. -c/--configuration FILENAME -- configuration file
  6. -n/--nodaemon -- run in the foreground (same as 'nodaemon true' in config file)
  7. -h/--help -- print this usage message and exit
  8. -v/--version -- print supervisord version number and exit
  9. -u/--user USER -- run supervisord as this user (or numeric uid)
  10. -m/--umask UMASK -- use this umask for daemon subprocess (default is 022)
  11. -d/--directory DIRECTORY -- directory to chdir to when daemonized
  12. -l/--logfile FILENAME -- use FILENAME as logfile path
  13. -y/--logfile_maxbytes BYTES -- use BYTES to limit the max size of logfile
  14. -z/--logfile_backups NUM -- number of backups to keep when max bytes reached
  15. -e/--loglevel LEVEL -- use LEVEL as log level (debug,info,warn,error,critical)
  16. -j/--pidfile FILENAME -- write a pid file for the daemon process to FILENAME
  17. -i/--identifier STR -- identifier used for this instance of supervisord
  18. -q/--childlogdir DIRECTORY -- the log directory for child process logs
  19. -k/--nocleanup -- prevent the process from performing cleanup (removal of
  20. old automatic child log files) at startup.
  21. -a/--minfds NUM -- the minimum number of file descriptors for start success
  22. -t/--strip_ansi -- strip ansi escape codes from process output
  23. --minprocs NUM -- the minimum number of processes available for start success
  24. --profile_options OPTIONS -- run supervisord under profiler and output
  25. results based on OPTIONS, which is a comma-sep'd
  26. list of 'cumulative', 'calls', and/or 'callers',
  27. e.g. 'cumulative,callers')
  28. """
  29. import os
  30. import time
  31. import errno
  32. import select
  33. import signal
  34. from supervisor.medusa import asyncore_25 as asyncore
  35. from supervisor.options import ServerOptions
  36. from supervisor.options import signame
  37. from supervisor import events
  38. from supervisor.states import SupervisorStates
  39. from supervisor.states import getProcessStateDescription
  40. class Supervisor:
  41. stopping = False # set after we detect that we are handling a stop request
  42. lastshutdownreport = 0 # throttle for delayed process error reports at stop
  43. process_groups = None # map of process group name to process group object
  44. stop_groups = None # list used for priority ordered shutdown
  45. def __init__(self, options):
  46. self.options = options
  47. self.process_groups = {}
  48. self.ticks = {}
  49. def main(self):
  50. if not self.options.first:
  51. # prevent crash on libdispatch-based systems, at least for the
  52. # first request
  53. self.options.cleanup_fds()
  54. info_messages = []
  55. critical_messages = []
  56. warn_messages = []
  57. setuid_msg = self.options.set_uid()
  58. if setuid_msg:
  59. critical_messages.append(setuid_msg)
  60. if self.options.first:
  61. rlimit_messages = self.options.set_rlimits()
  62. info_messages.extend(rlimit_messages)
  63. warn_messages.extend(self.options.parse_warnings)
  64. # this sets the options.logger object
  65. # delay logger instantiation until after setuid
  66. self.options.make_logger(critical_messages, warn_messages,
  67. info_messages)
  68. if not self.options.nocleanup:
  69. # clean up old automatic logs
  70. self.options.clear_autochildlogdir()
  71. self.run()
  72. def run(self):
  73. self.process_groups = {} # clear
  74. self.stop_groups = None # clear
  75. events.clear()
  76. try:
  77. for config in self.options.process_group_configs:
  78. self.add_process_group(config)
  79. self.options.process_environment()
  80. self.options.openhttpservers(self)
  81. self.options.setsignals()
  82. if (not self.options.nodaemon) and self.options.first:
  83. self.options.daemonize()
  84. # writing pid file needs to come *after* daemonizing or pid
  85. # will be wrong
  86. self.options.write_pidfile()
  87. self.runforever()
  88. finally:
  89. self.options.cleanup()
  90. def diff_to_active(self, new=None):
  91. if not new:
  92. new = self.options.process_group_configs
  93. cur = [group.config for group in self.process_groups.values()]
  94. curdict = dict(zip([cfg.name for cfg in cur], cur))
  95. newdict = dict(zip([cfg.name for cfg in new], new))
  96. added = [cand for cand in new if cand.name not in curdict]
  97. removed = [cand for cand in cur if cand.name not in newdict]
  98. changed = [cand for cand in new
  99. if cand != curdict.get(cand.name, cand)]
  100. return added, changed, removed
  101. def add_process_group(self, config):
  102. name = config.name
  103. if name not in self.process_groups:
  104. config.after_setuid()
  105. self.process_groups[name] = config.make_group()
  106. return True
  107. return False
  108. def remove_process_group(self, name):
  109. if self.process_groups[name].get_unstopped_processes():
  110. return False
  111. del self.process_groups[name]
  112. return True
  113. def get_process_map(self):
  114. process_map = {}
  115. pgroups = self.process_groups.values()
  116. for group in pgroups:
  117. process_map.update(group.get_dispatchers())
  118. return process_map
  119. def shutdown_report(self):
  120. unstopped = []
  121. pgroups = self.process_groups.values()
  122. for group in pgroups:
  123. unstopped.extend(group.get_unstopped_processes())
  124. if unstopped:
  125. # throttle 'waiting for x to die' reports
  126. now = time.time()
  127. if now > (self.lastshutdownreport + 3): # every 3 secs
  128. names = [ p.config.name for p in unstopped ]
  129. namestr = ', '.join(names)
  130. self.options.logger.info('waiting for %s to die' % namestr)
  131. self.lastshutdownreport = now
  132. for proc in unstopped:
  133. state = getProcessStateDescription(proc.get_state())
  134. self.options.logger.blather(
  135. '%s state: %s' % (proc.config.name, state))
  136. return unstopped
  137. def ordered_stop_groups_phase_1(self):
  138. if self.stop_groups:
  139. # stop the last group (the one with the "highest" priority)
  140. self.stop_groups[-1].stop_all()
  141. def ordered_stop_groups_phase_2(self):
  142. # after phase 1 we've transitioned and reaped, let's see if we
  143. # can remove the group we stopped from the stop_groups queue.
  144. if self.stop_groups:
  145. # pop the last group (the one with the "highest" priority)
  146. group = self.stop_groups.pop()
  147. if group.get_unstopped_processes():
  148. # if any processes in the group aren't yet in a
  149. # stopped state, we're not yet done shutting this
  150. # group down, so push it back on to the end of the
  151. # stop group queue
  152. self.stop_groups.append(group)
  153. def runforever(self):
  154. events.notify(events.SupervisorRunningEvent())
  155. timeout = 1 # this cannot be fewer than the smallest TickEvent (5)
  156. socket_map = self.options.get_socket_map()
  157. while 1:
  158. combined_map = {}
  159. combined_map.update(socket_map)
  160. combined_map.update(self.get_process_map())
  161. pgroups = self.process_groups.values()
  162. pgroups.sort()
  163. if self.options.mood < SupervisorStates.RUNNING:
  164. if not self.stopping:
  165. # first time, set the stopping flag, do a
  166. # notification and set stop_groups
  167. self.stopping = True
  168. self.stop_groups = pgroups[:]
  169. events.notify(events.SupervisorStoppingEvent())
  170. self.ordered_stop_groups_phase_1()
  171. if not self.shutdown_report():
  172. # if there are no unstopped processes (we're done
  173. # killing everything), it's OK to swtop or reload
  174. raise asyncore.ExitNow
  175. r, w, x = [], [], []
  176. for fd, dispatcher in combined_map.items():
  177. if dispatcher.readable():
  178. r.append(fd)
  179. if dispatcher.writable():
  180. w.append(fd)
  181. try:
  182. r, w, x = self.options.select(r, w, x, timeout)
  183. except select.error, err:
  184. r = w = x = []
  185. if err[0] == errno.EINTR:
  186. self.options.logger.blather('EINTR encountered in select')
  187. else:
  188. raise
  189. for fd in r:
  190. if combined_map.has_key(fd):
  191. try:
  192. dispatcher = combined_map[fd]
  193. self.options.logger.blather(
  194. 'read event caused by %(dispatcher)s',
  195. dispatcher=dispatcher)
  196. dispatcher.handle_read_event()
  197. except asyncore.ExitNow:
  198. raise
  199. except:
  200. combined_map[fd].handle_error()
  201. for fd in w:
  202. if combined_map.has_key(fd):
  203. try:
  204. dispatcher = combined_map[fd]
  205. self.options.logger.blather(
  206. 'write event caused by %(dispatcher)s',
  207. dispatcher=dispatcher)
  208. dispatcher.handle_write_event()
  209. except asyncore.ExitNow:
  210. raise
  211. except:
  212. combined_map[fd].handle_error()
  213. [ group.transition() for group in pgroups ]
  214. self.reap()
  215. self.handle_signal()
  216. self.tick()
  217. if self.options.mood < SupervisorStates.RUNNING:
  218. self.ordered_stop_groups_phase_2()
  219. if self.options.test:
  220. break
  221. def tick(self, now=None):
  222. """ Send one or more 'tick' events when the timeslice related to
  223. the period for the event type rolls over """
  224. if now is None:
  225. # now won't be None in unit tests
  226. now = time.time()
  227. for event in events.TICK_EVENTS:
  228. period = event.period
  229. last_tick = self.ticks.get(period)
  230. if last_tick is None:
  231. # we just started up
  232. last_tick = self.ticks[period] = timeslice(period, now)
  233. this_tick = timeslice(period, now)
  234. if this_tick != last_tick:
  235. self.ticks[period] = this_tick
  236. events.notify(event(this_tick, self))
  237. def reap(self, once=False):
  238. pid, sts = self.options.waitpid()
  239. if pid:
  240. process = self.options.pidhistory.get(pid, None)
  241. if process is None:
  242. self.options.logger.critical('reaped unknown pid %s)' % pid)
  243. else:
  244. process.finish(pid, sts)
  245. del self.options.pidhistory[pid]
  246. if not once:
  247. self.reap() # keep reaping until no more kids to reap
  248. def handle_signal(self):
  249. sig = self.options.get_signal()
  250. if sig:
  251. if sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
  252. self.options.logger.warn(
  253. 'received %s indicating exit request' % signame(sig))
  254. self.options.mood = SupervisorStates.SHUTDOWN
  255. elif sig == signal.SIGHUP:
  256. self.options.logger.warn(
  257. 'received %s indicating restart request' % signame(sig))
  258. self.options.mood = SupervisorStates.RESTARTING
  259. elif sig == signal.SIGCHLD:
  260. self.options.logger.debug(
  261. 'received %s indicating a child quit' % signame(sig))
  262. elif sig == signal.SIGUSR2:
  263. self.options.logger.info(
  264. 'received %s indicating log reopen request' % signame(sig))
  265. self.options.reopenlogs()
  266. for group in self.process_groups.values():
  267. group.reopenlogs()
  268. else:
  269. self.options.logger.blather(
  270. 'received %s indicating nothing' % signame(sig))
  271. def get_state(self):
  272. return self.options.mood
  273. def timeslice(period, when):
  274. return int(when - (when % period))
  275. # profile entry point
  276. def profile(cmd, globals, locals, sort_order, callers):
  277. import profile
  278. import pstats
  279. import tempfile
  280. fd, fn = tempfile.mkstemp()
  281. try:
  282. if hasattr(profile, 'runctx'):
  283. profile.runctx(cmd, globals, locals, fn)
  284. else:
  285. raise NotImplementedError('No profiling support under Python 2.3')
  286. stats = pstats.Stats(fn)
  287. stats.strip_dirs()
  288. # calls,time,cumulative and cumulative,calls,time are useful
  289. stats.sort_stats(*sort_order or ('cumulative', 'calls', 'time'))
  290. if callers:
  291. stats.print_callers(.3)
  292. else:
  293. stats.print_stats(.3)
  294. finally:
  295. os.remove(fn)
  296. # Main program
  297. def main(args=None, test=False):
  298. assert os.name == "posix", "This code makes Unix-specific assumptions"
  299. # if we hup, restart by making a new Supervisor()
  300. first = True
  301. while 1:
  302. options = ServerOptions()
  303. options.realize(args, doc=__doc__)
  304. options.first = first
  305. options.test = test
  306. if options.profile_options:
  307. sort_order, callers = options.profile_options
  308. profile('go(options)', globals(), locals(), sort_order, callers)
  309. else:
  310. go(options)
  311. if test or (options.mood < SupervisorStates.RESTARTING):
  312. break
  313. options.close_httpservers()
  314. options.close_logger()
  315. first = False
  316. def go(options):
  317. d = Supervisor(options)
  318. try:
  319. d.main()
  320. except asyncore.ExitNow:
  321. pass
# Script entry point: run supervisord when invoked directly.
if __name__ == "__main__":
    main()