rpcinterface.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001
  1. import os
  2. import time
  3. import datetime
  4. import errno
  5. import types
  6. from supervisor.compat import as_string
  7. from supervisor.compat import as_bytes
  8. from supervisor.compat import unicode
  9. from supervisor.datatypes import signal_number
  10. from supervisor.options import readFile
  11. from supervisor.options import tailFile
  12. from supervisor.options import NotExecutable
  13. from supervisor.options import NotFound
  14. from supervisor.options import NoPermission
  15. from supervisor.options import make_namespec
  16. from supervisor.options import split_namespec
  17. from supervisor.options import VERSION
  18. from supervisor.events import notify
  19. from supervisor.events import RemoteCommunicationEvent
  20. from supervisor.http import NOT_DONE_YET
  21. from supervisor.xmlrpc import Faults
  22. from supervisor.xmlrpc import RPCError
  23. from supervisor.states import SupervisorStates
  24. from supervisor.states import getSupervisorStateDescription
  25. from supervisor.states import ProcessStates
  26. from supervisor.states import getProcessStateDescription
  27. from supervisor.states import (
  28. RUNNING_STATES,
  29. STOPPED_STATES,
  30. )
  # Version string of the RPC API; this is the value getAPIVersion() (and its
  # backwards-compatible alias getVersion()) hands back to callers.
  API_VERSION = '3.0'
  32. class SupervisorNamespaceRPCInterface:
  33. def __init__(self, supervisord):
  34. self.supervisord = supervisord
  35. def _update(self, text):
  36. self.update_text = text # for unit tests, mainly
  37. if ( isinstance(self.supervisord.options.mood, int) and
  38. self.supervisord.options.mood < SupervisorStates.RUNNING ):
  39. raise RPCError(Faults.SHUTDOWN_STATE)
  40. # RPC API methods
  41. def getAPIVersion(self):
  42. """ Return the version of the RPC API used by supervisord
  43. @return string version version id
  44. """
  45. self._update('getAPIVersion')
  46. return API_VERSION
  47. getVersion = getAPIVersion # b/w compatibility with releases before 3.0
  48. def getSupervisorVersion(self):
  49. """ Return the version of the supervisor package in use by supervisord
  50. @return string version version id
  51. """
  52. self._update('getSupervisorVersion')
  53. return VERSION
  54. def getIdentification(self):
  55. """ Return identifying string of supervisord
  56. @return string identifier identifying string
  57. """
  58. self._update('getIdentification')
  59. return self.supervisord.options.identifier
  60. def getState(self):
  61. """ Return current state of supervisord as a struct
  62. @return struct A struct with keys int statecode, string statename
  63. """
  64. self._update('getState')
  65. state = self.supervisord.options.mood
  66. statename = getSupervisorStateDescription(state)
  67. data = {
  68. 'statecode':state,
  69. 'statename':statename,
  70. }
  71. return data
  72. def getPID(self):
  73. """ Return the PID of supervisord
  74. @return int PID
  75. """
  76. self._update('getPID')
  77. return self.supervisord.options.get_pid()
  78. def readLog(self, offset, length):
  79. """ Read length bytes from the main log starting at offset
  80. @param int offset offset to start reading from.
  81. @param int length number of bytes to read from the log.
  82. @return string result Bytes of log
  83. """
  84. self._update('readLog')
  85. logfile = self.supervisord.options.logfile
  86. if logfile is None or not os.path.exists(logfile):
  87. raise RPCError(Faults.NO_FILE, logfile)
  88. try:
  89. return as_string(readFile(logfile, int(offset), int(length)))
  90. except ValueError as inst:
  91. why = inst.args[0]
  92. raise RPCError(getattr(Faults, why))
  93. readMainLog = readLog # b/w compatibility with releases before 2.1
  94. def clearLog(self):
  95. """ Clear the main log.
  96. @return boolean result always returns True unless error
  97. """
  98. self._update('clearLog')
  99. logfile = self.supervisord.options.logfile
  100. if logfile is None or not self.supervisord.options.exists(logfile):
  101. raise RPCError(Faults.NO_FILE)
  102. # there is a race condition here, but ignore it.
  103. try:
  104. self.supervisord.options.remove(logfile)
  105. except (OSError, IOError):
  106. raise RPCError(Faults.FAILED)
  107. for handler in self.supervisord.options.logger.handlers:
  108. if hasattr(handler, 'reopen'):
  109. self.supervisord.options.logger.info('reopening log file')
  110. handler.reopen()
  111. return True
  112. def shutdown(self):
  113. """ Shut down the supervisor process
  114. @return boolean result always returns True unless error
  115. """
  116. self._update('shutdown')
  117. self.supervisord.options.mood = SupervisorStates.SHUTDOWN
  118. return True
  119. def restart(self):
  120. """ Restart the supervisor process
  121. @return boolean result always return True unless error
  122. """
  123. self._update('restart')
  124. self.supervisord.options.mood = SupervisorStates.RESTARTING
  125. return True
  126. def reloadConfig(self):
  127. """
  128. Reload configuration
  129. @return boolean result always return True unless error
  130. """
  131. self._update('reloadConfig')
  132. try:
  133. self.supervisord.options.process_config(do_usage=False)
  134. except ValueError as msg:
  135. raise RPCError(Faults.CANT_REREAD, msg)
  136. added, changed, removed = self.supervisord.diff_to_active()
  137. added = [group.name for group in added]
  138. changed = [group.name for group in changed]
  139. removed = [group.name for group in removed]
  140. return [[added, changed, removed]] # cannot return len > 1, apparently
  141. def addProcessGroup(self, name):
  142. """ Update the config for a running process from config file.
  143. @param string name name of process group to add
  144. @return boolean result true if successful
  145. """
  146. self._update('addProcessGroup')
  147. for config in self.supervisord.options.process_group_configs:
  148. if config.name == name:
  149. result = self.supervisord.add_process_group(config)
  150. if not result:
  151. raise RPCError(Faults.ALREADY_ADDED, name)
  152. return True
  153. raise RPCError(Faults.BAD_NAME, name)
  154. def removeProcessGroup(self, name):
  155. """ Remove a stopped process from the active configuration.
  156. @param string name name of process group to remove
  157. @return boolean result Indicates whether the removal was successful
  158. """
  159. self._update('removeProcessGroup')
  160. if name not in self.supervisord.process_groups:
  161. raise RPCError(Faults.BAD_NAME, name)
  162. result = self.supervisord.remove_process_group(name)
  163. if not result:
  164. raise RPCError(Faults.STILL_RUNNING, name)
  165. return True
  166. def _getAllProcesses(self, lexical=False):
  167. # if lexical is true, return processes sorted in lexical order,
  168. # otherwise, sort in priority order
  169. all_processes = []
  170. if lexical:
  171. group_names = list(self.supervisord.process_groups.keys())
  172. group_names.sort()
  173. for group_name in group_names:
  174. group = self.supervisord.process_groups[group_name]
  175. process_names = list(group.processes.keys())
  176. process_names.sort()
  177. for process_name in process_names:
  178. process = group.processes[process_name]
  179. all_processes.append((group, process))
  180. else:
  181. groups = list(self.supervisord.process_groups.values())
  182. groups.sort() # asc by priority
  183. for group in groups:
  184. processes = list(group.processes.values())
  185. processes.sort() # asc by priority
  186. for process in processes:
  187. all_processes.append((group, process))
  188. return all_processes
  189. def _getGroupAndProcess(self, name):
  190. # get process to start from name
  191. group_name, process_name = split_namespec(name)
  192. group = self.supervisord.process_groups.get(group_name)
  193. if group is None:
  194. raise RPCError(Faults.BAD_NAME, name)
  195. if process_name is None:
  196. return group, None
  197. process = group.processes.get(process_name)
  198. if process is None:
  199. raise RPCError(Faults.BAD_NAME, name)
  200. return group, process
  201. def startProcess(self, name, wait=True):
  202. """ Start a process
  203. @param string name Process name (or ``group:name``, or ``group:*``)
  204. @param boolean wait Wait for process to be fully started
  205. @return boolean result Always true unless error
  206. """
  207. self._update('startProcess')
  208. group, process = self._getGroupAndProcess(name)
  209. if process is None:
  210. group_name, process_name = split_namespec(name)
  211. return self.startProcessGroup(group_name, wait)
  212. # test filespec, don't bother trying to spawn if we know it will
  213. # eventually fail
  214. try:
  215. filename, argv = process.get_execv_args()
  216. except NotFound as why:
  217. raise RPCError(Faults.NO_FILE, why.args[0])
  218. except (NotExecutable, NoPermission) as why:
  219. raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])
  220. if process.get_state() in RUNNING_STATES:
  221. raise RPCError(Faults.ALREADY_STARTED, name)
  222. process.spawn()
  223. # We call reap() in order to more quickly obtain the side effects of
  224. # process.finish(), which reap() eventually ends up calling. This
  225. # might be the case if the spawn() was successful but then the process
  226. # died before its startsecs elapsed or it exited with an unexpected
  227. # exit code. In particular, finish() may set spawnerr, which we can
  228. # check and immediately raise an RPCError, avoiding the need to
  229. # defer by returning a callback.
  230. self.supervisord.reap()
  231. if process.spawnerr:
  232. raise RPCError(Faults.SPAWN_ERROR, name)
  233. # We call process.transition() in order to more quickly obtain its
  234. # side effects. In particular, it might set the process' state from
  235. # STARTING->RUNNING if the process has a startsecs==0.
  236. process.transition()
  237. if wait and process.get_state() != ProcessStates.RUNNING:
  238. # by default, this branch will almost always be hit for processes
  239. # with default startsecs configurations, because the default number
  240. # of startsecs for a process is "1", and the process will not have
  241. # entered the RUNNING state yet even though we've called
  242. # transition() on it. This is because a process is not considered
  243. # RUNNING until it has stayed up > startsecs.
  244. def onwait():
  245. if process.spawnerr:
  246. raise RPCError(Faults.SPAWN_ERROR, name)
  247. state = process.get_state()
  248. if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
  249. raise RPCError(Faults.ABNORMAL_TERMINATION, name)
  250. if state == ProcessStates.RUNNING:
  251. return True
  252. return NOT_DONE_YET
  253. onwait.delay = 0.05
  254. onwait.rpcinterface = self
  255. return onwait # deferred
  256. return True
  257. def startProcessGroup(self, name, wait=True):
  258. """ Start all processes in the group named 'name'
  259. @param string name The group name
  260. @param boolean wait Wait for each process to be fully started
  261. @return array result An array of process status info structs
  262. """
  263. self._update('startProcessGroup')
  264. group = self.supervisord.process_groups.get(name)
  265. if group is None:
  266. raise RPCError(Faults.BAD_NAME, name)
  267. processes = list(group.processes.values())
  268. processes.sort()
  269. processes = [ (group, process) for process in processes ]
  270. startall = make_allfunc(processes, isNotRunning, self.startProcess,
  271. wait=wait)
  272. startall.delay = 0.05
  273. startall.rpcinterface = self
  274. return startall # deferred
  275. def startAllProcesses(self, wait=True):
  276. """ Start all processes listed in the configuration file
  277. @param boolean wait Wait for each process to be fully started
  278. @return array result An array of process status info structs
  279. """
  280. self._update('startAllProcesses')
  281. processes = self._getAllProcesses()
  282. startall = make_allfunc(processes, isNotRunning, self.startProcess,
  283. wait=wait)
  284. startall.delay = 0.05
  285. startall.rpcinterface = self
  286. return startall # deferred
  287. def stopProcess(self, name, wait=True):
  288. """ Stop a process named by name
  289. @param string name The name of the process to stop (or 'group:name')
  290. @param boolean wait Wait for the process to be fully stopped
  291. @return boolean result Always return True unless error
  292. """
  293. self._update('stopProcess')
  294. group, process = self._getGroupAndProcess(name)
  295. if process is None:
  296. group_name, process_name = split_namespec(name)
  297. return self.stopProcessGroup(group_name, wait)
  298. if process.get_state() not in RUNNING_STATES:
  299. raise RPCError(Faults.NOT_RUNNING, name)
  300. msg = process.stop()
  301. if msg is not None:
  302. raise RPCError(Faults.FAILED, msg)
  303. # We'll try to reap any killed child. FWIW, reap calls waitpid, and
  304. # then, if waitpid returns a pid, calls finish() on the process with
  305. # that pid, which drains any I/O from the process' dispatchers and
  306. # changes the process' state. I chose to call reap without once=True
  307. # because we don't really care if we reap more than one child. Even if
  308. # we only reap one child. we may not even be reaping the child that we
  309. # just stopped (this is all async, and process.stop() may not work, and
  310. # we'll need to wait for SIGKILL during process.transition() as the
  311. # result of normal select looping).
  312. self.supervisord.reap()
  313. if wait and process.get_state() not in STOPPED_STATES:
  314. def onwait():
  315. # process will eventually enter a stopped state by
  316. # virtue of the supervisord.reap() method being called
  317. # during normal operations
  318. process.stop_report()
  319. if process.get_state() not in STOPPED_STATES:
  320. return NOT_DONE_YET
  321. return True
  322. onwait.delay = 0
  323. onwait.rpcinterface = self
  324. return onwait # deferred
  325. return True
  326. def stopProcessGroup(self, name, wait=True):
  327. """ Stop all processes in the process group named 'name'
  328. @param string name The group name
  329. @param boolean wait Wait for each process to be fully stopped
  330. @return array result An array of process status info structs
  331. """
  332. self._update('stopProcessGroup')
  333. group = self.supervisord.process_groups.get(name)
  334. if group is None:
  335. raise RPCError(Faults.BAD_NAME, name)
  336. processes = list(group.processes.values())
  337. processes.sort()
  338. processes = [ (group, process) for process in processes ]
  339. killall = make_allfunc(processes, isRunning, self.stopProcess,
  340. wait=wait)
  341. killall.delay = 0.05
  342. killall.rpcinterface = self
  343. return killall # deferred
  344. def stopAllProcesses(self, wait=True):
  345. """ Stop all processes in the process list
  346. @param boolean wait Wait for each process to be fully stopped
  347. @return array result An array of process status info structs
  348. """
  349. self._update('stopAllProcesses')
  350. processes = self._getAllProcesses()
  351. killall = make_allfunc(processes, isRunning, self.stopProcess,
  352. wait=wait)
  353. killall.delay = 0.05
  354. killall.rpcinterface = self
  355. return killall # deferred
  356. def signalProcess(self, name, signal):
  357. """ Send an arbitrary UNIX signal to the process named by name
  358. @param string name Name of the process to signal (or 'group:name')
  359. @param string signal Signal to send, as name ('HUP') or number ('1')
  360. @return boolean
  361. """
  362. self._update('signalProcess')
  363. group, process = self._getGroupAndProcess(name)
  364. if process is None:
  365. group_name, process_name = split_namespec(name)
  366. return self.signalProcessGroup(group_name, signal=signal)
  367. try:
  368. sig = signal_number(signal)
  369. except ValueError:
  370. raise RPCError(Faults.BAD_SIGNAL, signal)
  371. if process.get_state() not in RUNNING_STATES:
  372. raise RPCError(Faults.NOT_RUNNING, name)
  373. msg = process.signal(sig)
  374. if not msg is None:
  375. raise RPCError(Faults.FAILED, msg)
  376. return True
  377. def signalProcessGroup(self, name, signal):
  378. """ Send a signal to all processes in the group named 'name'
  379. @param string name The group name
  380. @param string signal Signal to send, as name ('HUP') or number ('1')
  381. @return array
  382. """
  383. group = self.supervisord.process_groups.get(name)
  384. self._update('signalProcessGroup')
  385. if group is None:
  386. raise RPCError(Faults.BAD_NAME, name)
  387. processes = list(group.processes.values())
  388. processes.sort()
  389. processes = [(group, process) for process in processes]
  390. sendall = make_allfunc(processes, isRunning, self.signalProcess,
  391. signal=signal)
  392. result = sendall()
  393. self._update('signalProcessGroup')
  394. return result
  395. def signalAllProcesses(self, signal):
  396. """ Send a signal to all processes in the process list
  397. @param string signal Signal to send, as name ('HUP') or number ('1')
  398. @return array An array of process status info structs
  399. """
  400. processes = self._getAllProcesses()
  401. signalall = make_allfunc(processes, isRunning, self.signalProcess,
  402. signal=signal)
  403. result = signalall()
  404. self._update('signalAllProcesses')
  405. return result
  406. def getAllConfigInfo(self):
  407. """ Get info about all available process configurations. Each struct
  408. represents a single process (i.e. groups get flattened).
  409. @return array result An array of process config info structs
  410. """
  411. self._update('getAllConfigInfo')
  412. configinfo = []
  413. for gconfig in self.supervisord.options.process_group_configs:
  414. inuse = gconfig.name in self.supervisord.process_groups
  415. for pconfig in gconfig.process_configs:
  416. configinfo.append(
  417. {
  418. 'autostart': pconfig.autostart,
  419. 'command': pconfig.command,
  420. 'exitcodes': pconfig.exitcodes,
  421. 'group': gconfig.name,
  422. 'group_prio': gconfig.priority,
  423. 'inuse': inuse,
  424. 'killasgroup': pconfig.killasgroup,
  425. 'name': pconfig.name,
  426. 'process_prio': pconfig.priority,
  427. 'redirect_stderr': pconfig.redirect_stderr,
  428. 'startretries': pconfig.startretries,
  429. 'startsecs': pconfig.startsecs,
  430. 'stdout_capture_maxbytes': pconfig.stdout_capture_maxbytes,
  431. 'stdout_events_enabled': pconfig.stdout_events_enabled,
  432. 'stdout_logfile': pconfig.stdout_logfile,
  433. 'stdout_logfile_backups': pconfig.stdout_logfile_backups,
  434. 'stdout_logfile_maxbytes': pconfig.stdout_logfile_maxbytes,
  435. 'stdout_syslog': pconfig.stdout_syslog,
  436. 'stopsignal': pconfig.stopsignal,
  437. 'stopwaitsecs': pconfig.stopwaitsecs,
  438. 'stderr_capture_maxbytes': pconfig.stderr_capture_maxbytes,
  439. 'stderr_events_enabled': pconfig.stderr_events_enabled,
  440. 'stderr_logfile': pconfig.stderr_logfile,
  441. 'stderr_logfile_backups': pconfig.stderr_logfile_backups,
  442. 'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
  443. 'stderr_syslog': pconfig.stderr_syslog,
  444. }
  445. )
  446. configinfo.sort(key=lambda r: r['name'])
  447. return configinfo
  448. def _interpretProcessInfo(self, info):
  449. state = info['state']
  450. if state == ProcessStates.RUNNING:
  451. start = info['start']
  452. now = info['now']
  453. start_dt = datetime.datetime(*time.gmtime(start)[:6])
  454. now_dt = datetime.datetime(*time.gmtime(now)[:6])
  455. uptime = now_dt - start_dt
  456. if _total_seconds(uptime) < 0: # system time set back
  457. uptime = datetime.timedelta(0)
  458. desc = 'pid %s, uptime %s' % (info['pid'], uptime)
  459. elif state in (ProcessStates.FATAL, ProcessStates.BACKOFF):
  460. desc = info['spawnerr']
  461. if not desc:
  462. desc = 'unknown error (try "tail %s")' % info['name']
  463. elif state in (ProcessStates.STOPPED, ProcessStates.EXITED):
  464. if info['start']:
  465. stop = info['stop']
  466. stop_dt = datetime.datetime(*time.localtime(stop)[:7])
  467. desc = stop_dt.strftime('%b %d %I:%M %p')
  468. else:
  469. desc = 'Not started'
  470. else:
  471. desc = ''
  472. return desc
  473. def getProcessInfo(self, name):
  474. """ Get info about a process named name
  475. @param string name The name of the process (or 'group:name')
  476. @return struct result A structure containing data about the process
  477. """
  478. self._update('getProcessInfo')
  479. group, process = self._getGroupAndProcess(name)
  480. if process is None:
  481. raise RPCError(Faults.BAD_NAME, name)
  482. start = int(process.laststart)
  483. stop = int(process.laststop)
  484. now = int(time.time())
  485. state = process.get_state()
  486. spawnerr = process.spawnerr or ''
  487. exitstatus = process.exitstatus or 0
  488. stdout_logfile = process.config.stdout_logfile or ''
  489. stderr_logfile = process.config.stderr_logfile or ''
  490. info = {
  491. 'name':process.config.name,
  492. 'group':group.config.name,
  493. 'start':start,
  494. 'stop':stop,
  495. 'now':now,
  496. 'state':state,
  497. 'statename':getProcessStateDescription(state),
  498. 'spawnerr':spawnerr,
  499. 'exitstatus':exitstatus,
  500. 'logfile':stdout_logfile, # b/c alias
  501. 'stdout_logfile':stdout_logfile,
  502. 'stderr_logfile':stderr_logfile,
  503. 'pid':process.pid,
  504. }
  505. description = self._interpretProcessInfo(info)
  506. info['description'] = description
  507. return info
  508. def getAllProcessInfo(self):
  509. """ Get info about all processes
  510. @return array result An array of process status results
  511. """
  512. self._update('getAllProcessInfo')
  513. all_processes = self._getAllProcesses(lexical=True)
  514. output = []
  515. for group, process in all_processes:
  516. name = make_namespec(group.config.name, process.config.name)
  517. output.append(self.getProcessInfo(name))
  518. return output
  519. def _readProcessLog(self, name, offset, length, channel):
  520. group, process = self._getGroupAndProcess(name)
  521. if process is None:
  522. raise RPCError(Faults.BAD_NAME, name)
  523. logfile = getattr(process.config, '%s_logfile' % channel)
  524. if logfile is None or not os.path.exists(logfile):
  525. raise RPCError(Faults.NO_FILE, logfile)
  526. try:
  527. return as_string(readFile(logfile, int(offset), int(length)))
  528. except ValueError as inst:
  529. why = inst.args[0]
  530. raise RPCError(getattr(Faults, why))
  531. def readProcessStdoutLog(self, name, offset, length):
  532. """ Read length bytes from name's stdout log starting at offset
  533. @param string name the name of the process (or 'group:name')
  534. @param int offset offset to start reading from.
  535. @param int length number of bytes to read from the log.
  536. @return string result Bytes of log
  537. """
  538. self._update('readProcessStdoutLog')
  539. return self._readProcessLog(name, offset, length, 'stdout')
  540. readProcessLog = readProcessStdoutLog # b/c alias
  541. def readProcessStderrLog(self, name, offset, length):
  542. """ Read length bytes from name's stderr log starting at offset
  543. @param string name the name of the process (or 'group:name')
  544. @param int offset offset to start reading from.
  545. @param int length number of bytes to read from the log.
  546. @return string result Bytes of log
  547. """
  548. self._update('readProcessStderrLog')
  549. return self._readProcessLog(name, offset, length, 'stderr')
  550. def _tailProcessLog(self, name, offset, length, channel):
  551. group, process = self._getGroupAndProcess(name)
  552. if process is None:
  553. raise RPCError(Faults.BAD_NAME, name)
  554. logfile = getattr(process.config, '%s_logfile' % channel)
  555. if logfile is None or not os.path.exists(logfile):
  556. return ['', 0, False]
  557. return tailFile(logfile, int(offset), int(length))
  558. def tailProcessStdoutLog(self, name, offset, length):
  559. """
  560. Provides a more efficient way to tail the (stdout) log than
  561. readProcessStdoutLog(). Use readProcessStdoutLog() to read
  562. chunks and tailProcessStdoutLog() to tail.
  563. Requests (length) bytes from the (name)'s log, starting at
  564. (offset). If the total log size is greater than (offset +
  565. length), the overflow flag is set and the (offset) is
  566. automatically increased to position the buffer at the end of
  567. the log. If less than (length) bytes are available, the
  568. maximum number of available bytes will be returned. (offset)
  569. returned is always the last offset in the log +1.
  570. @param string name the name of the process (or 'group:name')
  571. @param int offset offset to start reading from
  572. @param int length maximum number of bytes to return
  573. @return array result [string bytes, int offset, bool overflow]
  574. """
  575. self._update('tailProcessStdoutLog')
  576. return self._tailProcessLog(name, offset, length, 'stdout')
  577. tailProcessLog = tailProcessStdoutLog # b/c alias
  578. def tailProcessStderrLog(self, name, offset, length):
  579. """
  580. Provides a more efficient way to tail the (stderr) log than
  581. readProcessStderrLog(). Use readProcessStderrLog() to read
  582. chunks and tailProcessStderrLog() to tail.
  583. Requests (length) bytes from the (name)'s log, starting at
  584. (offset). If the total log size is greater than (offset +
  585. length), the overflow flag is set and the (offset) is
  586. automatically increased to position the buffer at the end of
  587. the log. If less than (length) bytes are available, the
  588. maximum number of available bytes will be returned. (offset)
  589. returned is always the last offset in the log +1.
  590. @param string name the name of the process (or 'group:name')
  591. @param int offset offset to start reading from
  592. @param int length maximum number of bytes to return
  593. @return array result [string bytes, int offset, bool overflow]
  594. """
  595. self._update('tailProcessStderrLog')
  596. return self._tailProcessLog(name, offset, length, 'stderr')
  597. def clearProcessLogs(self, name):
  598. """ Clear the stdout and stderr logs for the named process and
  599. reopen them.
  600. @param string name The name of the process (or 'group:name')
  601. @return boolean result Always True unless error
  602. """
  603. self._update('clearProcessLogs')
  604. group, process = self._getGroupAndProcess(name)
  605. if process is None:
  606. raise RPCError(Faults.BAD_NAME, name)
  607. try:
  608. # implies a reopen
  609. process.removelogs()
  610. except (IOError, OSError):
  611. raise RPCError(Faults.FAILED, name)
  612. return True
  613. clearProcessLog = clearProcessLogs # b/c alias
  614. def clearAllProcessLogs(self):
  615. """ Clear all process log files
  616. @return array result An array of process status info structs
  617. """
  618. self._update('clearAllProcessLogs')
  619. results = []
  620. callbacks = []
  621. all_processes = self._getAllProcesses()
  622. for group, process in all_processes:
  623. callbacks.append((group, process, self.clearProcessLog))
  624. def clearall():
  625. if not callbacks:
  626. return results
  627. group, process, callback = callbacks.pop(0)
  628. name = make_namespec(group.config.name, process.config.name)
  629. try:
  630. callback(name)
  631. except RPCError as e:
  632. results.append(
  633. {'name':process.config.name,
  634. 'group':group.config.name,
  635. 'status':e.code,
  636. 'description':e.text})
  637. else:
  638. results.append(
  639. {'name':process.config.name,
  640. 'group':group.config.name,
  641. 'status':Faults.SUCCESS,
  642. 'description':'OK'}
  643. )
  644. if callbacks:
  645. return NOT_DONE_YET
  646. return results
  647. clearall.delay = 0.05
  648. clearall.rpcinterface = self
  649. return clearall # deferred
  def sendProcessStdin(self, name, chars):
      """ Write character data to the stdin of the named process.

      chars must be a str, bytes or unicode value, otherwise
      INCORRECT_PARAMETERS is raised.  It is converted with as_bytes()
      before being written (non-7-bit unicode data is encoded to utf-8).
      If name does not resolve to a process, raise BAD_NAME.  If the
      process has no pid or is currently being killed, raise
      NOT_RUNNING.  If the write fails with EPIPE (e.g. stdin was
      closed by the child process), raise NO_FILE; any other OSError
      from the write is re-raised unchanged.

      @param string name        The process name to send to (or 'group:name')
      @param string chars       The character data to send to the process
      @return boolean result    Always return True unless error
      """
      self._update('sendProcessStdin')

      if not isinstance(chars, (str, bytes, unicode)):
          raise RPCError(Faults.INCORRECT_PARAMETERS, chars)
      payload = as_bytes(chars)

      _group, process = self._getGroupAndProcess(name)
      if process is None:
          raise RPCError(Faults.BAD_NAME, name)

      # a process is writable only while it has a pid and is not being killed
      accepts_input = process.pid and not process.killing
      if not accepts_input:
          raise RPCError(Faults.NOT_RUNNING, name)

      try:
          process.write(payload)
      except OSError as exc:
          # only a broken pipe is translated into an RPC fault
          if exc.args[0] != errno.EPIPE:
              raise
          raise RPCError(Faults.NO_FILE, name)

      return True
  def sendRemoteCommEvent(self, type, data):
      """ Emit a RemoteCommunicationEvent so that event listener
      subprocesses subscribing to it receive the given type and data.
      Either argument is encoded to utf-8 first if it is unicode.

      @param  string  type  String for the "type" key in the event header
      @param  string  data  Data for the event body
      @return boolean       Always return True unless error
      """
      event_type = type.encode('utf-8') if isinstance(type, unicode) else type
      event_body = data.encode('utf-8') if isinstance(data, unicode) else data

      notify(RemoteCommunicationEvent(event_type, event_body))
      return True
  def _total_seconds(timedelta):
      """ Return the length of a timedelta in seconds, computed from its
      days, seconds and microseconds attributes.

      NOTE(review): the fractional part survives only where `/` is true
      division; under Python 2 integer division the result would be
      floored -- confirm which applies to this module. """
      whole_seconds = timedelta.days * 86400 + timedelta.seconds
      total_microseconds = whole_seconds * 10**6 + timedelta.microseconds
      return total_microseconds / 10**6
  def make_allfunc(processes, predicate, func, **extra_kwargs):
      """ Return a closure that applies func to every process accepted
      by predicate and collects one result struct per process.

      processes is an iterable of (group, process) pairs.  For each
      accepted process the closure calls func(namespec, **extra_kwargs).
      If that call returns a plain function, it is kept as a deferred
      callback and polled on this and later invocations of the closure
      until it returns something other than NOT_DONE_YET; any other
      return value is recorded as success right away.  An RPCError
      raised by func or by a callback is recorded with its code and text.

      The closure returns NOT_DONE_YET while callbacks are outstanding,
      and the list of result structs (dicts with the keys 'name',
      'group', 'status' and 'description') once none remain. """
      callbacks = []
      results = []

      def result_struct(group, process, status, description):
          # one entry of the list eventually returned to the caller
          return {'name': process.config.name,
                  'group': group.config.name,
                  'status': status,
                  'description': description}

      def allfunc(
          processes=processes,
          predicate=predicate,
          func=func,
          extra_kwargs=extra_kwargs,
          callbacks=callbacks, # used only to fool scoping, never passed by caller
          results=results, # used only to fool scoping, never passed by caller
          ):
          # Nothing pending means the work has not been started yet:
          # invoke func once for every process the predicate accepts.
          if not callbacks:
              for group, process in processes:
                  namespec = make_namespec(group.config.name,
                                           process.config.name)
                  if not predicate(process):
                      continue
                  try:
                      outcome = func(namespec, **extra_kwargs)
                  except RPCError as err:
                      results.append(
                          result_struct(group, process, err.code, err.text))
                  else:
                      if isinstance(outcome, types.FunctionType):
                          # deferred: poll it below and on later calls
                          callbacks.append((group, process, outcome))
                      else:
                          results.append(
                              result_struct(group, process,
                                            Faults.SUCCESS, 'OK'))

          if not callbacks:
              return results

          # Poll a snapshot so finished entries can be removed from the
          # live list while looping.
          for pending in list(callbacks):
              group, process, poll = pending
              try:
                  finished = poll() is not NOT_DONE_YET
              except RPCError as err:
                  results.append(
                      result_struct(group, process, err.code, err.text))
                  callbacks.remove(pending)
                  continue
              if finished:
                  results.append(
                      result_struct(group, process, Faults.SUCCESS, 'OK'))
                  callbacks.remove(pending)

          return NOT_DONE_YET if callbacks else results

      # XXX the above implementation has a weakness inasmuch as the
      # first call into each individual process callback will always
      # return NOT_DONE_YET, so they need to be called twice.  The
      # symptom of this is that calling this method causes the
      # client to block for much longer than it actually requires to
      # kill all of the running processes.  After the first call to
      # the killit callback, the process is actually dead, but the
      # above killall method processes the callbacks one at a time
      # during the select loop, which, because there is no output
      # from child processes after e.g. stopAllProcesses is called,
      # is not busy, so hits the timeout for each callback.  I
      # attempted to make this better, but the only way to make it
      # better assumes totally synchronous reaping of child
      # processes, which requires infrastructure changes to
      # supervisord that are scary at the moment as it could take a
      # while to pin down all of the platform differences and might
      # require a C extension to the Python signal module to allow
      # the setting of ignore flags to signals.
      return allfunc
  def isRunning(process):
      """ Predicate: return True if the state reported by
      process.get_state() is one of RUNNING_STATES, otherwise False.

      The negative case used to fall off the end of the function and
      return None; it now returns an explicit False, which has the same
      truth value. """
      return process.get_state() in RUNNING_STATES
  def isNotRunning(process):
      """ Predicate: the logical negation of isRunning(process). """
      running = isRunning(process)
      return not running
  # Not called from within the code itself; it is referenced via an
  # entry point in the conf file.
  def make_main_rpcinterface(supervisord):
      """ Factory: return a SupervisorNamespaceRPCInterface constructed
      with the given supervisord object. """
      rpcinterface = SupervisorNamespaceRPCInterface(supervisord)
      return rpcinterface