# dispatchers.py

import warnings
import errno
from supervisor.medusa.asyncore_25 import compact_traceback
from supervisor.events import notify
from supervisor.events import EventRejectedEvent
from supervisor.events import ProcessLogStderrEvent
from supervisor.events import ProcessLogStdoutEvent
from supervisor.states import EventListenerStates
from supervisor import loggers

def find_prefix_at_end(haystack, needle):
    """ Return the length of the longest proper prefix of `needle` that
    `haystack` ends with, or 0 if there is none. """
    l = len(needle) - 1
    while l and not haystack.endswith(needle[:l]):
        l -= 1
    return l
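
# For example, find_prefix_at_end('some output <!--XSUP',
# '<!--XSUPERVISOR:BEGIN-->') returns 8: the last 8 characters of the
# haystack are a partial begin token, so record_output() holds them back in
# its output buffer until more data arrives.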

class PDispatcher:
    """ Asyncore dispatcher for mainloop, representing a process channel
    (stdin, stdout, or stderr). This class is abstract. """

    closed = False # True if close() has been called

    def __repr__(self):
        return '<%s at %s for %s (%s)>' % (self.__class__.__name__,
                                           id(self),
                                           self.process,
                                           self.channel)

    def readable(self):
        raise NotImplementedError

    def writable(self):
        raise NotImplementedError

    def handle_read_event(self):
        raise NotImplementedError

    def handle_write_event(self):
        raise NotImplementedError

    def handle_error(self):
        nil, t, v, tbinfo = compact_traceback()

        self.process.config.options.logger.critical(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                repr(self),
                t,
                v,
                tbinfo
                )
            )
        self.close()

    def close(self):
        if not self.closed:
            self.process.config.options.logger.debug(
                'fd %s closed, stopped monitoring %s' % (self.fd, self))
            self.closed = True

    def flush(self):
        pass

class POutputDispatcher(PDispatcher):
    """
    A Process Output (stdout/stderr) dispatcher. Serves several purposes:

    - capture output sent within <!--XSUPERVISOR:BEGIN--> and
      <!--XSUPERVISOR:END--> tags and signal a ProcessCommunicationEvent
      by calling notify(event).
    - route the output to the appropriate log handlers as specified in the
      config.
    """

    process = None # process which "owns" this dispatcher
    channel = None # 'stderr' or 'stdout'
    capturemode = False # are we capturing process event data
    mainlog = None # the process' "normal" logger
    capturelog = None # the logger while we're in capturemode
    childlog = None # the current logger (event or main)
    output_buffer = '' # data waiting to be logged

    def __init__(self, process, event_type, fd):
        """
        Initialize the dispatcher.

        `event_type` should be one of ProcessLogStdoutEvent or
        ProcessLogStderrEvent
        """
        self.process = process
        self.event_type = event_type
        self.fd = fd

        self.channel = channel = self.event_type.channel

        self._setup_logging(process.config, channel)

        capture_maxbytes = getattr(process.config,
                                   '%s_capture_maxbytes' % channel)
        if capture_maxbytes:
            self.capturelog = loggers.handle_boundIO(
                self.process.config.options.getLogger(),
                fmt='%(message)s',
                maxbytes=capture_maxbytes,
                )

        self.childlog = self.mainlog

        # all code below is purely for minor speedups
        begintoken = self.event_type.BEGIN_TOKEN
        endtoken = self.event_type.END_TOKEN
        self.begintoken_data = (begintoken, len(begintoken))
        self.endtoken_data = (endtoken, len(endtoken))
        self.mainlog_level = loggers.LevelsByName.DEBG
        config = self.process.config
        self.log_to_mainlog = config.options.loglevel <= self.mainlog_level
        self.stdout_events_enabled = config.stdout_events_enabled
        self.stderr_events_enabled = config.stderr_events_enabled

    def _setup_logging(self, config, channel):
        """
        Configure the main log according to the process' configuration and
        channel. Sets `mainlog` on self. Returns nothing.
        """
        logfile = getattr(config, '%s_logfile' % channel)
        if not logfile:
            return

        maxbytes = getattr(config, '%s_logfile_maxbytes' % channel)
        backups = getattr(config, '%s_logfile_backups' % channel)
        fmt = '%(message)s'
        if logfile == 'syslog':
            warnings.warn("Specifying 'syslog' for filename is deprecated. "
                          "Use %s_syslog instead." % channel,
                          DeprecationWarning)
            fmt = ' '.join((config.name, fmt))
        self.mainlog = loggers.handle_file(
            config.options.getLogger(),
            filename=logfile,
            fmt=fmt,
            rotating=not not maxbytes, # optimization
            maxbytes=maxbytes,
            backups=backups)

        if getattr(config, '%s_syslog' % channel, False):
            fmt = config.name + ' %(message)s'
            loggers.handle_syslog(self.mainlog, fmt)

    def removelogs(self):
        for log in (self.mainlog, self.capturelog):
            if log is not None:
                for handler in log.handlers:
                    handler.remove()
                    handler.reopen()

    def reopenlogs(self):
        for log in (self.mainlog, self.capturelog):
            if log is not None:
                for handler in log.handlers:
                    handler.reopen()

    def _log(self, data):
        if data:
            config = self.process.config
            if config.options.strip_ansi:
                data = stripEscapes(data)
            if self.childlog:
                self.childlog.info(data)
            if self.log_to_mainlog:
                msg = '%(name)r %(channel)s output:\n%(data)s'
                config.options.logger.log(
                    self.mainlog_level, msg, name=config.name,
                    channel=self.channel, data=data)
            if self.channel == 'stdout':
                if self.stdout_events_enabled:
                    notify(
                        ProcessLogStdoutEvent(self.process,
                                              self.process.pid, data)
                    )
            else: # channel == stderr
                if self.stderr_events_enabled:
                    notify(
                        ProcessLogStderrEvent(self.process,
                                              self.process.pid, data)
                    )

    def record_output(self):
        if self.capturelog is None:
            # shortcut trying to find capture data
            data = self.output_buffer
            self.output_buffer = ''
            self._log(data)
            return

        if self.capturemode:
            token, tokenlen = self.endtoken_data
        else:
            token, tokenlen = self.begintoken_data

        if len(self.output_buffer) <= tokenlen:
            return # not enough data

        data = self.output_buffer
        self.output_buffer = ''

        try:
            before, after = data.split(token, 1)
        except ValueError:
            after = None
            index = find_prefix_at_end(data, token)
            if index:
                self.output_buffer = self.output_buffer + data[-index:]
                data = data[:-index]
            self._log(data)
        else:
            self._log(before)
            self.toggle_capturemode()
            self.output_buffer = after

        if after:
            self.record_output()

    def toggle_capturemode(self):
        self.capturemode = not self.capturemode

        if self.capturelog is not None:
            if self.capturemode:
                self.childlog = self.capturelog
            else:
                for handler in self.capturelog.handlers:
                    handler.flush()
                data = self.capturelog.getvalue()
                channel = self.channel
                procname = self.process.config.name
                event = self.event_type(self.process, self.process.pid, data)
                notify(event)

                msg = "%(procname)r %(channel)s emitted a comm event"
                self.process.config.options.logger.debug(msg,
                                                         procname=procname,
                                                         channel=channel)
                for handler in self.capturelog.handlers:
                    handler.remove()
                    handler.reopen()
                self.childlog = self.mainlog

    def writable(self):
        return False

    def readable(self):
        if self.closed:
            return False
        return True

    def handle_read_event(self):
        data = self.process.config.options.readfd(self.fd)
        self.output_buffer += data
        self.record_output()
        if not data:
            # if we get no data back from the pipe, it means that the
            # child process has ended. See
            # mail.python.org/pipermail/python-dev/2004-August/046850.html
            self.close()

class PEventListenerDispatcher(PDispatcher):
    """ An output dispatcher that monitors and changes a process'
    listener_state """
    process = None # process which "owns" this dispatcher
    channel = None # 'stderr' or 'stdout'
    childlog = None # the logger
    state_buffer = '' # data waiting to be reviewed for state changes

    READY_FOR_EVENTS_TOKEN = 'READY\n'
    RESULT_TOKEN_START = 'RESULT '
    READY_FOR_EVENTS_LEN = len(READY_FOR_EVENTS_TOKEN)
    RESULT_TOKEN_START_LEN = len(RESULT_TOKEN_START)
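
    # For reference: a listener writes 'READY\n' on its stdout to signal it
    # can accept an event, and after handling one writes a result line of
    # the form 'RESULT <payload length>\n<payload>', e.g. 'RESULT 2\nOK'.
    # handle_listener_state_change() below tracks these transitions.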

    def __init__(self, process, channel, fd):
        self.process = process
        # the initial state of our listener is ACKNOWLEDGED; this is a
        # "busy" state that implies we're awaiting a READY_FOR_EVENTS_TOKEN
        self.process.listener_state = EventListenerStates.ACKNOWLEDGED
        self.process.event = None
        self.result = ''
        self.resultlen = None
        self.channel = channel
        self.fd = fd

        logfile = getattr(process.config, '%s_logfile' % channel)

        if logfile:
            maxbytes = getattr(process.config,
                               '%s_logfile_maxbytes' % channel)
            backups = getattr(process.config, '%s_logfile_backups' % channel)
            self.childlog = loggers.handle_file(
                process.config.options.getLogger(),
                logfile,
                '%(message)s',
                rotating=not not maxbytes, # optimization
                maxbytes=maxbytes,
                backups=backups,
            )

    def removelogs(self):
        if self.childlog is not None:
            for handler in self.childlog.handlers:
                handler.remove()
                handler.reopen()

    def reopenlogs(self):
        if self.childlog is not None:
            for handler in self.childlog.handlers:
                handler.reopen()

    def writable(self):
        return False

    def readable(self):
        if self.closed:
            return False
        return True

    def handle_read_event(self):
        data = self.process.config.options.readfd(self.fd)
        if data:
            self.state_buffer += data
            procname = self.process.config.name
            msg = '%r %s output:\n%s' % (procname, self.channel, data)
            self.process.config.options.logger.debug(msg)

            if self.childlog:
                if self.process.config.options.strip_ansi:
                    data = stripEscapes(data)
                self.childlog.info(data)
        else:
            # if we get no data back from the pipe, it means that the
            # child process has ended. See
            # mail.python.org/pipermail/python-dev/2004-August/046850.html
            self.close()

        self.handle_listener_state_change()

    def handle_listener_state_change(self):
        data = self.state_buffer

        if not data:
            return

        process = self.process
        procname = process.config.name
        state = process.listener_state

        if state == EventListenerStates.UNKNOWN:
            # this is a fatal state
            self.state_buffer = ''
            return

        if state == EventListenerStates.ACKNOWLEDGED:
            if len(data) < self.READY_FOR_EVENTS_LEN:
                # not enough info to make a decision
                return
            elif data.startswith(self.READY_FOR_EVENTS_TOKEN):
                msg = '%s: ACKNOWLEDGED -> READY' % procname
                process.config.options.logger.debug(msg)
                process.listener_state = EventListenerStates.READY
                tokenlen = self.READY_FOR_EVENTS_LEN
                self.state_buffer = self.state_buffer[tokenlen:]
                process.event = None
            else:
                msg = '%s: ACKNOWLEDGED -> UNKNOWN' % procname
                process.config.options.logger.debug(msg)
                process.listener_state = EventListenerStates.UNKNOWN
                self.state_buffer = ''
                process.event = None
            if self.state_buffer:
                # keep going until it's too short to parse
                self.handle_listener_state_change()
            else:
                return

        elif state == EventListenerStates.READY:
            # the process sent some spurious data, be a hardass about it
            msg = '%s: READY -> UNKNOWN' % procname
            process.config.options.logger.debug(msg)
            process.listener_state = EventListenerStates.UNKNOWN
            self.state_buffer = ''
            process.event = None
            return

        elif state == EventListenerStates.BUSY:
            if self.resultlen is None:
                # we haven't begun gathering result data yet
                pos = data.find('\n')
                if pos == -1:
                    # we can't make a determination yet, we don't have a
                    # full result line
                    return

                result_line = self.state_buffer[:pos]
                self.state_buffer = self.state_buffer[pos+1:] # rid LF
                resultlen = result_line[self.RESULT_TOKEN_START_LEN:]
                try:
                    self.resultlen = int(resultlen)
                except ValueError:
                    msg = ('%s: BUSY -> UNKNOWN (bad result line %r)'
                           % (procname, result_line))
                    process.config.options.logger.debug(msg)
                    process.listener_state = EventListenerStates.UNKNOWN
                    self.state_buffer = ''
                    notify(EventRejectedEvent(process, process.event))
                    process.event = None
                    return
            else:
                needed = self.resultlen - len(self.result)

                if needed:
                    self.result += self.state_buffer[:needed]
                    self.state_buffer = self.state_buffer[needed:]
                    needed = self.resultlen - len(self.result)

                if not needed:
                    self.handle_result(self.result)
                    self.process.event = None
                    self.result = ''
                    self.resultlen = None

            if self.state_buffer:
                # keep going until it's too short to parse
                self.handle_listener_state_change()
            else:
                return

    def handle_result(self, result):
        process = self.process
        procname = process.config.name

        try:
            self.process.group.config.result_handler(process.event, result)
            msg = '%s: BUSY -> ACKNOWLEDGED (processed)' % procname
            process.listener_state = EventListenerStates.ACKNOWLEDGED
        except RejectEvent:
            msg = '%s: BUSY -> ACKNOWLEDGED (rejected)' % procname
            process.listener_state = EventListenerStates.ACKNOWLEDGED
            notify(EventRejectedEvent(process, process.event))
        except:
            msg = '%s: BUSY -> UNKNOWN' % procname
            process.listener_state = EventListenerStates.UNKNOWN
            notify(EventRejectedEvent(process, process.event))

        process.config.options.logger.debug(msg)

class PInputDispatcher(PDispatcher):
    """ Input (stdin) dispatcher """
    process = None # process which "owns" this dispatcher
    channel = None # 'stdin'
    input_buffer = '' # data waiting to be sent to the child process

    def __init__(self, process, channel, fd):
        self.process = process
        self.channel = channel
        self.fd = fd
        self.input_buffer = ''

    def writable(self):
        if self.input_buffer and not self.closed:
            return True
        return False

    def readable(self):
        return False

    def flush(self):
        # other code depends on this raising EPIPE if the pipe is closed
        sent = self.process.config.options.write(self.fd,
                                                 self.input_buffer)
        self.input_buffer = self.input_buffer[sent:]

    def handle_write_event(self):
        if self.input_buffer:
            try:
                self.flush()
            except OSError, why:
                if why.args[0] == errno.EPIPE:
                    self.input_buffer = ''
                    self.close()
                else:
                    raise

ANSI_ESCAPE_BEGIN = '\x1b['
ANSI_TERMINATORS = ('H', 'f', 'A', 'B', 'C', 'D', 'R', 's', 'u', 'J',
                    'K', 'h', 'l', 'p', 'm')

def stripEscapes(s):
    """
    Remove all ANSI color escapes from the given string.
    """
    result = ''
    show = 1
    i = 0
    L = len(s)

    while i < L:
        if show == 0 and s[i] in ANSI_TERMINATORS:
            show = 1
        elif show:
            n = s.find(ANSI_ESCAPE_BEGIN, i)
            if n == -1:
                return result + s[i:]
            else:
                result = result + s[i:n]
                i = n
                show = 0
        i = i + 1

    return result
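
# For example, stripEscapes('\x1b[31mred\x1b[0m') returns 'red': both escape
# sequences are dropped because they end in the 'm' terminator listed above.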

class RejectEvent(Exception):
    """ The exception type expected by a dispatcher when a handler wants
    to reject an event """

def default_handler(event, response):
    if response != 'OK':
        raise RejectEvent(response)
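
# A minimal event listener, sketched here for illustration only (it is not
# part of this module and the header parsing is simplified), implements the
# other side of the protocol that PEventListenerDispatcher consumes:
#
#   import sys
#   while 1:
#       sys.stdout.write('READY\n')
#       sys.stdout.flush()
#       headers = dict(x.split(':') for x in sys.stdin.readline().split())
#       payload = sys.stdin.read(int(headers['len']))
#       # ... handle the event described by headers and payload ...
#       sys.stdout.write('RESULT 2\nOK')
#       sys.stdout.flush()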