process.py

import os
import time
import errno
import shlex
import traceback
import signal

from supervisor.compat import maxint
from supervisor.compat import StringIO
from supervisor.compat import total_ordering
from supervisor.compat import as_bytes

from supervisor.medusa import asyncore_25 as asyncore

from supervisor.states import ProcessStates
from supervisor.states import SupervisorStates
from supervisor.states import getProcessStateDescription
from supervisor.states import STOPPED_STATES

from supervisor.options import decode_wait_status
from supervisor.options import signame
from supervisor.options import ProcessException, BadCommand

from supervisor.dispatchers import EventListenerStates

from supervisor import events

from supervisor.datatypes import RestartUnconditionally

from supervisor.socket_manager import SocketManager

@total_ordering
class Subprocess(object):

    """A class to manage a subprocess."""

    # Initial state; overridden by instance variables

    pid = 0 # Subprocess pid; 0 when not running
    config = None # ProcessConfig instance
    state = None # process state code
    listener_state = None # listener state code (if we're an event listener)
    event = None # event currently being processed (if we're an event listener)
    laststart = 0 # Last time the subprocess was started; 0 if never
    laststop = 0 # Last time the subprocess was stopped; 0 if never
    delay = 0 # If nonzero, delay starting or killing until this time
    administrative_stop = 0 # true if the process has been stopped by an admin
    system_stop = 0 # true if the process has been stopped by the system
    killing = 0 # flag determining whether we are trying to kill this proc
    backoff = 0 # backoff counter (to startretries)
    dispatchers = None # asyncore output dispatchers (keyed by fd)
    pipes = None # map of channel name to file descriptor #
    exitstatus = None # status attached to dead process by finish()
    spawnerr = None # error message attached by spawn() if any
    group = None # ProcessGroup instance if process is in the group

    def __init__(self, config):
        """Constructor.
        Argument is a ProcessConfig instance.
        """
        self.config = config
        self.dispatchers = {}
        self.pipes = {}
        self.state = ProcessStates.STOPPED

    def removelogs(self):
        for dispatcher in self.dispatchers.values():
            if hasattr(dispatcher, 'removelogs'):
                dispatcher.removelogs()

    def reopenlogs(self):
        for dispatcher in self.dispatchers.values():
            if hasattr(dispatcher, 'reopenlogs'):
                dispatcher.reopenlogs()

    def drain(self):
        for dispatcher in self.dispatchers.values():
            # note that we *must* call readable() for every
            # dispatcher, as it may have side effects for a given
            # dispatcher (eg. call handle_listener_state_change for
            # event listener processes)
            if dispatcher.readable():
                dispatcher.handle_read_event()
            if dispatcher.writable():
                dispatcher.handle_write_event()

    def write(self, chars):
        if not self.pid or self.killing:
            raise OSError(errno.EPIPE, "Process already closed")

        stdin_fd = self.pipes['stdin']
        if stdin_fd is None:
            raise OSError(errno.EPIPE, "Process has no stdin channel")

        dispatcher = self.dispatchers[stdin_fd]
        if dispatcher.closed:
            raise OSError(errno.EPIPE, "Process' stdin channel is closed")

        dispatcher.input_buffer += chars
        dispatcher.flush() # this must raise EPIPE if the pipe is closed

    def get_execv_args(self):
        """Internal: turn a program name into a file name, using $PATH,
        make sure it exists / is executable, raising a ProcessException
        if not """
        try:
            commandargs = shlex.split(self.config.command)
        except ValueError as e:
            raise BadCommand("can't parse command %r: %s" % (
                self.config.command, str(e)))

        if commandargs:
            program = commandargs[0]
        else:
            raise BadCommand("command is empty")

        if "/" in program:
            filename = program
            try:
                st = self.config.options.stat(filename)
            except OSError:
                st = None

        else:
            path = self.config.options.get_path()
            found = None
            st = None
            for dir in path:
                found = os.path.join(dir, program)
                try:
                    st = self.config.options.stat(found)
                except OSError:
                    pass
                else:
                    break
            if st is None:
                filename = program
            else:
                filename = found

        # check_execv_args will raise a ProcessException if the execv
        # args are bogus, we break it out into a separate options
        # method call here only to service unit tests
        self.config.options.check_execv_args(filename, commandargs, st)

        return filename, commandargs
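
    # Example (hypothetical values): with config.command set to
    # '/bin/cat /tmp/foo' this returns ('/bin/cat', ['/bin/cat', '/tmp/foo']);
    # with a bare 'cat /tmp/foo' the program is first searched for on the
    # configured $PATH before check_execv_args() validates the result.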

    event_map = {
        ProcessStates.BACKOFF: events.ProcessStateBackoffEvent,
        ProcessStates.FATAL: events.ProcessStateFatalEvent,
        ProcessStates.UNKNOWN: events.ProcessStateUnknownEvent,
        ProcessStates.STOPPED: events.ProcessStateStoppedEvent,
        ProcessStates.EXITED: events.ProcessStateExitedEvent,
        ProcessStates.RUNNING: events.ProcessStateRunningEvent,
        ProcessStates.STARTING: events.ProcessStateStartingEvent,
        ProcessStates.STOPPING: events.ProcessStateStoppingEvent,
        }

    def change_state(self, new_state, expected=True):
        old_state = self.state
        if new_state is old_state:
            # exists for unit tests
            return False

        event_class = self.event_map.get(new_state)
        if event_class is not None:
            event = event_class(self, old_state, expected)
            events.notify(event)

        if new_state == ProcessStates.BACKOFF:
            now = time.time()
            self.backoff += 1
            self.delay = now + self.backoff

        self.state = new_state

    def _assertInState(self, *states):
        if self.state not in states:
            current_state = getProcessStateDescription(self.state)
            allowable_states = ' '.join(map(getProcessStateDescription, states))
            raise AssertionError('Assertion failed for %s: %s not in %s' % (
                self.config.name, current_state, allowable_states))

    def record_spawnerr(self, msg):
        self.spawnerr = msg
        self.config.options.logger.info("spawnerr: %s" % msg)

    def spawn(self):
        """Start the subprocess. It must not be running already.
        Return the process id. If the fork() call fails, return None.
        """
        options = self.config.options

        if self.pid:
            msg = 'process %r already running' % self.config.name
            options.logger.warn(msg)
            return

        self.killing = 0
        self.spawnerr = None
        self.exitstatus = None
        self.system_stop = 0
        self.administrative_stop = 0

        self.laststart = time.time()

        self._assertInState(ProcessStates.EXITED, ProcessStates.FATAL,
                            ProcessStates.BACKOFF, ProcessStates.STOPPED)

        self.change_state(ProcessStates.STARTING)

        try:
            filename, argv = self.get_execv_args()
        except ProcessException as what:
            self.record_spawnerr(what.args[0])
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.BACKOFF)
            return

        try:
            self.dispatchers, self.pipes = self.config.make_dispatchers(self)
        except OSError as why:
            code = why.args[0]
            if code == errno.EMFILE:
                # too many file descriptors open
                msg = 'too many open files to spawn %r' % self.config.name
            else:
                msg = 'unknown error: %s' % errno.errorcode.get(code, code)
            self.record_spawnerr(msg)
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.BACKOFF)
            return

        try:
            pid = options.fork()
        except OSError as why:
            code = why.args[0]
            if code == errno.EAGAIN:
                # process table full
                msg = ('Too many processes in process table to spawn %r' %
                       self.config.name)
            else:
                msg = 'unknown error: %s' % errno.errorcode.get(code, code)

            self.record_spawnerr(msg)
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.BACKOFF)
            options.close_parent_pipes(self.pipes)
            options.close_child_pipes(self.pipes)
            return

        if pid != 0:
            return self._spawn_as_parent(pid)
        else:
            return self._spawn_as_child(filename, argv)

    def _spawn_as_parent(self, pid):
        # Parent
        self.pid = pid
        options = self.config.options
        options.close_child_pipes(self.pipes)
        options.logger.info('spawned: %r with pid %s' % (self.config.name, pid))
        self.spawnerr = None
        self.delay = time.time() + self.config.startsecs
        options.pidhistory[pid] = self
        return pid

    def _prepare_child_fds(self):
        options = self.config.options
        options.dup2(self.pipes['child_stdin'], 0)
        options.dup2(self.pipes['child_stdout'], 1)
        if self.config.redirect_stderr:
            options.dup2(self.pipes['child_stdout'], 2)
        else:
            options.dup2(self.pipes['child_stderr'], 2)
        for i in range(3, options.minfds):
            options.close_fd(i)
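
    # Resulting child fd layout: fd 0 is the child end of the stdin pipe,
    # fd 1 is the stdout pipe, and fd 2 is either that same stdout pipe
    # (when redirect_stderr is set) or the stderr pipe; every other fd up
    # to options.minfds is closed before exec.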

    def _spawn_as_child(self, filename, argv):
        options = self.config.options
        try:
            # prevent child from receiving signals sent to the
            # parent by calling os.setpgrp to create a new process
            # group for the child; this prevents, for instance,
            # the case of child processes being sent a SIGINT when
            # running supervisor in foreground mode and Ctrl-C in
            # the terminal window running supervisord is pressed.
            # Presumably it also prevents HUP, etc received by
            # supervisord from being sent to children.
            options.setpgrp()

            self._prepare_child_fds()
            # sending to fd 2 will put this output in the stderr log

            # set user
            setuid_msg = self.set_uid()
            if setuid_msg:
                uid = self.config.uid
                msg = "couldn't setuid to %s: %s\n" % (uid, setuid_msg)
                options.write(2, "supervisor: " + msg)
                return # finally clause will exit the child process

            # set environment
            env = os.environ.copy()
            env['SUPERVISOR_ENABLED'] = '1'
            serverurl = self.config.serverurl
            if serverurl is None: # unset
                serverurl = self.config.options.serverurl # might still be None
            if serverurl:
                env['SUPERVISOR_SERVER_URL'] = serverurl
            env['SUPERVISOR_PROCESS_NAME'] = self.config.name
            if self.group:
                env['SUPERVISOR_GROUP_NAME'] = self.group.config.name
            if self.config.environment is not None:
                env.update(self.config.environment)

            # change directory
            cwd = self.config.directory
            try:
                if cwd is not None:
                    options.chdir(cwd)
            except OSError as why:
                code = errno.errorcode.get(why.args[0], why.args[0])
                msg = "couldn't chdir to %s: %s\n" % (cwd, code)
                options.write(2, "supervisor: " + msg)
                return # finally clause will exit the child process

            # set umask, then execve
            try:
                if self.config.umask is not None:
                    options.setumask(self.config.umask)
                options.execve(filename, argv, env)
            except OSError as why:
                code = errno.errorcode.get(why.args[0], why.args[0])
                msg = "couldn't exec %s: %s\n" % (argv[0], code)
                options.write(2, "supervisor: " + msg)
            except:
                (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
                error = '%s, %s: file: %s line: %s' % (t, v, file, line)
                msg = "couldn't exec %s: %s\n" % (filename, error)
                options.write(2, "supervisor: " + msg)

            # this point should only be reached if execve failed.
            # the finally clause will exit the child process.
        finally:
            options.write(2, "supervisor: child process was not spawned\n")
            options._exit(127) # exit process with code for spawn failure

    def stop(self):
        """ Administrative stop """
        self.administrative_stop = 1
        return self.kill(self.config.stopsignal)

    def give_up(self):
        self.delay = 0
        self.backoff = 0
        self.system_stop = 1
        self._assertInState(ProcessStates.BACKOFF)
        self.change_state(ProcessStates.FATAL)

    def kill(self, sig):
        """Send a signal to the subprocess. This may or may not kill it.
        Return None if the signal was sent, or an error message string
        if an error occurred or if the subprocess is not running.
        """
        now = time.time()
        options = self.config.options

        # Properly stop processes in BACKOFF state.
        if self.state == ProcessStates.BACKOFF:
            msg = ("Attempted to kill %s, which is in BACKOFF state." %
                   (self.config.name))
            options.logger.debug(msg)
            self.change_state(ProcessStates.STOPPED)
            return None

        if not self.pid:
            msg = ("attempted to kill %s with sig %s but it wasn't running" %
                   (self.config.name, signame(sig)))
            options.logger.debug(msg)
            return msg

        # If we're in the stopping state, then we've already sent the stop
        # signal and this is the kill signal
        if self.state == ProcessStates.STOPPING:
            killasgroup = self.config.killasgroup
        else:
            killasgroup = self.config.stopasgroup

        as_group = ""
        if killasgroup:
            as_group = "process group "

        options.logger.debug('killing %s (pid %s) %swith signal %s'
                             % (self.config.name,
                                self.pid,
                                as_group,
                                signame(sig))
                             )

        # RUNNING/STARTING/STOPPING -> STOPPING
        self.killing = 1
        self.delay = now + self.config.stopwaitsecs
        # we will already be in the STOPPING state if we're doing a
        # SIGKILL as a result of overrunning stopwaitsecs
        self._assertInState(ProcessStates.RUNNING, ProcessStates.STARTING,
                            ProcessStates.STOPPING)
        self.change_state(ProcessStates.STOPPING)

        pid = self.pid
        if killasgroup:
            # send to the whole process group instead
            pid = -self.pid

        try:
            options.kill(pid, sig)
        except:
            io = StringIO()
            traceback.print_exc(file=io)
            tb = io.getvalue()
            msg = 'unknown problem killing %s (%s):%s' % (self.config.name,
                                                          self.pid, tb)
            options.logger.critical(msg)
            self.change_state(ProcessStates.UNKNOWN)
            self.pid = 0
            self.killing = 0
            self.delay = 0
            return msg

        return None
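
    # Stop flow as implemented above: stop() sends config.stopsignal and this
    # method moves the process to STOPPING; if the child is still alive after
    # config.stopwaitsecs, transition() escalates with kill(signal.SIGKILL),
    # and finish() records STOPPED once the child is reaped.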

    def finish(self, pid, sts):
        """ The process was reaped and we need to report and manage its state
        """
        self.drain()

        es, msg = decode_wait_status(sts)

        now = time.time()
        self.laststop = now
        processname = self.config.name

        tooquickly = now - self.laststart < self.config.startsecs
        exit_expected = es in self.config.exitcodes

        if self.killing:
            # likely the result of a stop request
            # implies STOPPING -> STOPPED
            self.killing = 0
            self.delay = 0
            self.exitstatus = es

            msg = "stopped: %s (%s)" % (processname, msg)
            self._assertInState(ProcessStates.STOPPING)
            self.change_state(ProcessStates.STOPPED)

        elif tooquickly:
            # the program did not stay up long enough to make it to RUNNING
            # implies STARTING -> BACKOFF
            self.exitstatus = None
            self.spawnerr = 'Exited too quickly (process log may have details)'
            msg = "exited: %s (%s)" % (processname, msg + "; not expected")
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.BACKOFF)

        else:
            # this finish was not the result of a stop request, the
            # program was in the RUNNING state but exited; implies
            # RUNNING -> EXITED
            self.delay = 0
            self.backoff = 0
            self.exitstatus = es

            if self.state == ProcessStates.STARTING: # pragma: no cover
                # XXX I don't know under which circumstances this
                # happens, but in the wild, there is a transition that
                # subverts the RUNNING state (directly from STARTING
                # to EXITED), so we perform the correct transition
                # here.
                self.change_state(ProcessStates.RUNNING)

            self._assertInState(ProcessStates.RUNNING)

            if exit_expected:
                # expected exit code
                msg = "exited: %s (%s)" % (processname, msg + "; expected")
                self.change_state(ProcessStates.EXITED, expected=True)
            else:
                # unexpected exit code
                self.spawnerr = 'Bad exit code %s' % es
                msg = "exited: %s (%s)" % (processname, msg + "; not expected")
                self.change_state(ProcessStates.EXITED, expected=False)

        self.config.options.logger.info(msg)

        self.pid = 0
        self.config.options.close_parent_pipes(self.pipes)
        self.pipes = {}
        self.dispatchers = {}

        # if we died before we processed the current event (only happens
        # if we're an event listener), notify the event system that this
        # event was rejected so it can be processed again.
        if self.event is not None:
            # Note: this should only be true if we were in the BUSY
            # state when finish() was called.
            events.notify(events.EventRejectedEvent(self, self.event))
            self.event = None

    def set_uid(self):
        if self.config.uid is None:
            return
        msg = self.config.options.dropPrivileges(self.config.uid)
        return msg

    def __lt__(self, other):
        return self.config.priority < other.config.priority

    def __eq__(self, other):
        # sort by priority
        return self.config.priority == other.config.priority

    def __repr__(self):
        return '<Subprocess at %s with name %s in state %s>' % (
            id(self),
            self.config.name,
            getProcessStateDescription(self.get_state()))

    def get_state(self):
        return self.state

    def transition(self):
        now = time.time()
        state = self.state

        logger = self.config.options.logger

        if self.config.options.mood > SupervisorStates.RESTARTING:
            # dont start any processes if supervisor is shutting down
            if state == ProcessStates.EXITED:
                if self.config.autorestart:
                    if self.config.autorestart is RestartUnconditionally:
                        # EXITED -> STARTING
                        self.spawn()
                    else: # autorestart is RestartWhenExitUnexpected
                        if self.exitstatus not in self.config.exitcodes:
                            # EXITED -> STARTING
                            self.spawn()
            elif state == ProcessStates.STOPPED and not self.laststart:
                if self.config.autostart:
                    # STOPPED -> STARTING
                    self.spawn()
            elif state == ProcessStates.BACKOFF:
                if self.backoff <= self.config.startretries:
                    if now > self.delay:
                        # BACKOFF -> STARTING
                        self.spawn()

        if state == ProcessStates.STARTING:
            if now - self.laststart > self.config.startsecs:
                # STARTING -> RUNNING if the proc has started
                # successfully and it has stayed up for at least
                # proc.config.startsecs,
                self.delay = 0
                self.backoff = 0
                self._assertInState(ProcessStates.STARTING)
                self.change_state(ProcessStates.RUNNING)
                msg = (
                    'entered RUNNING state, process has stayed up for '
                    '> than %s seconds (startsecs)' % self.config.startsecs)
                logger.info('success: %s %s' % (self.config.name, msg))

        if state == ProcessStates.BACKOFF:
            if self.backoff > self.config.startretries:
                # BACKOFF -> FATAL if the proc has exceeded its number
                # of retries
                self.give_up()
                msg = ('entered FATAL state, too many start retries too '
                       'quickly')
                logger.info('gave up: %s %s' % (self.config.name, msg))

        elif state == ProcessStates.STOPPING:
            time_left = self.delay - now
            if time_left <= 0:
                # kill processes which are taking too long to stop with a final
                # sigkill. if this doesn't kill it, the process will be stuck
                # in the STOPPING state forever.
                self.config.options.logger.warn(
                    'killing %r (%s) with SIGKILL' % (self.config.name,
                                                      self.pid))
                self.kill(signal.SIGKILL)


class FastCGISubprocess(Subprocess):
    """Extends Subprocess class to handle FastCGI subprocesses"""

    def __init__(self, config):
        Subprocess.__init__(self, config)
        self.fcgi_sock = None

    def before_spawn(self):
        """
        The FastCGI socket needs to be created by the parent before we fork
        """
        if self.group is None:
            raise NotImplementedError('No group set for FastCGISubprocess')
        if not hasattr(self.group, 'socket_manager'):
            raise NotImplementedError('No SocketManager set for '
                                      '%s:%s' % (self.group, dir(self.group)))
        self.fcgi_sock = self.group.socket_manager.get_socket()

    def spawn(self):
        """
        Overrides Subprocess.spawn() so we can hook in before it happens
        """
        self.before_spawn()
        pid = Subprocess.spawn(self)
        if pid is None:
            # Remove object reference to decrement the reference count on error
            self.fcgi_sock = None
        return pid

    def after_finish(self):
        """
        Releases reference to FastCGI socket when process is reaped
        """
        # Remove object reference to decrement the reference count
        self.fcgi_sock = None

    def finish(self, pid, sts):
        """
        Overrides Subprocess.finish() so we can hook in after it happens
        """
        retval = Subprocess.finish(self, pid, sts)
        self.after_finish()
        return retval

    def _prepare_child_fds(self):
        """
        Overrides Subprocess._prepare_child_fds()
        The FastCGI socket needs to be set to file descriptor 0 in the child
        """
        sock_fd = self.fcgi_sock.fileno()

        options = self.config.options
        options.dup2(sock_fd, 0)
        options.dup2(self.pipes['child_stdout'], 1)
        if self.config.redirect_stderr:
            options.dup2(self.pipes['child_stdout'], 2)
        else:
            options.dup2(self.pipes['child_stderr'], 2)
        for i in range(3, options.minfds):
            options.close_fd(i)


@total_ordering
class ProcessGroupBase(object):
    def __init__(self, config):
        self.config = config
        self.processes = {}
        for pconfig in self.config.process_configs:
            self.processes[pconfig.name] = pconfig.make_process(self)

    def __lt__(self, other):
        return self.config.priority < other.config.priority

    def __eq__(self, other):
        return self.config.priority == other.config.priority

    def __repr__(self):
        return '<%s instance at %s named %s>' % (self.__class__, id(self),
                                                 self.config.name)

    def removelogs(self):
        for process in self.processes.values():
            process.removelogs()

    def reopenlogs(self):
        for process in self.processes.values():
            process.reopenlogs()

    def stop_all(self):
        processes = list(self.processes.values())
        processes.sort()
        processes.reverse() # stop in desc priority order

        for proc in processes:
            state = proc.get_state()
            if state == ProcessStates.RUNNING:
                # RUNNING -> STOPPING
                proc.stop()
            elif state == ProcessStates.STARTING:
                # STARTING -> STOPPING
                proc.stop()
            elif state == ProcessStates.BACKOFF:
                # BACKOFF -> FATAL
                proc.give_up()

    def get_unstopped_processes(self):
        """ Processes which aren't in a state that is considered 'stopped' """
        return [ x for x in self.processes.values() if x.get_state() not in
                 STOPPED_STATES ]

    def get_dispatchers(self):
        dispatchers = {}
        for process in self.processes.values():
            dispatchers.update(process.dispatchers)
        return dispatchers


class ProcessGroup(ProcessGroupBase):
    def transition(self):
        for proc in self.processes.values():
            proc.transition()


class FastCGIProcessGroup(ProcessGroup):
    def __init__(self, config, **kwargs):
        ProcessGroup.__init__(self, config)
        sockManagerKlass = kwargs.get('socketManager', SocketManager)
        self.socket_manager = sockManagerKlass(config.socket_config,
                                               logger=config.options.logger)
        # It's not required to call get_socket() here but we want
        # to fail early during start up if there is a config error
        try:
            self.socket_manager.get_socket()
        except Exception as e:
            raise ValueError(
                'Could not create FastCGI socket %s: %s' % (
                    self.socket_manager.config(), e))


class EventListenerPool(ProcessGroupBase):
    def __init__(self, config):
        ProcessGroupBase.__init__(self, config)
        self.event_buffer = []
        for event_type in self.config.pool_events:
            events.subscribe(event_type, self._acceptEvent)
        events.subscribe(events.EventRejectedEvent, self.handle_rejected)
        self.serial = -1
        self.last_dispatch = 0
        self.dispatch_throttle = 0 # in seconds: .00195 is an interesting one

    def handle_rejected(self, event):
        process = event.process
        procs = self.processes.values()
        if process in procs: # this is one of our processes
            # rebuffer the event
            self._acceptEvent(event.event, head=True)

    def transition(self):
        processes = self.processes.values()
        dispatch_capable = False
        for process in processes:
            process.transition()
            # this is redundant, we do it in _dispatchEvent too, but we
            # want to reduce function call overhead
            if process.state == ProcessStates.RUNNING:
                if process.listener_state == EventListenerStates.READY:
                    dispatch_capable = True
        if dispatch_capable:
            if self.dispatch_throttle:
                now = time.time()
                if now - self.last_dispatch < self.dispatch_throttle:
                    return
            self.dispatch()

    def dispatch(self):
        while self.event_buffer:
            # dispatch the oldest event
            event = self.event_buffer.pop(0)
            ok = self._dispatchEvent(event)
            if not ok:
                # if we can't dispatch an event, rebuffer it and stop trying
                # to process any further events in the buffer
                self._acceptEvent(event, head=True)
                break
        self.last_dispatch = time.time()

    def _acceptEvent(self, event, head=False):
        # events are required to be instances
        # this has a side effect to fail with an attribute error on 'old style'
        # classes
        if not hasattr(event, 'serial'):
            event.serial = new_serial(GlobalSerial)
        if not hasattr(event, 'pool_serials'):
            event.pool_serials = {}
        if self.config.name not in event.pool_serials:
            event.pool_serials[self.config.name] = new_serial(self)
        else:
            self.config.options.logger.debug(
                'rebuffering event %s for pool %s (bufsize %s)' % (
                    event.serial, self.config.name, len(self.event_buffer)))

        if len(self.event_buffer) >= self.config.buffer_size:
            if self.event_buffer:
                # discard the oldest event
                discarded_event = self.event_buffer.pop(0)
                self.config.options.logger.error(
                    'pool %s event buffer overflowed, discarding event %s' % (
                        self.config.name, discarded_event.serial))
        if head:
            self.event_buffer.insert(0, event)
        else:
            self.event_buffer.append(event)

    def _dispatchEvent(self, event):
        pool_serial = event.pool_serials[self.config.name]

        for process in self.processes.values():
            if process.state != ProcessStates.RUNNING:
                continue
            if process.listener_state == EventListenerStates.READY:
                payload = str(event)
                try:
                    event_type = event.__class__
                    serial = event.serial
                    envelope = self._eventEnvelope(event_type, serial,
                                                   pool_serial, payload)
                    process.write(as_bytes(envelope))
                except OSError as why:
                    if why.args[0] != errno.EPIPE:
                        raise
                    continue

                process.listener_state = EventListenerStates.BUSY
                process.event = event
                self.config.options.logger.debug(
                    'event %s sent to listener %s' % (
                        event.serial, process.config.name))
                return True

        return False

    def _eventEnvelope(self, event_type, serial, pool_serial, payload):
        event_name = events.getEventNameByType(event_type)
        payload_len = len(payload)
        D = {
            'ver':'3.0',
            'sid':self.config.options.identifier,
            'serial':serial,
            'pool_name':self.config.name,
            'pool_serial':pool_serial,
            'event_name':event_name,
            'len':payload_len,
            'payload':payload,
            }
        return ('ver:%(ver)s server:%(sid)s serial:%(serial)s '
                'pool:%(pool_name)s poolserial:%(pool_serial)s '
                'eventname:%(event_name)s len:%(len)s\n%(payload)s' % D)
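
    # Example (hypothetical values): an envelope header produced here looks
    # like
    #   ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_STATE_RUNNING len:84
    # followed by a newline and then exactly `len` bytes of payload.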


class GlobalSerial(object):
    def __init__(self):
        self.serial = -1

GlobalSerial = GlobalSerial() # singleton

def new_serial(inst):
    if inst.serial == maxint:
        inst.serial = -1
    inst.serial += 1
    return inst.serial
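
# Example: the first call to new_serial(GlobalSerial) returns 0 (the serial
# starts at -1), each subsequent call returns the next integer, and the
# counter wraps back to 0 after reaching maxint.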