rpcinterface.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970
  1. import os
  2. import time
  3. import datetime
  4. import errno
  5. import types
  6. from supervisor.datatypes import signal_number
  7. from supervisor.options import readFile
  8. from supervisor.options import tailFile
  9. from supervisor.options import NotExecutable
  10. from supervisor.options import NotFound
  11. from supervisor.options import NoPermission
  12. from supervisor.options import make_namespec
  13. from supervisor.options import split_namespec
  14. from supervisor.options import VERSION
  15. from supervisor.events import notify
  16. from supervisor.events import RemoteCommunicationEvent
  17. from supervisor.http import NOT_DONE_YET
  18. from supervisor.xmlrpc import Faults
  19. from supervisor.xmlrpc import RPCError
  20. from supervisor.states import SupervisorStates
  21. from supervisor.states import getSupervisorStateDescription
  22. from supervisor.states import ProcessStates
  23. from supervisor.states import getProcessStateDescription
  24. from supervisor.states import (
  25. RUNNING_STATES,
  26. STOPPED_STATES,
  27. )
  28. API_VERSION = '3.0'
  29. class SupervisorNamespaceRPCInterface:
  30. def __init__(self, supervisord):
  31. self.supervisord = supervisord
  32. def _update(self, text):
  33. self.update_text = text # for unit tests, mainly
  34. if ( isinstance(self.supervisord.options.mood, int) and
  35. self.supervisord.options.mood < SupervisorStates.RUNNING ):
  36. raise RPCError(Faults.SHUTDOWN_STATE)
  37. # RPC API methods
  38. def getAPIVersion(self):
  39. """ Return the version of the RPC API used by supervisord
  40. @return string version version id
  41. """
  42. self._update('getAPIVersion')
  43. return API_VERSION
  44. getVersion = getAPIVersion # b/w compatibility with releases before 3.0
  45. def getSupervisorVersion(self):
  46. """ Return the version of the supervisor package in use by supervisord
  47. @return string version version id
  48. """
  49. self._update('getSupervisorVersion')
  50. return VERSION
  51. def getIdentification(self):
  52. """ Return identifying string of supervisord
  53. @return string identifier identifying string
  54. """
  55. self._update('getIdentification')
  56. return self.supervisord.options.identifier
  57. def getState(self):
  58. """ Return current state of supervisord as a struct
  59. @return struct A struct with keys int statecode, string statename
  60. """
  61. self._update('getState')
  62. state = self.supervisord.options.mood
  63. statename = getSupervisorStateDescription(state)
  64. data = {
  65. 'statecode':state,
  66. 'statename':statename,
  67. }
  68. return data
  69. def getPID(self):
  70. """ Return the PID of supervisord
  71. @return int PID
  72. """
  73. self._update('getPID')
  74. return self.supervisord.options.get_pid()
  75. def readLog(self, offset, length):
  76. """ Read length bytes from the main log starting at offset
  77. @param int offset offset to start reading from.
  78. @param int length number of bytes to read from the log.
  79. @return string result Bytes of log
  80. """
  81. self._update('readLog')
  82. logfile = self.supervisord.options.logfile
  83. if logfile is None or not os.path.exists(logfile):
  84. raise RPCError(Faults.NO_FILE, logfile)
  85. try:
  86. return readFile(logfile, int(offset), int(length))
  87. except ValueError, inst:
  88. why = inst.args[0]
  89. raise RPCError(getattr(Faults, why))
  90. readMainLog = readLog # b/w compatibility with releases before 2.1
  91. def clearLog(self):
  92. """ Clear the main log.
  93. @return boolean result always returns True unless error
  94. """
  95. self._update('clearLog')
  96. logfile = self.supervisord.options.logfile
  97. if logfile is None or not self.supervisord.options.exists(logfile):
  98. raise RPCError(Faults.NO_FILE)
  99. # there is a race condition here, but ignore it.
  100. try:
  101. self.supervisord.options.remove(logfile)
  102. except (OSError, IOError):
  103. raise RPCError(Faults.FAILED)
  104. for handler in self.supervisord.options.logger.handlers:
  105. if hasattr(handler, 'reopen'):
  106. self.supervisord.options.logger.info('reopening log file')
  107. handler.reopen()
  108. return True
  109. def shutdown(self):
  110. """ Shut down the supervisor process
  111. @return boolean result always returns True unless error
  112. """
  113. self._update('shutdown')
  114. self.supervisord.options.mood = SupervisorStates.SHUTDOWN
  115. return True
  116. def restart(self):
  117. """ Restart the supervisor process
  118. @return boolean result always return True unless error
  119. """
  120. self._update('restart')
  121. self.supervisord.options.mood = SupervisorStates.RESTARTING
  122. return True
  123. def reloadConfig(self):
  124. """
  125. Reload configuration
  126. @return boolean result always return True unless error
  127. """
  128. self._update('reloadConfig')
  129. try:
  130. self.supervisord.options.process_config(do_usage=False)
  131. except ValueError, msg:
  132. raise RPCError(Faults.CANT_REREAD, msg)
  133. added, changed, removed = self.supervisord.diff_to_active()
  134. added = [group.name for group in added]
  135. changed = [group.name for group in changed]
  136. removed = [group.name for group in removed]
  137. return [[added, changed, removed]] # cannot return len > 1, apparently
  138. def addProcessGroup(self, name):
  139. """ Update the config for a running process from config file.
  140. @param string name name of process group to add
  141. @return boolean result true if successful
  142. """
  143. self._update('addProcessGroup')
  144. for config in self.supervisord.options.process_group_configs:
  145. if config.name == name:
  146. result = self.supervisord.add_process_group(config)
  147. if not result:
  148. raise RPCError(Faults.ALREADY_ADDED, name)
  149. return True
  150. raise RPCError(Faults.BAD_NAME, name)
  151. def removeProcessGroup(self, name):
  152. """ Remove a stopped process from the active configuration.
  153. @param string name name of process group to remove
  154. @return boolean result Indicates whether the removal was successful
  155. """
  156. self._update('removeProcessGroup')
  157. if name not in self.supervisord.process_groups:
  158. raise RPCError(Faults.BAD_NAME, name)
  159. result = self.supervisord.remove_process_group(name)
  160. if not result:
  161. raise RPCError(Faults.STILL_RUNNING)
  162. return True
  163. def _getAllProcesses(self, lexical=False):
  164. # if lexical is true, return processes sorted in lexical order,
  165. # otherwise, sort in priority order
  166. all_processes = []
  167. if lexical:
  168. group_names = self.supervisord.process_groups.keys()
  169. group_names.sort()
  170. for group_name in group_names:
  171. group = self.supervisord.process_groups[group_name]
  172. process_names = group.processes.keys()
  173. process_names.sort()
  174. for process_name in process_names:
  175. process = group.processes[process_name]
  176. all_processes.append((group, process))
  177. else:
  178. groups = self.supervisord.process_groups.values()
  179. groups.sort() # asc by priority
  180. for group in groups:
  181. processes = group.processes.values()
  182. processes.sort() # asc by priority
  183. for process in processes:
  184. all_processes.append((group, process))
  185. return all_processes
  186. def _getGroupAndProcess(self, name):
  187. # get process to start from name
  188. group_name, process_name = split_namespec(name)
  189. group = self.supervisord.process_groups.get(group_name)
  190. if group is None:
  191. raise RPCError(Faults.BAD_NAME, name)
  192. if process_name is None:
  193. return group, None
  194. process = group.processes.get(process_name)
  195. if process is None:
  196. raise RPCError(Faults.BAD_NAME, name)
  197. return group, process
  198. def startProcess(self, name, wait=True):
  199. """ Start a process
  200. @param string name Process name (or ``group:name``, or ``group:*``)
  201. @param boolean wait Wait for process to be fully started
  202. @return boolean result Always true unless error
  203. """
  204. self._update('startProcess')
  205. group, process = self._getGroupAndProcess(name)
  206. if process is None:
  207. group_name, process_name = split_namespec(name)
  208. return self.startProcessGroup(group_name, wait)
  209. # test filespec, don't bother trying to spawn if we know it will
  210. # eventually fail
  211. try:
  212. filename, argv = process.get_execv_args()
  213. except NotFound, why:
  214. raise RPCError(Faults.NO_FILE, why.args[0])
  215. except (NotExecutable, NoPermission), why:
  216. raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])
  217. if process.get_state() in RUNNING_STATES:
  218. raise RPCError(Faults.ALREADY_STARTED, name)
  219. process.spawn()
  220. # We call reap() in order to more quickly obtain the side effects of
  221. # process.finish(), which reap() eventually ends up calling. This
  222. # might be the case if the spawn() was successful but then the process
  223. # died before its startsecs elapsed or it exited with an unexpected
  224. # exit code. In particular, finish() may set spawnerr, which we can
  225. # check and immediately raise an RPCError, avoiding the need to
  226. # defer by returning a callback.
  227. self.supervisord.reap()
  228. if process.spawnerr:
  229. raise RPCError(Faults.SPAWN_ERROR, name)
  230. # We call process.transition() in order to more quickly obtain its
  231. # side effects. In particular, it might set the process' state from
  232. # STARTING->RUNNING if the process has a startsecs==0.
  233. process.transition()
  234. if wait and process.get_state() != ProcessStates.RUNNING:
  235. # by default, this branch will almost always be hit for processes
  236. # with default startsecs configurations, because the default number
  237. # of startsecs for a process is "1", and the process will not have
  238. # entered the RUNNING state yet even though we've called
  239. # transition() on it. This is because a process is not considered
  240. # RUNNING until it has stayed up > startsecs.
  241. def onwait():
  242. if process.spawnerr:
  243. raise RPCError(Faults.SPAWN_ERROR, name)
  244. state = process.get_state()
  245. if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
  246. raise RPCError(Faults.ABNORMAL_TERMINATION, name)
  247. if state == ProcessStates.RUNNING:
  248. return True
  249. return NOT_DONE_YET
  250. onwait.delay = 0.05
  251. onwait.rpcinterface = self
  252. return onwait # deferred
  253. return True
  254. def startProcessGroup(self, name, wait=True):
  255. """ Start all processes in the group named 'name'
  256. @param string name The group name
  257. @param boolean wait Wait for each process to be fully started
  258. @return array result An array of process status info structs
  259. """
  260. self._update('startProcessGroup')
  261. group = self.supervisord.process_groups.get(name)
  262. if group is None:
  263. raise RPCError(Faults.BAD_NAME, name)
  264. processes = group.processes.values()
  265. processes.sort()
  266. processes = [ (group, process) for process in processes ]
  267. startall = make_allfunc(processes, isNotRunning, self.startProcess,
  268. wait=wait)
  269. startall.delay = 0.05
  270. startall.rpcinterface = self
  271. return startall # deferred
  272. def startAllProcesses(self, wait=True):
  273. """ Start all processes listed in the configuration file
  274. @param boolean wait Wait for each process to be fully started
  275. @return array result An array of process status info structs
  276. """
  277. self._update('startAllProcesses')
  278. processes = self._getAllProcesses()
  279. startall = make_allfunc(processes, isNotRunning, self.startProcess,
  280. wait=wait)
  281. startall.delay = 0.05
  282. startall.rpcinterface = self
  283. return startall # deferred
  284. def stopProcess(self, name, wait=True):
  285. """ Stop a process named by name
  286. @param string name The name of the process to stop (or 'group:name')
  287. @param boolean wait Wait for the process to be fully stopped
  288. @return boolean result Always return True unless error
  289. """
  290. self._update('stopProcess')
  291. group, process = self._getGroupAndProcess(name)
  292. if process is None:
  293. group_name, process_name = split_namespec(name)
  294. return self.stopProcessGroup(group_name, wait)
  295. if process.get_state() not in RUNNING_STATES:
  296. raise RPCError(Faults.NOT_RUNNING)
  297. msg = process.stop()
  298. if msg is not None:
  299. raise RPCError(Faults.FAILED, msg)
  300. # We'll try to reap any killed child. FWIW, reap calls waitpid, and
  301. # then, if waitpid returns a pid, calls finish() on the process with
  302. # that pid, which drains any I/O from the process' dispatchers and
  303. # changes the process' state. I chose to call reap without once=True
  304. # because we don't really care if we reap more than one child. Even if
  305. # we only reap one child. we may not even be reaping the child that we
  306. # just stopped (this is all async, and process.stop() may not work, and
  307. # we'll need to wait for SIGKILL during process.transition() as the
  308. # result of normal select looping).
  309. self.supervisord.reap()
  310. if wait and process.get_state() not in STOPPED_STATES:
  311. def onwait():
  312. # process will eventually enter a stopped state by
  313. # virtue of the supervisord.reap() method being called
  314. # during normal operations
  315. process.stop_report()
  316. if process.get_state() not in STOPPED_STATES:
  317. return NOT_DONE_YET
  318. return True
  319. onwait.delay = 0
  320. onwait.rpcinterface = self
  321. return onwait # deferred
  322. return True
  323. def stopProcessGroup(self, name, wait=True):
  324. """ Stop all processes in the process group named 'name'
  325. @param string name The group name
  326. @param boolean wait Wait for each process to be fully stopped
  327. @return array result An array of process status info structs
  328. """
  329. self._update('stopProcessGroup')
  330. group = self.supervisord.process_groups.get(name)
  331. if group is None:
  332. raise RPCError(Faults.BAD_NAME, name)
  333. processes = group.processes.values()
  334. processes.sort()
  335. processes = [ (group, process) for process in processes ]
  336. killall = make_allfunc(processes, isRunning, self.stopProcess,
  337. wait=wait)
  338. killall.delay = 0.05
  339. killall.rpcinterface = self
  340. return killall # deferred
  341. def stopAllProcesses(self, wait=True):
  342. """ Stop all processes in the process list
  343. @param boolean wait Wait for each process to be fully stopped
  344. @return array result An array of process status info structs
  345. """
  346. self._update('stopAllProcesses')
  347. processes = self._getAllProcesses()
  348. killall = make_allfunc(processes, isRunning, self.stopProcess,
  349. wait=wait)
  350. killall.delay = 0.05
  351. killall.rpcinterface = self
  352. return killall # deferred
  353. def signalProcess(self, name, signal):
  354. """ Send an arbitrary UNIX signal to the process named by name
  355. @param string name Name of the process to signal (or 'group:name')
  356. @param string signal Signal to send, as name ('HUP') or number ('1')
  357. @return boolean
  358. """
  359. self._update('signalProcess')
  360. group, process = self._getGroupAndProcess(name)
  361. if process is None:
  362. group_name, process_name = split_namespec(name)
  363. return self.signalProcessGroup(group_name, signal=signal)
  364. try:
  365. sig = signal_number(signal)
  366. except ValueError:
  367. raise RPCError(Faults.BAD_SIGNAL, signal)
  368. if process.get_state() not in RUNNING_STATES:
  369. raise RPCError(Faults.NOT_RUNNING)
  370. msg = process.signal(sig)
  371. if not msg is None:
  372. raise RPCError(Faults.FAILED, msg)
  373. return True
  374. def signalProcessGroup(self, name, signal):
  375. """ Send a signal to all processes in the group named 'name'
  376. @param string name The group name
  377. @param string signal Signal to send, as name ('HUP') or number ('1')
  378. @return array
  379. """
  380. self._update('signalProcessGroup')
  381. group = self.supervisord.process_groups.get(name)
  382. if group is None:
  383. raise RPCError(Faults.BAD_NAME, name)
  384. processes = group.processes.values()
  385. processes.sort()
  386. processes = [(group, process) for process in processes]
  387. sendall = make_allfunc(processes, isRunning, self.signalProcess,
  388. signal=signal)
  389. result = sendall()
  390. self._update('signalProcessGroup')
  391. return result
  392. def signalAllProcesses(self, signal):
  393. """ Send a signal to all processes in the process list
  394. @param string signal Signal to send, as name ('HUP') or number ('1')
  395. @return array An array of process status info structs
  396. """
  397. processes = self._getAllProcesses()
  398. signalall = make_allfunc(processes, isRunning, self.signalProcess,
  399. signal=signal)
  400. result = signalall()
  401. self._update('signalAllProcesses')
  402. return result
  403. def getAllConfigInfo(self):
  404. """ Get info about all available process configurations. Each struct
  405. represents a single process (i.e. groups get flattened).
  406. @return array result An array of process config info structs
  407. """
  408. self._update('getAllConfigInfo')
  409. configinfo = []
  410. for gconfig in self.supervisord.options.process_group_configs:
  411. inuse = gconfig.name in self.supervisord.process_groups
  412. for pconfig in gconfig.process_configs:
  413. configinfo.append(
  414. { 'name': pconfig.name,
  415. 'group': gconfig.name,
  416. 'inuse': inuse,
  417. 'autostart': pconfig.autostart,
  418. 'group_prio': gconfig.priority,
  419. 'process_prio': pconfig.priority })
  420. configinfo.sort()
  421. return configinfo
  422. def _interpretProcessInfo(self, info):
  423. state = info['state']
  424. if state == ProcessStates.RUNNING:
  425. start = info['start']
  426. now = info['now']
  427. start_dt = datetime.datetime(*time.gmtime(start)[:6])
  428. now_dt = datetime.datetime(*time.gmtime(now)[:6])
  429. uptime = now_dt - start_dt
  430. if uptime.total_seconds() < 0: # system time set back
  431. uptime = datetime.timedelta(0)
  432. desc = 'pid %s, uptime %s' % (info['pid'], uptime)
  433. elif state in (ProcessStates.FATAL, ProcessStates.BACKOFF):
  434. desc = info['spawnerr']
  435. if not desc:
  436. desc = 'unknown error (try "tail %s")' % info['name']
  437. elif state in (ProcessStates.STOPPED, ProcessStates.EXITED):
  438. if info['start']:
  439. stop = info['stop']
  440. stop_dt = datetime.datetime(*time.localtime(stop)[:7])
  441. desc = stop_dt.strftime('%b %d %I:%M %p')
  442. else:
  443. desc = 'Not started'
  444. else:
  445. desc = ''
  446. return desc
  447. def getProcessInfo(self, name):
  448. """ Get info about a process named name
  449. @param string name The name of the process (or 'group:name')
  450. @return struct result A structure containing data about the process
  451. """
  452. self._update('getProcessInfo')
  453. group, process = self._getGroupAndProcess(name)
  454. if process is None:
  455. raise RPCError(Faults.BAD_NAME, name)
  456. start = int(process.laststart)
  457. stop = int(process.laststop)
  458. now = int(time.time())
  459. state = process.get_state()
  460. spawnerr = process.spawnerr or ''
  461. exitstatus = process.exitstatus or 0
  462. stdout_logfile = process.config.stdout_logfile or ''
  463. stderr_logfile = process.config.stderr_logfile or ''
  464. info = {
  465. 'name':process.config.name,
  466. 'group':group.config.name,
  467. 'start':start,
  468. 'stop':stop,
  469. 'now':now,
  470. 'state':state,
  471. 'statename':getProcessStateDescription(state),
  472. 'spawnerr':spawnerr,
  473. 'exitstatus':exitstatus,
  474. 'logfile':stdout_logfile, # b/c alias
  475. 'stdout_logfile':stdout_logfile,
  476. 'stderr_logfile':stderr_logfile,
  477. 'pid':process.pid,
  478. }
  479. description = self._interpretProcessInfo(info)
  480. info['description'] = description
  481. return info
  482. def getAllProcessInfo(self):
  483. """ Get info about all processes
  484. @return array result An array of process status results
  485. """
  486. self._update('getAllProcessInfo')
  487. all_processes = self._getAllProcesses(lexical=True)
  488. output = []
  489. for group, process in all_processes:
  490. name = make_namespec(group.config.name, process.config.name)
  491. output.append(self.getProcessInfo(name))
  492. return output
  493. def _readProcessLog(self, name, offset, length, channel):
  494. group, process = self._getGroupAndProcess(name)
  495. if process is None:
  496. raise RPCError(Faults.BAD_NAME, name)
  497. logfile = getattr(process.config, '%s_logfile' % channel)
  498. if logfile is None or not os.path.exists(logfile):
  499. raise RPCError(Faults.NO_FILE, logfile)
  500. try:
  501. return readFile(logfile, int(offset), int(length))
  502. except ValueError, inst:
  503. why = inst.args[0]
  504. raise RPCError(getattr(Faults, why))
  505. def readProcessStdoutLog(self, name, offset, length):
  506. """ Read length bytes from name's stdout log starting at offset
  507. @param string name the name of the process (or 'group:name')
  508. @param int offset offset to start reading from.
  509. @param int length number of bytes to read from the log.
  510. @return string result Bytes of log
  511. """
  512. self._update('readProcessStdoutLog')
  513. return self._readProcessLog(name, offset, length, 'stdout')
  514. readProcessLog = readProcessStdoutLog # b/c alias
  515. def readProcessStderrLog(self, name, offset, length):
  516. """ Read length bytes from name's stderr log starting at offset
  517. @param string name the name of the process (or 'group:name')
  518. @param int offset offset to start reading from.
  519. @param int length number of bytes to read from the log.
  520. @return string result Bytes of log
  521. """
  522. self._update('readProcessStderrLog')
  523. return self._readProcessLog(name, offset, length, 'stderr')
  524. def _tailProcessLog(self, name, offset, length, channel):
  525. group, process = self._getGroupAndProcess(name)
  526. if process is None:
  527. raise RPCError(Faults.BAD_NAME, name)
  528. logfile = getattr(process.config, '%s_logfile' % channel)
  529. if logfile is None or not os.path.exists(logfile):
  530. return ['', 0, False]
  531. return tailFile(logfile, int(offset), int(length))
  532. def tailProcessStdoutLog(self, name, offset, length):
  533. """
  534. Provides a more efficient way to tail the (stdout) log than
  535. readProcessStdoutLog(). Use readProcessStdoutLog() to read
  536. chunks and tailProcessStdoutLog() to tail.
  537. Requests (length) bytes from the (name)'s log, starting at
  538. (offset). If the total log size is greater than (offset +
  539. length), the overflow flag is set and the (offset) is
  540. automatically increased to position the buffer at the end of
  541. the log. If less than (length) bytes are available, the
  542. maximum number of available bytes will be returned. (offset)
  543. returned is always the last offset in the log +1.
  544. @param string name the name of the process (or 'group:name')
  545. @param int offset offset to start reading from
  546. @param int length maximum number of bytes to return
  547. @return array result [string bytes, int offset, bool overflow]
  548. """
  549. self._update('tailProcessStdoutLog')
  550. return self._tailProcessLog(name, offset, length, 'stdout')
  551. tailProcessLog = tailProcessStdoutLog # b/c alias
  552. def tailProcessStderrLog(self, name, offset, length):
  553. """
  554. Provides a more efficient way to tail the (stderr) log than
  555. readProcessStderrLog(). Use readProcessStderrLog() to read
  556. chunks and tailProcessStderrLog() to tail.
  557. Requests (length) bytes from the (name)'s log, starting at
  558. (offset). If the total log size is greater than (offset +
  559. length), the overflow flag is set and the (offset) is
  560. automatically increased to position the buffer at the end of
  561. the log. If less than (length) bytes are available, the
  562. maximum number of available bytes will be returned. (offset)
  563. returned is always the last offset in the log +1.
  564. @param string name the name of the process (or 'group:name')
  565. @param int offset offset to start reading from
  566. @param int length maximum number of bytes to return
  567. @return array result [string bytes, int offset, bool overflow]
  568. """
  569. self._update('tailProcessStderrLog')
  570. return self._tailProcessLog(name, offset, length, 'stderr')
  571. def clearProcessLogs(self, name):
  572. """ Clear the stdout and stderr logs for the named process and
  573. reopen them.
  574. @param string name The name of the process (or 'group:name')
  575. @return boolean result Always True unless error
  576. """
  577. self._update('clearProcessLogs')
  578. group, process = self._getGroupAndProcess(name)
  579. if process is None:
  580. raise RPCError(Faults.BAD_NAME, name)
  581. try:
  582. # implies a reopen
  583. process.removelogs()
  584. except (IOError, OSError):
  585. raise RPCError(Faults.FAILED, name)
  586. return True
  587. clearProcessLog = clearProcessLogs # b/c alias
  588. def clearAllProcessLogs(self):
  589. """ Clear all process log files
  590. @return array result An array of process status info structs
  591. """
  592. self._update('clearAllProcessLogs')
  593. results = []
  594. callbacks = []
  595. all_processes = self._getAllProcesses()
  596. for group, process in all_processes:
  597. callbacks.append((group, process, self.clearProcessLog))
  598. def clearall():
  599. if not callbacks:
  600. return results
  601. group, process, callback = callbacks.pop(0)
  602. name = make_namespec(group.config.name, process.config.name)
  603. try:
  604. callback(name)
  605. except RPCError, e:
  606. results.append(
  607. {'name':process.config.name,
  608. 'group':group.config.name,
  609. 'status':e.code,
  610. 'description':e.text})
  611. else:
  612. results.append(
  613. {'name':process.config.name,
  614. 'group':group.config.name,
  615. 'status':Faults.SUCCESS,
  616. 'description':'OK'}
  617. )
  618. if callbacks:
  619. return NOT_DONE_YET
  620. return results
  621. clearall.delay = 0.05
  622. clearall.rpcinterface = self
  623. return clearall # deferred
  624. def sendProcessStdin(self, name, chars):
  625. """ Send a string of chars to the stdin of the process name.
  626. If non-7-bit data is sent (unicode), it is encoded to utf-8
  627. before being sent to the process' stdin. If chars is not a
  628. string or is not unicode, raise INCORRECT_PARAMETERS. If the
  629. process is not running, raise NOT_RUNNING. If the process'
  630. stdin cannot accept input (e.g. it was closed by the child
  631. process), raise NO_FILE.
  632. @param string name The process name to send to (or 'group:name')
  633. @param string chars The character data to send to the process
  634. @return boolean result Always return True unless error
  635. """
  636. self._update('sendProcessStdin')
  637. if isinstance(chars, unicode):
  638. chars = chars.encode('utf-8')
  639. if not isinstance(chars, basestring):
  640. raise RPCError(Faults.INCORRECT_PARAMETERS, chars)
  641. group, process = self._getGroupAndProcess(name)
  642. if process is None:
  643. raise RPCError(Faults.BAD_NAME, name)
  644. if not process.pid or process.killing:
  645. raise RPCError(Faults.NOT_RUNNING, name)
  646. try:
  647. process.write(chars)
  648. except OSError, why:
  649. if why.args[0] == errno.EPIPE:
  650. raise RPCError(Faults.NO_FILE, name)
  651. else:
  652. raise
  653. return True
  654. def sendRemoteCommEvent(self, type, data):
  655. """ Send an event that will be received by event listener
  656. subprocesses subscribing to the RemoteCommunicationEvent.
  657. @param string type String for the "type" key in the event header
  658. @param string data Data for the event body
  659. @return boolean Always return True unless error
  660. """
  661. if isinstance(type, unicode):
  662. type = type.encode('utf-8')
  663. if isinstance(data, unicode):
  664. data = data.encode('utf-8')
  665. notify(
  666. RemoteCommunicationEvent(type, data)
  667. )
  668. return True
  669. def make_allfunc(processes, predicate, func, **extra_kwargs):
  670. """ Return a closure representing a function that calls a
  671. function for every process, and returns a result """
  672. callbacks = []
  673. results = []
  674. def allfunc(
  675. processes=processes,
  676. predicate=predicate,
  677. func=func,
  678. extra_kwargs=extra_kwargs,
  679. callbacks=callbacks, # used only to fool scoping, never passed by caller
  680. results=results, # used only to fool scoping, never passed by caller
  681. ):
  682. if not callbacks:
  683. for group, process in processes:
  684. name = make_namespec(group.config.name, process.config.name)
  685. if predicate(process):
  686. try:
  687. callback = func(name, **extra_kwargs)
  688. except RPCError, e:
  689. results.append({'name':process.config.name,
  690. 'group':group.config.name,
  691. 'status':e.code,
  692. 'description':e.text})
  693. continue
  694. if isinstance(callback, types.FunctionType):
  695. callbacks.append((group, process, callback))
  696. else:
  697. results.append(
  698. {'name':process.config.name,
  699. 'group':group.config.name,
  700. 'status':Faults.SUCCESS,
  701. 'description':'OK'}
  702. )
  703. if not callbacks:
  704. return results
  705. for struct in callbacks[:]:
  706. group, process, cb = struct
  707. try:
  708. value = cb()
  709. except RPCError, e:
  710. results.append(
  711. {'name':process.config.name,
  712. 'group':group.config.name,
  713. 'status':e.code,
  714. 'description':e.text})
  715. callbacks.remove(struct)
  716. else:
  717. if value is not NOT_DONE_YET:
  718. results.append(
  719. {'name':process.config.name,
  720. 'group':group.config.name,
  721. 'status':Faults.SUCCESS,
  722. 'description':'OK'}
  723. )
  724. callbacks.remove(struct)
  725. if callbacks:
  726. return NOT_DONE_YET
  727. return results
  728. # XXX the above implementation has a weakness inasmuch as the
  729. # first call into each individual process callback will always
  730. # return NOT_DONE_YET, so they need to be called twice. The
  731. # symptom of this is that calling this method causes the
  732. # client to block for much longer than it actually requires to
  733. # kill all of the running processes. After the first call to
  734. # the killit callback, the process is actually dead, but the
  735. # above killall method processes the callbacks one at a time
  736. # during the select loop, which, because there is no output
  737. # from child processes after e.g. stopAllProcesses is called,
  738. # is not busy, so hits the timeout for each callback. I
  739. # attempted to make this better, but the only way to make it
  740. # better assumes totally synchronous reaping of child
  741. # processes, which requires infrastructure changes to
  742. # supervisord that are scary at the moment as it could take a
  743. # while to pin down all of the platform differences and might
  744. # require a C extension to the Python signal module to allow
  745. # the setting of ignore flags to signals.
  746. return allfunc
  747. def isRunning(process):
  748. if process.get_state() in RUNNING_STATES:
  749. return True
  750. def isNotRunning(process):
  751. return not isRunning(process)
  752. # this is not used in code but referenced via an entry point in the conf file
  753. def make_main_rpcinterface(supervisord):
  754. return SupervisorNamespaceRPCInterface(supervisord)