supervisord.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
#!/usr/bin/env python
"""supervisord -- run a set of applications as daemons.

Usage: %s [options]

Options:
-c/--configuration FILENAME -- configuration file
-n/--nodaemon -- run in the foreground (same as 'nodaemon true' in config file)
-h/--help -- print this usage message and exit
-v/--version -- print supervisord version number and exit
-u/--user USER -- run supervisord as this user (or numeric uid)
-m/--umask UMASK -- use this umask for daemon subprocess (default is 022)
-d/--directory DIRECTORY -- directory to chdir to when daemonized
-l/--logfile FILENAME -- use FILENAME as logfile path
-y/--logfile_maxbytes BYTES -- use BYTES to limit the max size of logfile
-z/--logfile_backups NUM -- number of backups to keep when max bytes reached
-e/--loglevel LEVEL -- use LEVEL as log level (debug,info,warn,error,critical)
-j/--pidfile FILENAME -- write a pid file for the daemon process to FILENAME
-i/--identifier STR -- identifier used for this instance of supervisord
-q/--childlogdir DIRECTORY -- the log directory for child process logs
-k/--nocleanup -- prevent the process from performing cleanup (removal of
                  old automatic child log files) at startup.
-a/--minfds NUM -- the minimum number of file descriptors for start success
-t/--strip_ansi -- strip ansi escape codes from process output
--minprocs NUM -- the minimum number of processes available for start success
--profile_options OPTIONS -- run supervisord under profiler and output
                             results based on OPTIONS, which is a comma-sep'd
                             list of 'cumulative', 'calls', and/or 'callers',
                             e.g. 'cumulative,callers')
"""
  29. import os
  30. import time
  31. import errno
  32. import select
  33. import signal
  34. from supervisor.medusa import asyncore_25 as asyncore
  35. from supervisor.options import ServerOptions
  36. from supervisor.options import signame
  37. from supervisor import events
  38. from supervisor.states import SupervisorStates
  39. from supervisor.states import getProcessStateDescription
class Supervisor:
    """Top-level supervisord object.

    Owns the parsed options, the map of process groups, and the main
    select() loop that dispatches child/HTTP I/O events, reaps dead
    children, handles signals, and emits tick events until a shutdown
    or restart is requested (options.mood drops below RUNNING).
    """

    stopping = False # set after we detect that we are handling a stop request
    lastshutdownreport = 0 # throttle for delayed process error reports at stop
    process_groups = None # map of process group name to process group object
    stop_groups = None # list used for priority ordered shutdown

    def __init__(self, options):
        # options: a ServerOptions-style object (parsed config + OS helpers)
        self.options = options
        self.process_groups = {}
        self.ticks = {} # map of tick period -> last timeslice notified

    def main(self):
        """One-time process setup (uid, rlimits, logger, autolog cleanup),
        then delegate to run()."""
        if not self.options.first:
            # prevent crash on libdispatch-based systems, at least for the
            # first request
            self.options.cleanup_fds()
        info_messages = []
        critical_messages = []
        warn_messages = []
        setuid_msg = self.options.set_uid()
        if setuid_msg:
            critical_messages.append(setuid_msg)
        if self.options.first:
            rlimit_messages = self.options.set_rlimits()
            info_messages.extend(rlimit_messages)
        warn_messages.extend(self.options.parse_warnings)
        # this sets the options.logger object
        # delay logger instantiation until after setuid
        self.options.make_logger(critical_messages, warn_messages,
                                 info_messages)
        if not self.options.nocleanup:
            # clean up old automatic logs
            self.options.clear_autochildlogdir()
        self.run()

    def run(self):
        """Build process groups, open HTTP servers, daemonize if
        configured, write the pidfile, and enter runforever()."""
        self.process_groups = {} # clear
        self.stop_groups = None # clear
        events.clear()
        try:
            for config in self.options.process_group_configs:
                self.add_process_group(config)
            self.options.process_environment()
            self.options.openhttpservers(self)
            self.options.setsignals()
            if (not self.options.nodaemon) and self.options.first:
                self.options.daemonize()
            # writing pid file needs to come *after* daemonizing or pid
            # will be wrong
            self.options.write_pidfile()
            self.runforever()
        finally:
            self.options.cleanup()

    def diff_to_active(self, new=None):
        """Compare *new* group configs (default: the configs currently
        parsed into options) against the active process groups.

        Returns a tuple (added, changed, removed) of config lists.
        """
        if not new:
            new = self.options.process_group_configs
        cur = [group.config for group in self.process_groups.values()]
        curdict = dict(zip([cfg.name for cfg in cur], cur))
        newdict = dict(zip([cfg.name for cfg in new], new))
        added = [cand for cand in new if cand.name not in curdict]
        removed = [cand for cand in cur if cand.name not in newdict]
        # a config counts as changed when a group of the same name exists
        # but compares unequal to the new config
        changed = [cand for cand in new
                   if cand != curdict.get(cand.name, cand)]
        return added, changed, removed

    def add_process_group(self, config):
        """Create and register the group for *config*.

        Returns True if the group was added, False if one with the same
        name already exists.
        """
        name = config.name
        if name not in self.process_groups:
            config.after_setuid()
            self.process_groups[name] = config.make_group()
            events.notify(events.ProcessGroupAddedEvent(name))
            return True
        return False

    def remove_process_group(self, name):
        """Unregister group *name*; refuses (returns False) while the
        group still has unstopped processes."""
        if self.process_groups[name].get_unstopped_processes():
            return False
        del self.process_groups[name]
        events.notify(events.ProcessGroupRemovedEvent(name))
        return True

    def get_process_map(self):
        # build the fd -> dispatcher map for all child process pipes
        process_map = {}
        pgroups = self.process_groups.values()
        for group in pgroups:
            process_map.update(group.get_dispatchers())
        return process_map

    def shutdown_report(self):
        """Log which processes have not yet stopped during shutdown;
        return the list of unstopped processes (empty when done)."""
        unstopped = []
        pgroups = self.process_groups.values()
        for group in pgroups:
            unstopped.extend(group.get_unstopped_processes())
        if unstopped:
            # throttle 'waiting for x to die' reports
            now = time.time()
            if now > (self.lastshutdownreport + 3): # every 3 secs
                names = [ p.config.name for p in unstopped ]
                namestr = ', '.join(names)
                self.options.logger.info('waiting for %s to die' % namestr)
                self.lastshutdownreport = now
                for proc in unstopped:
                    state = getProcessStateDescription(proc.get_state())
                    self.options.logger.blather(
                        '%s state: %s' % (proc.config.name, state))
        return unstopped

    def ordered_stop_groups_phase_1(self):
        if self.stop_groups:
            # stop the last group (the one with the "highest" priority)
            self.stop_groups[-1].stop_all()

    def ordered_stop_groups_phase_2(self):
        # after phase 1 we've transitioned and reaped, let's see if we
        # can remove the group we stopped from the stop_groups queue.
        if self.stop_groups:
            # pop the last group (the one with the "highest" priority)
            group = self.stop_groups.pop()
            if group.get_unstopped_processes():
                # if any processes in the group aren't yet in a
                # stopped state, we're not yet done shutting this
                # group down, so push it back on to the end of the
                # stop group queue
                self.stop_groups.append(group)

    def runforever(self):
        """The main event loop: select() on all dispatcher fds, dispatch
        read/write events, transition groups, reap children, handle
        signals, and emit ticks.  Exits via asyncore.ExitNow once a
        shutdown completes, or breaks after one pass in test mode."""
        events.notify(events.SupervisorRunningEvent())
        timeout = 1 # this cannot be fewer than the smallest TickEvent (5)
        socket_map = self.options.get_socket_map()
        while 1:
            combined_map = {}
            combined_map.update(socket_map)
            combined_map.update(self.get_process_map())
            pgroups = self.process_groups.values()
            pgroups.sort()
            if self.options.mood < SupervisorStates.RUNNING:
                if not self.stopping:
                    # first time, set the stopping flag, do a
                    # notification and set stop_groups
                    self.stopping = True
                    self.stop_groups = pgroups[:]
                    events.notify(events.SupervisorStoppingEvent())
                self.ordered_stop_groups_phase_1()
                if not self.shutdown_report():
                    # if there are no unstopped processes (we're done
                    # killing everything), it's OK to stop or reload
                    raise asyncore.ExitNow
            r, w, x = [], [], []
            for fd, dispatcher in combined_map.items():
                if dispatcher.readable():
                    r.append(fd)
                if dispatcher.writable():
                    w.append(fd)
            try:
                r, w, x = self.options.select(r, w, x, timeout)
            except select.error, err:
                r = w = x = []
                if err.args[0] == errno.EINTR:
                    # interrupted by a signal; just run the loop again
                    self.options.logger.blather('EINTR encountered in select')
                else:
                    raise
            for fd in r:
                if combined_map.has_key(fd):
                    try:
                        dispatcher = combined_map[fd]
                        self.options.logger.blather(
                            'read event caused by %(dispatcher)s',
                            dispatcher=dispatcher)
                        dispatcher.handle_read_event()
                    except asyncore.ExitNow:
                        raise
                    except:
                        # dispatcher decides how to report/recover
                        combined_map[fd].handle_error()
            for fd in w:
                if combined_map.has_key(fd):
                    try:
                        dispatcher = combined_map[fd]
                        self.options.logger.blather(
                            'write event caused by %(dispatcher)s',
                            dispatcher=dispatcher)
                        dispatcher.handle_write_event()
                    except asyncore.ExitNow:
                        raise
                    except:
                        combined_map[fd].handle_error()
            [ group.transition() for group in pgroups ]
            self.reap()
            self.handle_signal()
            self.tick()
            if self.options.mood < SupervisorStates.RUNNING:
                self.ordered_stop_groups_phase_2()
            if self.options.test:
                break

    def tick(self, now=None):
        """ Send one or more 'tick' events when the timeslice related to
        the period for the event type rolls over """
        if now is None:
            # now won't be None in unit tests
            now = time.time()
        for event in events.TICK_EVENTS:
            period = event.period
            last_tick = self.ticks.get(period)
            if last_tick is None:
                # we just started up
                last_tick = self.ticks[period] = timeslice(period, now)
            this_tick = timeslice(period, now)
            if this_tick != last_tick:
                self.ticks[period] = this_tick
                events.notify(event(this_tick, self))

    def reap(self, once=False, recursionguard=0):
        """Reap one dead child via waitpid and, unless *once*, keep
        reaping (bounded recursion) until no more children are waiting."""
        if recursionguard == 100:
            return
        pid, sts = self.options.waitpid()
        if pid:
            process = self.options.pidhistory.get(pid, None)
            if process is None:
                # a child we did not spawn (or already forgot about)
                self.options.logger.info('reaped unknown pid %s' % pid)
            else:
                process.finish(pid, sts)
                del self.options.pidhistory[pid]
            if not once:
                # keep reaping until no more kids to reap, but don't recurse
                # infinitely
                self.reap(once=False, recursionguard=recursionguard+1)

    def handle_signal(self):
        """Translate a pending signal (if any) into a mood change or a
        log-reopen action."""
        sig = self.options.get_signal()
        if sig:
            if sig in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
                self.options.logger.warn(
                    'received %s indicating exit request' % signame(sig))
                self.options.mood = SupervisorStates.SHUTDOWN
            elif sig == signal.SIGHUP:
                self.options.logger.warn(
                    'received %s indicating restart request' % signame(sig))
                self.options.mood = SupervisorStates.RESTARTING
            elif sig == signal.SIGCHLD:
                # actual reaping happens in reap(), called from the loop
                self.options.logger.debug(
                    'received %s indicating a child quit' % signame(sig))
            elif sig == signal.SIGUSR2:
                self.options.logger.info(
                    'received %s indicating log reopen request' % signame(sig))
                self.options.reopenlogs()
                for group in self.process_groups.values():
                    group.reopenlogs()
            else:
                self.options.logger.blather(
                    'received %s indicating nothing' % signame(sig))

    def get_state(self):
        # the supervisor's externally visible state is the options mood
        return self.options.mood
  279. def timeslice(period, when):
  280. return int(when - (when % period))
  281. # profile entry point
  282. def profile(cmd, globals, locals, sort_order, callers):
  283. try:
  284. import cProfile as profile
  285. except ImportError:
  286. import profile # python < 2.5
  287. import pstats
  288. import tempfile
  289. fd, fn = tempfile.mkstemp()
  290. try:
  291. profile.runctx(cmd, globals, locals, fn)
  292. stats = pstats.Stats(fn)
  293. stats.strip_dirs()
  294. # calls,time,cumulative and cumulative,calls,time are useful
  295. stats.sort_stats(*sort_order or ('cumulative', 'calls', 'time'))
  296. if callers:
  297. stats.print_callers(.3)
  298. else:
  299. stats.print_stats(.3)
  300. finally:
  301. os.remove(fn)
  302. # Main program
  303. def main(args=None, test=False):
  304. assert os.name == "posix", "This code makes Unix-specific assumptions"
  305. # if we hup, restart by making a new Supervisor()
  306. first = True
  307. while 1:
  308. options = ServerOptions()
  309. options.realize(args, doc=__doc__)
  310. options.first = first
  311. options.test = test
  312. if options.profile_options:
  313. sort_order, callers = options.profile_options
  314. profile('go(options)', globals(), locals(), sort_order, callers)
  315. else:
  316. go(options)
  317. if test or (options.mood < SupervisorStates.RESTARTING):
  318. break
  319. options.close_httpservers()
  320. options.close_logger()
  321. first = False
  322. def go(options):
  323. d = Supervisor(options)
  324. try:
  325. d.main()
  326. except asyncore.ExitNow:
  327. pass
# script entry point when executed directly
if __name__ == "__main__":
    main()