# process.py

import os
import time
import errno
import shlex
import traceback
import signal

from supervisor.compat import maxint
from supervisor.compat import StringIO
from supervisor.compat import total_ordering

from supervisor.medusa import asyncore_25 as asyncore

from supervisor.states import ProcessStates
from supervisor.states import SupervisorStates
from supervisor.states import getProcessStateDescription
from supervisor.states import STOPPED_STATES

from supervisor.options import decode_wait_status
from supervisor.options import signame
from supervisor.options import ProcessException, BadCommand

from supervisor.dispatchers import EventListenerStates
from supervisor import events
from supervisor.datatypes import RestartUnconditionally
from supervisor.socket_manager import SocketManager

@total_ordering
class Subprocess(object):
    """A class to manage a subprocess."""

    # Initial state; overridden by instance variables
    pid = 0 # Subprocess pid; 0 when not running
    config = None # ProcessConfig instance
    state = None # process state code
    listener_state = None # listener state code (if we're an event listener)
    event = None # event currently being processed (if we're an event listener)
    laststart = 0 # Last time the subprocess was started; 0 if never
    laststop = 0 # Last time the subprocess was stopped; 0 if never
    delay = 0 # If nonzero, delay starting or killing until this time
    administrative_stop = 0 # true if the process has been stopped by an admin
    system_stop = 0 # true if the process has been stopped by the system
    killing = 0 # flag determining whether we are trying to kill this proc
    backoff = 0 # backoff counter (to startretries)
    dispatchers = None # asyncore output dispatchers (keyed by fd)
    pipes = None # map of channel name to file descriptor #
    exitstatus = None # status attached to dead process by finish()
    spawnerr = None # error message attached by spawn() if any
    group = None # ProcessGroup instance if process is in the group

    def __init__(self, config):
        """Constructor.

        Argument is a ProcessConfig instance.
        """
        self.config = config
        self.dispatchers = {}
        self.pipes = {}
        self.state = ProcessStates.STOPPED

    def removelogs(self):
        for dispatcher in self.dispatchers.values():
            if hasattr(dispatcher, 'removelogs'):
                dispatcher.removelogs()

    def reopenlogs(self):
        for dispatcher in self.dispatchers.values():
            if hasattr(dispatcher, 'reopenlogs'):
                dispatcher.reopenlogs()

    def drain(self):
        for dispatcher in self.dispatchers.values():
            # note that we *must* call readable() for every dispatcher,
            # as it may have side effects for a given dispatcher (eg.
            # call handle_listener_state_change for event listener
            # processes)
            if dispatcher.readable():
                dispatcher.handle_read_event()
            if dispatcher.writable():
                dispatcher.handle_write_event()

    def write(self, chars):
        if not self.pid or self.killing:
            raise OSError(errno.EPIPE, "Process already closed")

        stdin_fd = self.pipes['stdin']
        if stdin_fd is None:
            raise OSError(errno.EPIPE, "Process has no stdin channel")

        dispatcher = self.dispatchers[stdin_fd]
        if dispatcher.closed:
            raise OSError(errno.EPIPE, "Process' stdin channel is closed")

        dispatcher.input_buffer += chars
        dispatcher.flush() # this must raise EPIPE if the pipe is closed

    def get_execv_args(self):
        """Internal: turn a program name into a file name, using $PATH,
        make sure it exists / is executable, raising a ProcessException
        if not """
        try:
            commandargs = shlex.split(self.config.command)
        except ValueError as e:
            raise BadCommand("can't parse command %r: %s" %
                             (self.config.command, str(e)))

        if commandargs:
            program = commandargs[0]
        else:
            raise BadCommand("command is empty")

        if "/" in program:
            filename = program
            try:
                st = self.config.options.stat(filename)
            except OSError:
                st = None
        else:
            path = self.config.options.get_path()
            found = None
            st = None
            for dir in path:
                found = os.path.join(dir, program)
                try:
                    st = self.config.options.stat(found)
                except OSError:
                    pass
                else:
                    break
            if st is None:
                filename = program
            else:
                filename = found

        # check_execv_args will raise a ProcessException if the execv
        # args are bogus, we break it out into a separate options
        # method call here only to service unit tests
        self.config.options.check_execv_args(filename, commandargs, st)

        return filename, commandargs
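
    # Illustrative sketch (not part of the original module): how a config
    # "command" line flows through get_execv_args() above, assuming a
    # hypothetical program 'sleep' and a $PATH containing /usr/bin.
    #
    #   shlex.split("sleep 10")      -> ['sleep', '10']
    #   program = 'sleep'            # no "/" in it, so the PATH is searched
    #   filename = '/usr/bin/sleep'  # first directory where stat() succeeds
    #
    # check_execv_args() then verifies the resolved file exists and is
    # executable before execve() is ever attempted.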

    event_map = {
        ProcessStates.BACKOFF: events.ProcessStateBackoffEvent,
        ProcessStates.FATAL: events.ProcessStateFatalEvent,
        ProcessStates.UNKNOWN: events.ProcessStateUnknownEvent,
        ProcessStates.STOPPED: events.ProcessStateStoppedEvent,
        ProcessStates.EXITED: events.ProcessStateExitedEvent,
        ProcessStates.RUNNING: events.ProcessStateRunningEvent,
        ProcessStates.STARTING: events.ProcessStateStartingEvent,
        ProcessStates.STOPPING: events.ProcessStateStoppingEvent,
    }

    def change_state(self, new_state, expected=True):
        old_state = self.state
        if new_state is old_state:
            # exists for unit tests
            return False

        event_class = self.event_map.get(new_state)
        if event_class is not None:
            event = event_class(self, old_state, expected)
            events.notify(event)

        if new_state == ProcessStates.BACKOFF:
            now = time.time()
            self.backoff += 1
            self.delay = now + self.backoff

        self.state = new_state
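
    # Illustrative sketch (not part of the original module): the BACKOFF
    # branch above makes each successive retry wait one second longer.
    #
    #   1st failure: backoff = 1, delay = now + 1
    #   2nd failure: backoff = 2, delay = now + 2
    #   3rd failure: backoff = 3, delay = now + 3
    #
    # transition() only respawns once time.time() > delay, and give_up()
    # moves the process to FATAL once backoff exceeds startretries.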

    def _assertInState(self, *states):
        if self.state not in states:
            current_state = getProcessStateDescription(self.state)
            allowable_states = ' '.join(map(getProcessStateDescription, states))
            raise AssertionError('Assertion failed for %s: %s not in %s' % (
                self.config.name, current_state, allowable_states))

    def record_spawnerr(self, msg):
        self.spawnerr = msg
        self.config.options.logger.info("spawnerr: %s" % msg)

    def spawn(self):
        """Start the subprocess.  It must not be running already.

        Return the process id.  If the fork() call fails, return None.
        """
        options = self.config.options

        if self.pid:
            msg = 'process %r already running' % self.config.name
            options.logger.warn(msg)
            return

        self.killing = 0
        self.spawnerr = None
        self.exitstatus = None
        self.system_stop = 0
        self.administrative_stop = 0

        self.laststart = time.time()

        self._assertInState(ProcessStates.EXITED, ProcessStates.FATAL,
                            ProcessStates.BACKOFF, ProcessStates.STOPPED)

        self.change_state(ProcessStates.STARTING)

        try:
            filename, argv = self.get_execv_args()
        except ProcessException as what:
            self.record_spawnerr(what.args[0])
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.BACKOFF)
            return

        try:
            self.dispatchers, self.pipes = self.config.make_dispatchers(self)
        except OSError as why:
            code = why.args[0]
            if code == errno.EMFILE:
                # too many file descriptors open
                msg = 'too many open files to spawn %r' % self.config.name
            else:
                msg = 'unknown error: %s' % errno.errorcode.get(code, code)
            self.record_spawnerr(msg)
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.BACKOFF)
            return

        try:
            pid = options.fork()
        except OSError as why:
            code = why.args[0]
            if code == errno.EAGAIN:
                # process table full
                msg = ('Too many processes in process table to spawn %r' %
                       self.config.name)
            else:
                msg = 'unknown error: %s' % errno.errorcode.get(code, code)

            self.record_spawnerr(msg)
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.BACKOFF)
            options.close_parent_pipes(self.pipes)
            options.close_child_pipes(self.pipes)
            return

        if pid != 0:
            return self._spawn_as_parent(pid)
        else:
            return self._spawn_as_child(filename, argv)

    def _spawn_as_parent(self, pid):
        # Parent
        self.pid = pid
        options = self.config.options
        options.close_child_pipes(self.pipes)
        options.logger.info('spawned: %r with pid %s' % (self.config.name, pid))
        self.spawnerr = None
        self.delay = time.time() + self.config.startsecs
        options.pidhistory[pid] = self
        return pid

    def _prepare_child_fds(self):
        options = self.config.options
        options.dup2(self.pipes['child_stdin'], 0)
        options.dup2(self.pipes['child_stdout'], 1)
        if self.config.redirect_stderr:
            options.dup2(self.pipes['child_stdout'], 2)
        else:
            options.dup2(self.pipes['child_stderr'], 2)
        for i in range(3, options.minfds):
            options.close_fd(i)
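
    # Illustrative sketch (not part of the original module): the file
    # descriptor layout _prepare_child_fds() sets up in the child before
    # execve(), assuming redirect_stderr is false.
    #
    #   fd 0  <- child end of the stdin pipe
    #   fd 1  <- child end of the stdout pipe
    #   fd 2  <- child end of the stderr pipe (or the stdout pipe again
    #            when redirect_stderr is true)
    #   fds 3 .. minfds-1 are closed so the child inherits nothing else.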

    def _spawn_as_child(self, filename, argv):
        options = self.config.options
        try:
            # prevent child from receiving signals sent to the
            # parent by calling os.setpgrp to create a new process
            # group for the child; this prevents, for instance,
            # the case of child processes being sent a SIGINT when
            # running supervisor in foreground mode and Ctrl-C in
            # the terminal window running supervisord is pressed.
            # Presumably it also prevents HUP, etc received by
            # supervisord from being sent to children.
            options.setpgrp()

            self._prepare_child_fds()
            # sending to fd 2 will put this output in the stderr log

            # set user
            setuid_msg = self.set_uid()
            if setuid_msg:
                uid = self.config.uid
                msg = "couldn't setuid to %s: %s\n" % (uid, setuid_msg)
                options.write(2, "supervisor: " + msg)
                return # finally clause will exit the child process

            # set environment
            env = os.environ.copy()
            env['SUPERVISOR_ENABLED'] = '1'
            serverurl = self.config.serverurl
            if serverurl is None: # unset
                serverurl = self.config.options.serverurl # might still be None
            if serverurl:
                env['SUPERVISOR_SERVER_URL'] = serverurl
            env['SUPERVISOR_PROCESS_NAME'] = self.config.name
            if self.group:
                env['SUPERVISOR_GROUP_NAME'] = self.group.config.name
            if self.config.environment is not None:
                env.update(self.config.environment)

            # change directory
            cwd = self.config.directory
            try:
                if cwd is not None:
                    options.chdir(cwd)
            except OSError as why:
                code = errno.errorcode.get(why.args[0], why.args[0])
                msg = "couldn't chdir to %s: %s\n" % (cwd, code)
                options.write(2, "supervisor: " + msg)
                return # finally clause will exit the child process

            # set umask, then execve
            try:
                if self.config.umask is not None:
                    options.setumask(self.config.umask)
                options.execve(filename, argv, env)
            except OSError as why:
                code = errno.errorcode.get(why.args[0], why.args[0])
                msg = "couldn't exec %s: %s\n" % (argv[0], code)
                options.write(2, "supervisor: " + msg)
            except:
                (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
                error = '%s, %s: file: %s line: %s' % (t, v, file, line)
                msg = "couldn't exec %s: %s\n" % (filename, error)
                options.write(2, "supervisor: " + msg)

            # this point should only be reached if execve failed.
            # the finally clause will exit the child process.

        finally:
            options.write(2, "supervisor: child process was not spawned\n")
            options._exit(127) # exit process with code for spawn failure

    def stop(self):
        """ Administrative stop """
        self.administrative_stop = 1
        return self.kill(self.config.stopsignal)

    def give_up(self):
        self.delay = 0
        self.backoff = 0
        self.system_stop = 1
        self._assertInState(ProcessStates.BACKOFF)
        self.change_state(ProcessStates.FATAL)

    def kill(self, sig):
        """Send a signal to the subprocess.  This may or may not kill it.

        Return None if the signal was sent, or an error message string
        if an error occurred or if the subprocess is not running.
        """
        now = time.time()
        options = self.config.options

        # Properly stop processes in BACKOFF state.
        if self.state == ProcessStates.BACKOFF:
            msg = ("Attempted to kill %s, which is in BACKOFF state." %
                   (self.config.name))
            options.logger.debug(msg)
            self.change_state(ProcessStates.STOPPED)
            return None

        if not self.pid:
            msg = ("attempted to kill %s with sig %s but it wasn't running" %
                   (self.config.name, signame(sig)))
            options.logger.debug(msg)
            return msg

        # If we're in the stopping state, then we've already sent the stop
        # signal and this is the kill signal
        if self.state == ProcessStates.STOPPING:
            killasgroup = self.config.killasgroup
        else:
            killasgroup = self.config.stopasgroup

        as_group = ""
        if killasgroup:
            as_group = "process group "

        options.logger.debug('killing %s (pid %s) %swith signal %s'
                             % (self.config.name,
                                self.pid,
                                as_group,
                                signame(sig)))

        # RUNNING/STARTING/STOPPING -> STOPPING
        self.killing = 1
        self.delay = now + self.config.stopwaitsecs
        # we will already be in the STOPPING state if we're doing a
        # SIGKILL as a result of overrunning stopwaitsecs
        self._assertInState(ProcessStates.RUNNING, ProcessStates.STARTING,
                            ProcessStates.STOPPING)
        self.change_state(ProcessStates.STOPPING)

        pid = self.pid
        if killasgroup:
            # send to the whole process group instead
            pid = -self.pid

        try:
            options.kill(pid, sig)
        except:
            io = StringIO()
            traceback.print_exc(file=io)
            tb = io.getvalue()
            msg = 'unknown problem killing %s (%s):%s' % (self.config.name,
                                                          self.pid, tb)
            options.logger.critical(msg)
            self.change_state(ProcessStates.UNKNOWN)
            self.pid = 0
            self.killing = 0
            self.delay = 0
            return msg

        return None
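
    # Illustrative sketch (not part of the original module): when
    # stopasgroup/killasgroup is enabled, kill() negates the pid so the
    # signal reaches every member of the child's process group (the group
    # created by setpgrp() in _spawn_as_child). Assuming options.kill()
    # wraps os.kill(), the distinction looks like:
    #
    #   os.kill(1234, signal.SIGTERM)   # signal only pid 1234
    #   os.kill(-1234, signal.SIGTERM)  # signal the whole group led by 1234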

    def finish(self, pid, sts):
        """ The process was reaped and we need to report and manage its state
        """
        self.drain()

        es, msg = decode_wait_status(sts)

        now = time.time()
        self.laststop = now
        processname = self.config.name

        tooquickly = now - self.laststart < self.config.startsecs
        exit_expected = es in self.config.exitcodes

        if self.killing:
            # likely the result of a stop request
            # implies STOPPING -> STOPPED
            self.killing = 0
            self.delay = 0
            self.exitstatus = es

            msg = "stopped: %s (%s)" % (processname, msg)
            self._assertInState(ProcessStates.STOPPING)
            self.change_state(ProcessStates.STOPPED)

        elif tooquickly:
            # the program did not stay up long enough to make it to RUNNING
            # implies STARTING -> BACKOFF
            self.exitstatus = None
            self.spawnerr = 'Exited too quickly (process log may have details)'
            msg = "exited: %s (%s)" % (processname, msg + "; not expected")
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.BACKOFF)

        else:
            # this finish was not the result of a stop request, the
            # program was in the RUNNING state but exited
            # implies RUNNING -> EXITED
            self.delay = 0
            self.backoff = 0
            self.exitstatus = es

            if self.state == ProcessStates.STARTING: # pragma: no cover
                # XXX I don't know under which circumstances this
                # happens, but in the wild, there is a transition that
                # subverts the RUNNING state (directly from STARTING
                # to EXITED), so we perform the correct transition
                # here.
                self.change_state(ProcessStates.RUNNING)

            self._assertInState(ProcessStates.RUNNING)

            if exit_expected:
                # expected exit code
                msg = "exited: %s (%s)" % (processname, msg + "; expected")
                self.change_state(ProcessStates.EXITED, expected=True)
            else:
                # unexpected exit code
                self.spawnerr = 'Bad exit code %s' % es
                msg = "exited: %s (%s)" % (processname, msg + "; not expected")
                self.change_state(ProcessStates.EXITED, expected=False)

        self.config.options.logger.info(msg)

        self.pid = 0
        self.config.options.close_parent_pipes(self.pipes)
        self.pipes = {}
        self.dispatchers = {}

        # if we died before we processed the current event (only happens
        # if we're an event listener), notify the event system that this
        # event was rejected so it can be processed again.
        if self.event is not None:
            # Note: this should only be true if we were in the BUSY
            # state when finish() was called.
            events.notify(events.EventRejectedEvent(self, self.event))
            self.event = None
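
    # Illustrative sketch (not part of the original module): how finish()
    # classifies a reaped child, assuming a hypothetical config with
    # startsecs=10 and exitcodes=0,2.
    #
    #   killing flag set                   -> STOPPING -> STOPPED
    #   ran < 10s (and not being stopped)  -> STARTING -> BACKOFF
    #   ran >= 10s, exit status 0 or 2     -> RUNNING -> EXITED (expected)
    #   ran >= 10s, any other exit status  -> RUNNING -> EXITED (unexpected)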

    def set_uid(self):
        if self.config.uid is None:
            return
        msg = self.config.options.dropPrivileges(self.config.uid)
        return msg

    def __lt__(self, other):
        return self.config.priority < other.config.priority

    def __eq__(self, other):
        # sort by priority
        return self.config.priority == other.config.priority

    def __repr__(self):
        return '<Subprocess at %s with name %s in state %s>' % (
            id(self),
            self.config.name,
            getProcessStateDescription(self.get_state()))

    def get_state(self):
        return self.state

    def transition(self):
        now = time.time()
        state = self.state

        logger = self.config.options.logger

        if self.config.options.mood > SupervisorStates.RESTARTING:
            # dont start any processes if supervisor is shutting down
            if state == ProcessStates.EXITED:
                if self.config.autorestart:
                    if self.config.autorestart is RestartUnconditionally:
                        # EXITED -> STARTING
                        self.spawn()
                    else: # autorestart is RestartWhenExitUnexpected
                        if self.exitstatus not in self.config.exitcodes:
                            # EXITED -> STARTING
                            self.spawn()
            elif state == ProcessStates.STOPPED and not self.laststart:
                if self.config.autostart:
                    # STOPPED -> STARTING
                    self.spawn()
            elif state == ProcessStates.BACKOFF:
                if self.backoff <= self.config.startretries:
                    if now > self.delay:
                        # BACKOFF -> STARTING
                        self.spawn()

        if state == ProcessStates.STARTING:
            if now - self.laststart > self.config.startsecs:
                # STARTING -> RUNNING if the proc has started
                # successfully and it has stayed up for at least
                # proc.config.startsecs,
                self.delay = 0
                self.backoff = 0
                self._assertInState(ProcessStates.STARTING)
                self.change_state(ProcessStates.RUNNING)
                msg = (
                    'entered RUNNING state, process has stayed up for '
                    '> than %s seconds (startsecs)' % self.config.startsecs)
                logger.info('success: %s %s' % (self.config.name, msg))

        if state == ProcessStates.BACKOFF:
            if self.backoff > self.config.startretries:
                # BACKOFF -> FATAL if the proc has exceeded its number
                # of retries
                self.give_up()
                msg = ('entered FATAL state, too many start retries too '
                       'quickly')
                logger.info('gave up: %s %s' % (self.config.name, msg))

        elif state == ProcessStates.STOPPING:
            time_left = self.delay - now
            if time_left <= 0:
                # kill processes which are taking too long to stop with a
                # final sigkill.  if this doesn't kill it, the process will
                # be stuck in the STOPPING state forever.
                self.config.options.logger.warn(
                    'killing %r (%s) with SIGKILL' % (self.config.name,
                                                      self.pid))
                self.kill(signal.SIGKILL)
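
# Illustrative sketch (not part of the original module): the state
# transitions driven by transition(), spawn(), kill(), finish() and
# give_up(), summarized from the inline comments above.
#
#   STOPPED  -> STARTING            (autostart, or a manual start)
#   STARTING -> RUNNING             (stayed up for at least startsecs)
#   STARTING -> BACKOFF             (spawn failed, or exited too quickly)
#   BACKOFF  -> STARTING | FATAL    (retry after delay, or give_up() once
#                                    backoff exceeds startretries)
#   RUNNING  -> EXITED | STOPPING   (process exited, or a stop was requested)
#   STOPPING -> STOPPED             (reaped after the stop signal / SIGKILL)
#   EXITED   -> STARTING            (autorestart)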

class FastCGISubprocess(Subprocess):
    """Extends Subprocess class to handle FastCGI subprocesses"""

    def __init__(self, config):
        Subprocess.__init__(self, config)
        self.fcgi_sock = None

    def before_spawn(self):
        """
        The FastCGI socket needs to be created by the parent before we fork
        """
        if self.group is None:
            raise NotImplementedError('No group set for FastCGISubprocess')
        if not hasattr(self.group, 'socket_manager'):
            raise NotImplementedError('No SocketManager set for '
                                      '%s:%s' % (self.group, dir(self.group)))
        self.fcgi_sock = self.group.socket_manager.get_socket()

    def spawn(self):
        """
        Overrides Subprocess.spawn() so we can hook in before it happens
        """
        self.before_spawn()
        pid = Subprocess.spawn(self)
        if pid is None:
            # Remove object reference to decrement the reference count on error
            self.fcgi_sock = None
        return pid

    def after_finish(self):
        """
        Releases reference to FastCGI socket when process is reaped
        """
        # Remove object reference to decrement the reference count
        self.fcgi_sock = None

    def finish(self, pid, sts):
        """
        Overrides Subprocess.finish() so we can hook in after it happens
        """
        retval = Subprocess.finish(self, pid, sts)
        self.after_finish()
        return retval

    def _prepare_child_fds(self):
        """
        Overrides Subprocess._prepare_child_fds()
        The FastCGI socket needs to be set to file descriptor 0 in the child
        """
        sock_fd = self.fcgi_sock.fileno()

        options = self.config.options
        options.dup2(sock_fd, 0)
        options.dup2(self.pipes['child_stdout'], 1)
        if self.config.redirect_stderr:
            options.dup2(self.pipes['child_stdout'], 2)
        else:
            options.dup2(self.pipes['child_stderr'], 2)
        for i in range(3, options.minfds):
            options.close_fd(i)

@total_ordering
class ProcessGroupBase(object):
    def __init__(self, config):
        self.config = config
        self.processes = {}
        for pconfig in self.config.process_configs:
            self.processes[pconfig.name] = pconfig.make_process(self)

    def __lt__(self, other):
        return self.config.priority < other.config.priority

    def __eq__(self, other):
        return self.config.priority == other.config.priority

    def __repr__(self):
        return '<%s instance at %s named %s>' % (self.__class__, id(self),
                                                 self.config.name)

    def removelogs(self):
        for process in self.processes.values():
            process.removelogs()

    def reopenlogs(self):
        for process in self.processes.values():
            process.reopenlogs()

    def stop_all(self):
        processes = list(self.processes.values())
        processes.sort()
        processes.reverse() # stop in desc priority order

        for proc in processes:
            state = proc.get_state()
            if state == ProcessStates.RUNNING:
                # RUNNING -> STOPPING
                proc.stop()
            elif state == ProcessStates.STARTING:
                # STARTING -> STOPPING
                proc.stop()
            elif state == ProcessStates.BACKOFF:
                # BACKOFF -> FATAL
                proc.give_up()

    def get_unstopped_processes(self):
        """ Processes which aren't in a state that is considered 'stopped' """
        return [ x for x in self.processes.values() if x.get_state() not in
                 STOPPED_STATES ]

    def get_dispatchers(self):
        dispatchers = {}
        for process in self.processes.values():
            dispatchers.update(process.dispatchers)
        return dispatchers

class ProcessGroup(ProcessGroupBase):
    def transition(self):
        for proc in self.processes.values():
            proc.transition()


class FastCGIProcessGroup(ProcessGroup):
    def __init__(self, config, **kwargs):
        ProcessGroup.__init__(self, config)
        sockManagerKlass = kwargs.get('socketManager', SocketManager)
        self.socket_manager = sockManagerKlass(config.socket_config,
                                               logger=config.options.logger)
        # It's not required to call get_socket() here but we want
        # to fail early during start up if there is a config error
        try:
            self.socket_manager.get_socket()
        except Exception as e:
            raise ValueError(
                'Could not create FastCGI socket %s: %s' % (
                    self.socket_manager.config(), e))

class EventListenerPool(ProcessGroupBase):
    def __init__(self, config):
        ProcessGroupBase.__init__(self, config)
        self.event_buffer = []
        for event_type in self.config.pool_events:
            events.subscribe(event_type, self._acceptEvent)
        events.subscribe(events.EventRejectedEvent, self.handle_rejected)
        self.serial = -1
        self.last_dispatch = 0
        self.dispatch_throttle = 0 # in seconds: .00195 is an interesting one

    def handle_rejected(self, event):
        process = event.process
        procs = self.processes.values()
        if process in procs: # this is one of our processes
            # rebuffer the event
            self._acceptEvent(event.event, head=True)

    def transition(self):
        processes = self.processes.values()
        dispatch_capable = False
        for process in processes:
            process.transition()
            # this is redundant, we do it in _dispatchEvent too, but we
            # want to reduce function call overhead
            if process.state == ProcessStates.RUNNING:
                if process.listener_state == EventListenerStates.READY:
                    dispatch_capable = True
        if dispatch_capable:
            if self.dispatch_throttle:
                now = time.time()
                if now - self.last_dispatch < self.dispatch_throttle:
                    return
            self.dispatch()

    def dispatch(self):
        while self.event_buffer:
            # dispatch the oldest event
            event = self.event_buffer.pop(0)
            ok = self._dispatchEvent(event)
            if not ok:
                # if we can't dispatch an event, rebuffer it and stop trying
                # to process any further events in the buffer
                self._acceptEvent(event, head=True)
                break
        self.last_dispatch = time.time()

    def _acceptEvent(self, event, head=False):
        # events are required to be instances
        # this has a side effect to fail with an attribute error on 'old
        # style' classes
        if not hasattr(event, 'serial'):
            event.serial = new_serial(GlobalSerial)
        if not hasattr(event, 'pool_serials'):
            event.pool_serials = {}
        if self.config.name not in event.pool_serials:
            event.pool_serials[self.config.name] = new_serial(self)
        else:
            self.config.options.logger.debug(
                'rebuffering event %s for pool %s (bufsize %s)' % (
                    (event.serial, self.config.name, len(self.event_buffer))))

        if len(self.event_buffer) >= self.config.buffer_size:
            if self.event_buffer:
                # discard the oldest event
                discarded_event = self.event_buffer.pop(0)
                self.config.options.logger.error(
                    'pool %s event buffer overflowed, discarding event %s' % (
                        (self.config.name, discarded_event.serial)))
        if head:
            self.event_buffer.insert(0, event)
        else:
            self.event_buffer.append(event)
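
    # Illustrative sketch (not part of the original module): buffering
    # behaviour of _acceptEvent(), assuming a hypothetical buffer_size of 3.
    #
    #   buffer is [e1, e2, e3] and a new event e4 arrives
    #     -> e1 (the oldest) is discarded and logged as an overflow
    #     -> e4 is appended: [e2, e3, e4]
    #   a rejected event comes back via handle_rejected() with head=True
    #     -> it is re-inserted at the front so it is retried first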

    def _dispatchEvent(self, event):
        pool_serial = event.pool_serials[self.config.name]

        for process in self.processes.values():
            if process.state != ProcessStates.RUNNING:
                continue
            if process.listener_state == EventListenerStates.READY:
                payload = str(event)
                try:
                    event_type = event.__class__
                    serial = event.serial
                    envelope = self._eventEnvelope(event_type, serial,
                                                   pool_serial, payload)
                    process.write(envelope)
                except OSError as why:
                    if why.args[0] != errno.EPIPE:
                        raise
                    continue

                process.listener_state = EventListenerStates.BUSY
                process.event = event
                self.config.options.logger.debug(
                    'event %s sent to listener %s' % (
                        event.serial, process.config.name))
                return True

        return False

    def _eventEnvelope(self, event_type, serial, pool_serial, payload):
        event_name = events.getEventNameByType(event_type)
        payload_len = len(payload)
        D = {
            'ver': '3.0',
            'sid': self.config.options.identifier,
            'serial': serial,
            'pool_name': self.config.name,
            'pool_serial': pool_serial,
            'event_name': event_name,
            'len': payload_len,
            'payload': payload,
        }
        return ('ver:%(ver)s server:%(sid)s serial:%(serial)s '
                'pool:%(pool_name)s poolserial:%(pool_serial)s '
                'eventname:%(event_name)s len:%(len)s\n%(payload)s' % D)
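
# Illustrative sketch (not part of the original module): the envelope
# produced by _eventEnvelope() above, using hypothetical values (a pool
# named 'mylistener', a supervisord identifier of 'supervisor', and a
# PROCESS_STATE_EXITED event whose str() payload is 42 bytes long):
#
#   ver:3.0 server:supervisor serial:21 pool:mylistener poolserial:10
#   eventname:PROCESS_STATE_EXITED len:42\n<payload>
#
# (wrapped here for readability; the header is a single line terminated
# by '\n', followed immediately by the payload bytes.)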

class GlobalSerial(object):
    def __init__(self):
        self.serial = -1

GlobalSerial = GlobalSerial() # singleton

def new_serial(inst):
    if inst.serial == maxint:
        inst.serial = -1
    inst.serial += 1
    return inst.serial
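
# Illustrative sketch (not part of the original module): new_serial()
# hands out monotonically increasing serials per instance and wraps
# around at maxint instead of overflowing.
#
#   new_serial(GlobalSerial)  # -> 0 on the first call (serial starts at -1)
#   new_serial(GlobalSerial)  # -> 1, 2, 3, ... on subsequent calls
#   # once inst.serial == maxint, the counter resets and the next call
#   # returns 0 again.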