process.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818
  1. import os
  2. import sys
  3. import time
  4. import errno
  5. import shlex
  6. import StringIO
  7. import traceback
  8. import signal
  9. from supervisor.medusa import asyncore_25 as asyncore
  10. from supervisor.states import ProcessStates
  11. from supervisor.states import SupervisorStates
  12. from supervisor.states import getProcessStateDescription
  13. from supervisor.states import STOPPED_STATES
  14. from supervisor.options import decode_wait_status
  15. from supervisor.options import signame
  16. from supervisor.options import ProcessException
  17. from supervisor.dispatchers import EventListenerStates
  18. from supervisor import events
  19. from supervisor.datatypes import RestartUnconditionally
  20. from supervisor.socket_manager import SocketManager
  class Subprocess:
      """Manage one supervised child process and its state machine.

      An instance holds the child's pid, its current ``ProcessStates``
      code, its pipes and output dispatchers, and implements spawning,
      signalling, reaping (``finish``) and the periodic ``transition``
      step.
      """

      # Initial state; overridden by instance variables
      pid = 0  # Subprocess pid; 0 when not running
      config = None  # ProcessConfig instance
      state = None  # process state code
      listener_state = None  # listener state code (if we're an event listener)
      event = None  # event currently being processed (if we're an event listener)
      laststart = 0  # Last time the subprocess was started; 0 if never
      laststop = 0  # Last time the subprocess was stopped; 0 if never
      delay = 0  # If nonzero, delay starting or killing until this time
      administrative_stop = 0  # true if the process has been stopped by an admin
      system_stop = 0  # true if the process has been stopped by the system
      killing = 0  # flag determining whether we are trying to kill this proc
      backoff = 0  # backoff counter (to startretries)
      dispatchers = None  # asyncore output dispatchers (keyed by fd)
      pipes = None  # map of channel name to file descriptor #
      exitstatus = None  # status attached to dead process by finish()
      spawnerr = None  # error message attached by spawn() if any
      group = None  # ProcessGroup instance if process is in the group

      def __init__(self, config):
          """Constructor.

          Argument is a ProcessConfig instance.  The process starts out in
          the STOPPED state with no dispatchers and no pipes.
          """
          self.config = config
          self.dispatchers = {}
          self.pipes = {}
          self.state = ProcessStates.STOPPED

      def removelogs(self):
          """Call ``removelogs()`` on every dispatcher that provides it."""
          for dispatcher in self.dispatchers.values():
              if hasattr(dispatcher, 'removelogs'):
                  dispatcher.removelogs()

      def reopenlogs(self):
          """Call ``reopenlogs()`` on every dispatcher that provides it."""
          for dispatcher in self.dispatchers.values():
              if hasattr(dispatcher, 'reopenlogs'):
                  dispatcher.reopenlogs()

      def drain(self):
          """Run one read and one write pass over every dispatcher."""
          for dispatcher in self.dispatchers.values():
              # note that we *must* call readable() for every
              # dispatcher, as it may have side effects for a given
              # dispatcher (eg. call handle_listener_state_change for
              # event listener processes)
              if dispatcher.readable():
                  dispatcher.handle_read_event()
              if dispatcher.writable():
                  dispatcher.handle_write_event()

      def write(self, chars):
          """Append ``chars`` to the stdin dispatcher's buffer and flush it.

          Raises OSError(EPIPE) when the process is not running or is
          being killed, when it has no stdin channel, or when the stdin
          dispatcher is already closed.
          """
          if not self.pid or self.killing:
              raise OSError(errno.EPIPE, "Process already closed")

          stdin_fd = self.pipes['stdin']
          if stdin_fd is None:
              raise OSError(errno.EPIPE, "Process has no stdin channel")

          dispatcher = self.dispatchers[stdin_fd]
          if dispatcher.closed:
              raise OSError(errno.EPIPE, "Process' stdin channel is closed")

          dispatcher.input_buffer += chars
          dispatcher.flush()  # this must raise EPIPE if the pipe is closed

      def get_execv_args(self):
          """Internal: turn the configured command into (filename, argv).

          The program name is looked up on the path returned by
          ``options.get_path()`` unless it contains a slash.  Raises
          ProcessException if the command is empty or cannot be parsed;
          ``options.check_execv_args`` raises a ProcessException if the
          resulting execv args are bogus.
          """
          options = self.config.options
          try:
              commandargs = shlex.split(self.config.command)
          except ValueError as e:
              # e.g. unbalanced quotes; surface it the same way as any
              # other bad command so spawn() can record it
              raise ProcessException(
                  "can't parse command %r: %s" % (self.config.command, e))
          if not commandargs:
              raise ProcessException("command is empty")

          program = commandargs[0]
          if "/" in program:
              filename = program
              try:
                  st = options.stat(filename)
              except OSError:
                  st = None
          else:
              found = None
              st = None
              for directory in options.get_path():
                  found = os.path.join(directory, program)
                  try:
                      st = options.stat(found)
                  except OSError:
                      continue
                  break
              # fall back to the bare program name when nothing on the
              # path could be stat'ed
              if st is None:
                  filename = program
              else:
                  filename = found

          # check_execv_args will raise a ProcessException if the execv
          # args are bogus, we break it out into a separate options
          # method call here only to service unit tests
          options.check_execv_args(filename, commandargs, st)
          return filename, commandargs

      # state code -> event class notified when that state is entered
      event_map = {
          ProcessStates.BACKOFF: events.ProcessStateBackoffEvent,
          ProcessStates.FATAL: events.ProcessStateFatalEvent,
          ProcessStates.UNKNOWN: events.ProcessStateUnknownEvent,
          ProcessStates.STOPPED: events.ProcessStateStoppedEvent,
          ProcessStates.EXITED: events.ProcessStateExitedEvent,
          ProcessStates.RUNNING: events.ProcessStateRunningEvent,
          ProcessStates.STARTING: events.ProcessStateStartingEvent,
          ProcessStates.STOPPING: events.ProcessStateStoppingEvent,
          }

      def change_state(self, new_state, expected=True):
          """Move to ``new_state`` and notify the matching state event.

          Returns False (and does nothing) when ``new_state`` equals the
          current state; otherwise returns None.  Entering BACKOFF bumps
          the backoff counter and sets ``delay`` to now + backoff.
          """
          old_state = self.state
          # compare by value: state codes are plain numbers and identity
          # is not guaranteed for equal values
          if new_state == old_state:
              # exists for unit tests
              return False

          event_class = self.event_map.get(new_state)
          if event_class is not None:
              events.notify(event_class(self, old_state, expected))

          if new_state == ProcessStates.BACKOFF:
              now = time.time()
              self.backoff = self.backoff + 1
              self.delay = now + self.backoff

          self.state = new_state

      def _assertInState(self, *states):
          """Raise AssertionError unless the current state is in ``states``."""
          if self.state not in states:
              current_state = getProcessStateDescription(self.state)
              allowable_states = ' '.join(
                  map(getProcessStateDescription, states))
              raise AssertionError('Assertion failed for %s: %s not in %s' % (
                  self.config.name, current_state, allowable_states))

      def record_spawnerr(self, msg):
          """Remember ``msg`` as the spawn error and log it at info level."""
          self.spawnerr = msg
          self.config.options.logger.info("spawnerr: %s" % msg)

      def _spawn_failed(self, msg):
          """Record ``msg`` as the spawn error and go STARTING -> BACKOFF."""
          self.record_spawnerr(msg)
          self._assertInState(ProcessStates.STARTING)
          self.change_state(ProcessStates.BACKOFF)

      def spawn(self):
          """Start the subprocess.  It must not be running already.

          Return the process id.  Return None if the process is already
          running, if the command cannot be resolved, if the dispatchers
          cannot be created, or if the fork() call fails; in the failure
          cases the process is moved to the BACKOFF state.
          """
          options = self.config.options

          if self.pid:
              msg = 'process %r already running' % self.config.name
              options.logger.warn(msg)
              return

          self.killing = 0
          self.spawnerr = None
          self.exitstatus = None
          self.system_stop = 0
          self.administrative_stop = 0

          self.laststart = time.time()

          self._assertInState(ProcessStates.EXITED, ProcessStates.FATAL,
                              ProcessStates.BACKOFF, ProcessStates.STOPPED)

          self.change_state(ProcessStates.STARTING)

          try:
              filename, argv = self.get_execv_args()
          except ProcessException as what:
              self._spawn_failed(what.args[0])
              return

          try:
              self.dispatchers, self.pipes = self.config.make_dispatchers(self)
          except OSError as why:
              code = why.args[0]
              if code == errno.EMFILE:
                  # too many file descriptors open
                  msg = 'too many open files to spawn %r' % self.config.name
              else:
                  msg = 'unknown error: %s' % errno.errorcode.get(code, code)
              self._spawn_failed(msg)
              return

          try:
              pid = options.fork()
          except OSError as why:
              code = why.args[0]
              if code == errno.EAGAIN:
                  # process table full
                  msg = ('Too many processes in process table to spawn %r' %
                         self.config.name)
              else:
                  msg = 'unknown error: %s' % errno.errorcode.get(code, code)
              self._spawn_failed(msg)
              # no child exists, so both ends of every pipe are ours to close
              options.close_parent_pipes(self.pipes)
              options.close_child_pipes(self.pipes)
              return

          if pid != 0:
              return self._spawn_as_parent(pid)
          else:
              return self._spawn_as_child(filename, argv)

      def _spawn_as_parent(self, pid):
          """Parent side of the fork: record ``pid`` and return it."""
          # Parent
          self.pid = pid
          options = self.config.options
          options.close_child_pipes(self.pipes)
          options.logger.info('spawned: %r with pid %s' % (self.config.name, pid))
          self.spawnerr = None
          self.delay = time.time() + self.config.startsecs
          options.pidhistory[pid] = self
          return pid

      def _prepare_child_fds(self):
          """Wire the child ends of the pipes to fds 0-2, close the rest."""
          options = self.config.options
          options.dup2(self.pipes['child_stdin'], 0)
          options.dup2(self.pipes['child_stdout'], 1)
          if self.config.redirect_stderr:
              options.dup2(self.pipes['child_stdout'], 2)
          else:
              options.dup2(self.pipes['child_stderr'], 2)
          for i in range(3, options.minfds):
              options.close_fd(i)

      def _spawn_as_child(self, filename, argv):
          """Child side of the fork: set up the environment and exec.

          Every path through this method ends in ``options._exit(127)``
          (via the ``finally`` clause) unless ``options.execve`` replaces
          the process image first.  If the configured uid cannot be
          assumed the program is NOT executed.
          """
          options = self.config.options
          try:
              # prevent child from receiving signals sent to the
              # parent by calling os.setpgrp to create a new process
              # group for the child; this prevents, for instance,
              # the case of child processes being sent a SIGINT when
              # running supervisor in foreground mode and Ctrl-C in
              # the terminal window running supervisord is pressed.
              # Presumably it also prevents HUP, etc received by
              # supervisord from being sent to children.
              options.setpgrp()
              self._prepare_child_fds()
              # sending to fd 2 will put this output in the stderr log
              msg = self.set_uid()
              if msg:
                  uid = self.config.uid
                  s = 'supervisor: error trying to setuid to %s ' % uid
                  options.write(2, s)
                  options.write(2, "(%s)\n" % msg)
                  # never run the program with the wrong privileges;
                  # the finally clause exits the child
                  return

              env = os.environ.copy()
              env['SUPERVISOR_ENABLED'] = '1'
              serverurl = self.config.serverurl
              if serverurl is None:  # unset
                  serverurl = self.config.options.serverurl  # might still be None
              if serverurl:
                  env['SUPERVISOR_SERVER_URL'] = serverurl
              env['SUPERVISOR_PROCESS_NAME'] = self.config.name
              if self.group:
                  env['SUPERVISOR_GROUP_NAME'] = self.group.config.name
              if self.config.environment is not None:
                  env.update(self.config.environment)

              try:
                  cwd = self.config.directory
                  if cwd is not None:
                      options.chdir(cwd)
              except OSError as why:
                  code = errno.errorcode.get(why.args[0], why.args[0])
                  msg = "couldn't chdir to %s: %s\n" % (cwd, code)
                  options.write(2, msg)
              else:
                  try:
                      if self.config.umask is not None:
                          options.setumask(self.config.umask)
                      options.execve(filename, argv, env)
                  except OSError as why:
                      code = errno.errorcode.get(why.args[0], why.args[0])
                      msg = "couldn't exec %s: %s\n" % (argv[0], code)
                      options.write(2, msg)
                  except:
                      # deliberately broad: nothing may escape the child
                      # before it exits
                      (fname, fun, line), t, v, tbinfo = \
                          asyncore.compact_traceback()
                      error = '%s, %s: file: %s line: %s' % (t, v, fname, line)
                      options.write(2, "couldn't exec %s: %s\n" % (filename,
                                                                   error))
          finally:
              options._exit(127)

      def stop(self):
          """ Administrative stop """
          self.administrative_stop = 1
          return self.kill(self.config.stopsignal)

      def give_up(self):
          """Stop retrying: reset delay/backoff and go BACKOFF -> FATAL."""
          self.delay = 0
          self.backoff = 0
          self.system_stop = 1
          self._assertInState(ProcessStates.BACKOFF)
          self.change_state(ProcessStates.FATAL)

      def kill(self, sig):
          """Send a signal to the subprocess.  This may or may not kill it.

          Return None if the signal was sent, or an error message string
          if an error occurred or if the subprocess is not running.
          """
          now = time.time()
          options = self.config.options
          if not self.pid:
              msg = ("attempted to kill %s with sig %s but it wasn't running" %
                     (self.config.name, signame(sig)))
              options.logger.debug(msg)
              return msg

          # only SIGKILL is ever delivered to the whole process group
          killasgroup = self.config.killasgroup and sig == signal.SIGKILL

          as_group = ""
          if killasgroup:
              as_group = "process group "

          options.logger.debug('killing %s (pid %s) %swith signal %s'
                               % (self.config.name,
                                  self.pid,
                                  as_group,
                                  signame(sig))
                               )

          # RUNNING/STARTING/STOPPING -> STOPPING
          self.killing = 1
          self.delay = now + self.config.stopwaitsecs
          # we will already be in the STOPPING state if we're doing a
          # SIGKILL as a result of overrunning stopwaitsecs
          self._assertInState(ProcessStates.RUNNING, ProcessStates.STARTING,
                              ProcessStates.STOPPING)
          self.change_state(ProcessStates.STOPPING)

          pid = self.pid
          if killasgroup:
              # send to the whole process group instead
              pid = -self.pid

          try:
              options.kill(pid, sig)
          except:
              # deliberately broad: any failure to signal leaves the
              # process in an unknown condition
              tb = traceback.format_exc()
              msg = 'unknown problem killing %s (%s):%s' % (self.config.name,
                                                            self.pid, tb)
              options.logger.critical(msg)
              self.change_state(ProcessStates.UNKNOWN)
              self.pid = 0
              self.killing = 0
              self.delay = 0
              return msg

          return None

      def finish(self, pid, sts):
          """ The process was reaped and we need to report and manage its state
          """
          self.drain()

          es, msg = decode_wait_status(sts)

          now = time.time()
          self.laststop = now
          processname = self.config.name

          tooquickly = now - self.laststart < self.config.startsecs
          exit_expected = es in self.config.exitcodes

          if self.killing:
              # likely the result of a stop request
              # implies STOPPING -> STOPPED
              self.killing = 0
              self.delay = 0
              self.exitstatus = es

              msg = "stopped: %s (%s)" % (processname, msg)
              self._assertInState(ProcessStates.STOPPING)
              self.change_state(ProcessStates.STOPPED)

          elif tooquickly:
              # the program did not stay up long enough to make it to RUNNING
              # implies STARTING -> BACKOFF
              self.exitstatus = None
              self.spawnerr = 'Exited too quickly (process log may have details)'
              msg = "exited: %s (%s)" % (processname, msg + "; not expected")
              self._assertInState(ProcessStates.STARTING)
              self.change_state(ProcessStates.BACKOFF)

          else:
              # this finish was not the result of a stop request, the
              # program was in the RUNNING state but exited implies
              # RUNNING -> EXITED
              self.delay = 0
              self.backoff = 0
              self.exitstatus = es

              if self.state == ProcessStates.STARTING:
                  # XXX I dont know under which circumstances this
                  # happens, but in the wild, there is a transition that
                  # subverts the RUNNING state (directly from STARTING
                  # to EXITED), so we perform the correct transition
                  # here.
                  self.change_state(ProcessStates.RUNNING)

              self._assertInState(ProcessStates.RUNNING)

              if exit_expected:
                  # expected exit code
                  msg = "exited: %s (%s)" % (processname, msg + "; expected")
                  self.change_state(ProcessStates.EXITED, expected=True)
              else:
                  # unexpected exit code
                  self.spawnerr = 'Bad exit code %s' % es
                  msg = "exited: %s (%s)" % (processname,
                                             msg + "; not expected")
                  self.change_state(ProcessStates.EXITED, expected=False)

          self.config.options.logger.info(msg)

          self.pid = 0
          self.config.options.close_parent_pipes(self.pipes)
          self.pipes = {}
          self.dispatchers = {}

          # if we died before we processed the current event (only happens
          # if we're an event listener), notify the event system that this
          # event was rejected so it can be processed again.
          if self.event is not None:
              # Note: this should only be true if we were in the BUSY
              # state when finish() was called.
              events.notify(events.EventRejectedEvent(self, self.event))
              self.event = None

      def set_uid(self):
          """Drop privileges to the configured uid, if one is configured.

          Returns None when no uid is configured; otherwise returns
          whatever ``options.dropPrivileges`` returns (callers treat a
          true value as an error message).
          """
          if self.config.uid is None:
              return
          msg = self.config.options.dropPrivileges(self.config.uid)
          return msg

      def __cmp__(self, other):
          # sort by priority
          return cmp(self.config.priority, other.config.priority)

      def __repr__(self):
          return '<Subprocess at %s with name %s in state %s>' % (
              id(self),
              self.config.name,
              getProcessStateDescription(self.get_state()))

      def get_state(self):
          """Return the current process state code."""
          return self.state

      def transition(self):
          """Advance the state machine by one step.

          All decisions are made against the state observed on entry,
          even if spawn() changes ``self.state`` part-way through.
          """
          now = time.time()
          state = self.state

          logger = self.config.options.logger

          if self.config.options.mood > SupervisorStates.RESTARTING:
              # dont start any processes if supervisor is shutting down
              if state == ProcessStates.EXITED:
                  if self.config.autorestart:
                      if self.config.autorestart is RestartUnconditionally:
                          # EXITED -> STARTING
                          self.spawn()
                      else:  # autorestart is RestartWhenExitUnexpected
                          if self.exitstatus not in self.config.exitcodes:
                              # EXITED -> STARTING
                              self.spawn()
              elif state == ProcessStates.STOPPED and not self.laststart:
                  if self.config.autostart:
                      # STOPPED -> STARTING
                      self.spawn()
              elif state == ProcessStates.BACKOFF:
                  if self.backoff <= self.config.startretries:
                      if now > self.delay:
                          # BACKOFF -> STARTING
                          self.spawn()

          if state == ProcessStates.STARTING:
              if now - self.laststart > self.config.startsecs:
                  # STARTING -> RUNNING if the proc has started
                  # successfully and it has stayed up for at least
                  # proc.config.startsecs,
                  self.delay = 0
                  self.backoff = 0
                  self._assertInState(ProcessStates.STARTING)
                  self.change_state(ProcessStates.RUNNING)
                  msg = (
                      'entered RUNNING state, process has stayed up for '
                      '> than %s seconds (startsecs)' % self.config.startsecs)
                  logger.info('success: %s %s' % (self.config.name, msg))

          if state == ProcessStates.BACKOFF:
              if self.backoff > self.config.startretries:
                  # BACKOFF -> FATAL if the proc has exceeded its number
                  # of retries
                  self.give_up()
                  msg = ('entered FATAL state, too many start retries too '
                         'quickly')
                  logger.info('gave up: %s %s' % (self.config.name, msg))

          elif state == ProcessStates.STOPPING:
              time_left = self.delay - now
              if time_left <= 0:
                  # kill processes which are taking too long to stop with a final
                  # sigkill.  if this doesn't kill it, the process will be stuck
                  # in the STOPPING state forever.
                  self.config.options.logger.warn(
                      'killing %r (%s) with SIGKILL' % (self.config.name,
                                                        self.pid))
                  self.kill(signal.SIGKILL)
  class FastCGISubprocess(Subprocess):
      """Subprocess variant whose child receives a FastCGI socket on fd 0."""

      def __init__(self, config):
          Subprocess.__init__(self, config)
          # socket obtained from the group's socket manager; None while
          # no spawn is in progress and no child is alive
          self.fcgi_sock = None

      def before_spawn(self):
          """
          Fetch the FastCGI socket from the group's socket manager.

          The socket needs to be created by the parent before we fork.
          Raises NotImplementedError when the process has no group or the
          group has no ``socket_manager`` attribute.
          """
          group = self.group
          if group is None:
              raise NotImplementedError('No group set for FastCGISubprocess')
          if not hasattr(group, 'socket_manager'):
              raise NotImplementedError('No SocketManager set for '
                                        '%s:%s' % (group, dir(group)))
          self.fcgi_sock = group.socket_manager.get_socket()

      def spawn(self):
          """
          Run before_spawn(), then the regular Subprocess.spawn().

          Returns whatever Subprocess.spawn() returns; when that is None
          the socket reference taken by before_spawn() is dropped again.
          """
          self.before_spawn()
          child_pid = Subprocess.spawn(self)
          if child_pid is None:
              # Remove object reference to decrement the reference count
              # on error
              self.fcgi_sock = None
          return child_pid

      def after_finish(self):
          """
          Drop the reference to the FastCGI socket once the process has
          been reaped.
          """
          # Remove object reference to decrement the reference count
          self.fcgi_sock = None

      def finish(self, pid, sts):
          """
          Run the regular Subprocess.finish(), then after_finish().
          """
          result = Subprocess.finish(self, pid, sts)
          self.after_finish()
          return result

      def _prepare_child_fds(self):
          """
          Overrides Subprocess._prepare_child_fds()

          The FastCGI socket needs to be set to file descriptor 0 in the
          child; stdout/stderr are wired to the pipes as usual and every
          fd from 3 up to options.minfds is closed.
          """
          options = self.config.options
          pipes = self.pipes
          options.dup2(self.fcgi_sock.fileno(), 0)
          options.dup2(pipes['child_stdout'], 1)
          if self.config.redirect_stderr:
              stderr_source = pipes['child_stdout']
          else:
              stderr_source = pipes['child_stderr']
          options.dup2(stderr_source, 2)
          for fd in range(3, options.minfds):
              options.close_fd(fd)
  class ProcessGroupBase:
      """Common behaviour for a named group of processes.

      ``processes`` maps each process config's name to the process object
      built by that config's ``make_process(group)``.
      """

      def __init__(self, config):
          self.config = config
          self.processes = {}
          for process_config in self.config.process_configs:
              process = process_config.make_process(self)
              self.processes[process_config.name] = process

      def __cmp__(self, other):
          # groups order by configured priority
          return cmp(self.config.priority, other.config.priority)

      def __repr__(self):
          details = (self.__class__, id(self), self.config.name)
          return '<%s instance at %s named %s>' % details

      def removelogs(self):
          """Ask every process in the group to remove its logs."""
          for member in self.processes.values():
              member.removelogs()

      def reopenlogs(self):
          """Ask every process in the group to reopen its logs."""
          for member in self.processes.values():
              member.reopenlogs()

      def stop_all(self):
          """Stop or give up on every process, highest priority first."""
          ordered = sorted(self.processes.values())
          ordered.reverse()  # stop in desc priority order

          stoppable = (ProcessStates.RUNNING, ProcessStates.STARTING)
          for proc in ordered:
              state = proc.get_state()
              if state in stoppable:
                  # RUNNING -> STOPPING, STARTING -> STOPPING
                  proc.stop()
              elif state == ProcessStates.BACKOFF:
                  # BACKOFF -> FATAL
                  proc.give_up()

      def get_unstopped_processes(self):
          """ Processes which aren't in a state that is considered 'stopped' """
          unstopped = []
          for proc in self.processes.values():
              if proc.get_state() not in STOPPED_STATES:
                  unstopped.append(proc)
          return unstopped

      def get_dispatchers(self):
          """Return one dict merging the dispatchers of every process."""
          merged = {}
          for member in self.processes.values():
              merged.update(member.dispatchers)
          return merged
  class ProcessGroup(ProcessGroupBase):
      """Plain process group: a transition step is forwarded to each member."""

      def transition(self):
          """Call ``transition()`` on every process in the group."""
          members = self.processes.values()
          for member in members:
              member.transition()
  class FastCGIProcessGroup(ProcessGroup):
      """Process group that owns the socket manager for its FastCGI members."""

      def __init__(self, config, **kwargs):
          """Build the group and its socket manager.

          ``socketManager`` may be passed as a keyword argument to replace
          the default ``SocketManager`` class.  Raises ValueError if the
          FastCGI socket cannot be created.
          """
          ProcessGroup.__init__(self, config)
          sockManagerKlass = kwargs.get('socketManager', SocketManager)
          self.socket_manager = sockManagerKlass(config.socket_config,
                                                 logger=config.options.logger)
          # It's not required to call get_socket() here but we want
          # to fail early during start up if there is a config error
          try:
              self.socket_manager.get_socket()
          except Exception as e:
              raise ValueError('Could not create FastCGI socket %s: %s' % (self.socket_manager.config(), e))
  class EventListenerPool(ProcessGroupBase):
      """Process group whose members are event listeners.

      Events of the configured ``pool_events`` types are buffered in
      ``event_buffer`` and handed, oldest first, to a member that is
      RUNNING and whose listener state is READY.
      """

      def __init__(self, config):
          ProcessGroupBase.__init__(self, config)
          self.event_buffer = []
          for event_type in self.config.pool_events:
              events.subscribe(event_type, self._acceptEvent)
          events.subscribe(events.EventRejectedEvent, self.handle_rejected)
          self.serial = -1
          self.last_dispatch = 0
          self.dispatch_throttle = 0  # in seconds: .00195 is an interesting one

      def handle_rejected(self, event):
          """Put a rejected event back at the head of the buffer.

          Only acts when the rejecting process is one of this pool's own
          process objects.  Membership is decided by identity, because
          equality between processes may be defined in terms of priority
          rather than identity.
          """
          process = event.process
          for candidate in self.processes.values():
              if candidate is process:  # this is one of our processes
                  # rebuffer the event
                  self._acceptEvent(event.event, head=True)
                  return

      def transition(self):
          """Step every member, then dispatch if any member can take an event."""
          dispatch_capable = False
          for process in self.processes.values():
              process.transition()
              # this is redundant, we do it in _dispatchEvent too, but we
              # want to reduce function call overhead
              if process.state == ProcessStates.RUNNING:
                  if process.listener_state == EventListenerStates.READY:
                      dispatch_capable = True
          if dispatch_capable:
              if self.dispatch_throttle:
                  now = time.time()
                  if now - self.last_dispatch < self.dispatch_throttle:
                      return
              self.dispatch()

      def dispatch(self):
          """Send buffered events until the buffer is empty or one can't go."""
          while self.event_buffer:
              # dispatch the oldest event
              event = self.event_buffer.pop(0)
              ok = self._dispatchEvent(event)
              if not ok:
                  # if we can't dispatch an event, rebuffer it and stop trying
                  # to process any further events in the buffer
                  self._acceptEvent(event, head=True)
                  break
          self.last_dispatch = time.time()

      def _acceptEvent(self, event, head=False):
          """Buffer ``event``, at the front when ``head`` is true.

          Assigns the event a global serial and a per-pool serial the first
          time it is seen.  When the buffer already holds
          ``config.buffer_size`` events the oldest one is discarded.
          """
          # events are required to be instances
          if not hasattr(event, 'serial'):
              event.serial = new_serial(GlobalSerial)
          if not hasattr(event, 'pool_serials'):
              event.pool_serials = {}
          if self.config.name not in event.pool_serials:
              event.pool_serials[self.config.name] = new_serial(self)
          else:
              # already carries a serial for this pool, so it is coming back
              self.config.options.logger.debug(
                  'rebuffering event %s for pool %s (bufsize %s)' % (
                  (event.serial, self.config.name, len(self.event_buffer))))

          if len(self.event_buffer) >= self.config.buffer_size:
              if self.event_buffer:
                  # discard the oldest event
                  discarded_event = self.event_buffer.pop(0)
                  self.config.options.logger.error(
                      'pool %s event buffer overflowed, discarding event %s' % (
                      (self.config.name, discarded_event.serial)))
          if head:
              self.event_buffer.insert(0, event)
          else:
              self.event_buffer.append(event)

      def _dispatchEvent(self, event):
          """Write ``event`` to the first ready listener.

          Returns True once a listener accepted the write, False when no
          member could take it.  A listener whose write fails with EPIPE
          is skipped; any other OSError propagates.
          """
          pool_serial = event.pool_serials[self.config.name]

          for process in self.processes.values():
              if process.state != ProcessStates.RUNNING:
                  continue
              if process.listener_state == EventListenerStates.READY:
                  payload = str(event)
                  try:
                      event_type = event.__class__
                      serial = event.serial
                      envelope = self._eventEnvelope(event_type, serial,
                                                     pool_serial, payload)
                      process.write(envelope)
                  except OSError as why:
                      if why.args[0] != errno.EPIPE:
                          raise
                      continue

                  process.listener_state = EventListenerStates.BUSY
                  process.event = event
                  self.config.options.logger.debug(
                      'event %s sent to listener %s' % (
                      event.serial, process.config.name))
                  return True

          return False

      def _eventEnvelope(self, event_type, serial, pool_serial, payload):
          """Return the header line plus ``payload`` as one string."""
          event_name = events.getEventNameByType(event_type)
          payload_len = len(payload)
          D = {
              'ver': '3.0',
              'sid': self.config.options.identifier,
              'serial': serial,
              'pool_name': self.config.name,
              'pool_serial': pool_serial,
              'event_name': event_name,
              'len': payload_len,
              'payload': payload,
              }
          return ('ver:%(ver)s server:%(sid)s serial:%(serial)s '
                  'pool:%(pool_name)s poolserial:%(pool_serial)s '
                  'eventname:%(event_name)s len:%(len)s\n%(payload)s' % D)
  class GlobalSerial:
      """Holder for the serial counter shared by all events."""

      def __init__(self):
          # -1 so that the first serial handed out by new_serial() is 0
          self.serial = -1

  # The name is rebound from the class to its only instance.
  GlobalSerial = GlobalSerial()  # singleton
  def new_serial(inst):
      """Advance ``inst.serial`` by one and return the new value.

      ``inst`` is any object with an integer ``serial`` attribute.  Once
      the counter has reached the platform's largest native integer it
      wraps around, so the next value returned is 0.
      """
      # sys.maxint only exists on Python 2; fall back to sys.maxsize
      limit = getattr(sys, 'maxint', None) or sys.maxsize
      if inst.serial >= limit:
          inst.serial = -1
      inst.serial += 1
      return inst.serial