rpcinterface.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. import os
  2. import time
  3. import datetime
  4. import errno
  5. from supervisor.compat import as_string
  6. from supervisor.compat import unicode
  7. from supervisor.compat import basestring
  8. from supervisor.options import readFile
  9. from supervisor.options import tailFile
  10. from supervisor.options import NotExecutable
  11. from supervisor.options import NotFound
  12. from supervisor.options import NoPermission
  13. from supervisor.options import make_namespec
  14. from supervisor.options import split_namespec
  15. from supervisor.options import VERSION
  16. from supervisor.events import notify
  17. from supervisor.events import RemoteCommunicationEvent
  18. from supervisor.http import NOT_DONE_YET
  19. from supervisor.xmlrpc import Faults
  20. from supervisor.xmlrpc import RPCError
  21. from supervisor.states import SupervisorStates
  22. from supervisor.states import getSupervisorStateDescription
  23. from supervisor.states import ProcessStates
  24. from supervisor.states import getProcessStateDescription
  25. from supervisor.states import RUNNING_STATES
  26. API_VERSION = '3.0'
  27. class SupervisorNamespaceRPCInterface:
  28. def __init__(self, supervisord):
  29. self.supervisord = supervisord
  30. def _update(self, text):
  31. self.update_text = text # for unit tests, mainly
  32. if isinstance(self.supervisord.options.mood, int) and self.supervisord.options.mood < SupervisorStates.RUNNING:
  33. raise RPCError(Faults.SHUTDOWN_STATE)
  34. # RPC API methods
  35. def getAPIVersion(self):
  36. """ Return the version of the RPC API used by supervisord
  37. @return string version version id
  38. """
  39. self._update('getAPIVersion')
  40. return API_VERSION
  41. getVersion = getAPIVersion # b/w compatibility with releases before 3.0
  42. def getSupervisorVersion(self):
  43. """ Return the version of the supervisor package in use by supervisord
  44. @return string version version id
  45. """
  46. self._update('getSupervisorVersion')
  47. return VERSION
  48. def getIdentification(self):
  49. """ Return identifying string of supervisord
  50. @return string identifier identifying string
  51. """
  52. self._update('getIdentification')
  53. return self.supervisord.options.identifier
  54. def getState(self):
  55. """ Return current state of supervisord as a struct
  56. @return struct A struct with keys int statecode, string statename
  57. """
  58. self._update('getState')
  59. state = self.supervisord.options.mood
  60. statename = getSupervisorStateDescription(state)
  61. data = {
  62. 'statecode':state,
  63. 'statename':statename,
  64. }
  65. return data
  66. def getPID(self):
  67. """ Return the PID of supervisord
  68. @return int PID
  69. """
  70. self._update('getPID')
  71. return self.supervisord.options.get_pid()
  72. def readLog(self, offset, length):
  73. """ Read length bytes from the main log starting at offset
  74. @param int offset offset to start reading from.
  75. @param int length number of bytes to read from the log.
  76. @return string result Bytes of log
  77. """
  78. self._update('readLog')
  79. logfile = self.supervisord.options.logfile
  80. if logfile is None or not os.path.exists(logfile):
  81. raise RPCError(Faults.NO_FILE, logfile)
  82. try:
  83. return as_string(readFile(logfile, int(offset), int(length)))
  84. except ValueError as inst:
  85. why = inst.args[0]
  86. raise RPCError(getattr(Faults, why))
  87. readMainLog = readLog # b/w compatibility with releases before 2.1
  88. def clearLog(self):
  89. """ Clear the main log.
  90. @return boolean result always returns True unless error
  91. """
  92. self._update('clearLog')
  93. logfile = self.supervisord.options.logfile
  94. if logfile is None or not self.supervisord.options.exists(logfile):
  95. raise RPCError(Faults.NO_FILE)
  96. # there is a race condition here, but ignore it.
  97. try:
  98. self.supervisord.options.remove(logfile)
  99. except (OSError, IOError):
  100. raise RPCError(Faults.FAILED)
  101. for handler in self.supervisord.options.logger.handlers:
  102. if hasattr(handler, 'reopen'):
  103. self.supervisord.options.logger.info('reopening log file')
  104. handler.reopen()
  105. return True
  106. def shutdown(self):
  107. """ Shut down the supervisor process
  108. @return boolean result always returns True unless error
  109. """
  110. self._update('shutdown')
  111. self.supervisord.options.mood = SupervisorStates.SHUTDOWN
  112. return True
  113. def restart(self):
  114. """ Restart the supervisor process
  115. @return boolean result always return True unless error
  116. """
  117. self._update('restart')
  118. self.supervisord.options.mood = SupervisorStates.RESTARTING
  119. return True
  120. def reloadConfig(self):
  121. """
  122. Reload configuration
  123. @return boolean result always return True unless error
  124. """
  125. self._update('reloadConfig')
  126. try:
  127. self.supervisord.options.process_config(do_usage=False)
  128. except ValueError as msg:
  129. raise RPCError(Faults.CANT_REREAD, msg)
  130. added, changed, removed = self.supervisord.diff_to_active()
  131. added = [group.name for group in added]
  132. changed = [group.name for group in changed]
  133. removed = [group.name for group in removed]
  134. return [[added, changed, removed]] # cannot return len > 1, apparently
  135. def addProcessGroup(self, name):
  136. """ Update the config for a running process from config file.
  137. @param string name name of process group to add
  138. @return boolean result true if successful
  139. """
  140. self._update('addProcessGroup')
  141. for config in self.supervisord.options.process_group_configs:
  142. if config.name == name:
  143. result = self.supervisord.add_process_group(config)
  144. if not result:
  145. raise RPCError(Faults.ALREADY_ADDED, name)
  146. return True
  147. raise RPCError(Faults.BAD_NAME, name)
  148. def removeProcessGroup(self, name):
  149. """ Remove a stopped process from the active configuration.
  150. @param string name name of process group to remove
  151. @return boolean result Indicates whether the removal was successful
  152. """
  153. self._update('removeProcessGroup')
  154. if name not in self.supervisord.process_groups:
  155. raise RPCError(Faults.BAD_NAME, name)
  156. result = self.supervisord.remove_process_group(name)
  157. if not result:
  158. raise RPCError(Faults.STILL_RUNNING)
  159. return True
  160. def _getAllProcesses(self, lexical=False):
  161. # if lexical is true, return processes sorted in lexical order,
  162. # otherwise, sort in priority order
  163. all_processes = []
  164. if lexical:
  165. group_names = list(self.supervisord.process_groups.keys())
  166. group_names.sort()
  167. for group_name in group_names:
  168. group = self.supervisord.process_groups[group_name]
  169. process_names = list(group.processes.keys())
  170. process_names.sort()
  171. for process_name in process_names:
  172. process = group.processes[process_name]
  173. all_processes.append((group, process))
  174. else:
  175. groups = list(self.supervisord.process_groups.values())
  176. groups.sort() # asc by priority
  177. for group in groups:
  178. processes = list(group.processes.values())
  179. processes.sort() # asc by priority
  180. for process in processes:
  181. all_processes.append((group, process))
  182. return all_processes
  183. def _getGroupAndProcess(self, name):
  184. # get process to start from name
  185. group_name, process_name = split_namespec(name)
  186. group = self.supervisord.process_groups.get(group_name)
  187. if group is None:
  188. raise RPCError(Faults.BAD_NAME, name)
  189. if process_name is None:
  190. return group, None
  191. process = group.processes.get(process_name)
  192. if process is None:
  193. raise RPCError(Faults.BAD_NAME, name)
  194. return group, process
  195. def startProcess(self, name, wait=True):
  196. """ Start a process
  197. @param string name Process name (or ``group:name``, or ``group:*``)
  198. @param boolean wait Wait for process to be fully started
  199. @return boolean result Always true unless error
  200. """
  201. self._update('startProcess')
  202. group, process = self._getGroupAndProcess(name)
  203. if process is None:
  204. group_name, process_name = split_namespec(name)
  205. return self.startProcessGroup(group_name, wait)
  206. # test filespec, don't bother trying to spawn if we know it will
  207. # eventually fail
  208. try:
  209. filename, argv = process.get_execv_args()
  210. except NotFound as why:
  211. raise RPCError(Faults.NO_FILE, why.args[0])
  212. except (NotExecutable, NoPermission) as why:
  213. raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])
  214. started = []
  215. startsecs = process.config.startsecs
  216. def startit():
  217. if not started:
  218. if process.get_state() in RUNNING_STATES:
  219. raise RPCError(Faults.ALREADY_STARTED, name)
  220. process.spawn()
  221. if process.spawnerr:
  222. raise RPCError(Faults.SPAWN_ERROR, name)
  223. # we use a list here to fake out lexical scoping;
  224. # using a direct assignment to 'started' in the
  225. # function appears to not work (symptom: 2nd or 3rd
  226. # call through, it forgets about 'started', claiming
  227. # it's undeclared).
  228. started.append(time.time())
  229. if not wait or not startsecs:
  230. return True
  231. t = time.time()
  232. runtime = (t - started[0])
  233. state = process.get_state()
  234. if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
  235. raise RPCError(Faults.ABNORMAL_TERMINATION, name)
  236. if runtime < startsecs:
  237. return NOT_DONE_YET
  238. if state == ProcessStates.RUNNING:
  239. return True
  240. raise RPCError(Faults.ABNORMAL_TERMINATION, name)
  241. startit.delay = 0.05
  242. startit.rpcinterface = self
  243. return startit # deferred
  244. def startProcessGroup(self, name, wait=True):
  245. """ Start all processes in the group named 'name'
  246. @param string name The group name
  247. @param boolean wait Wait for each process to be fully started
  248. @return array result An array of process status info structs
  249. """
  250. self._update('startProcessGroup')
  251. group = self.supervisord.process_groups.get(name)
  252. if group is None:
  253. raise RPCError(Faults.BAD_NAME, name)
  254. processes = list(group.processes.values())
  255. processes.sort()
  256. processes = [ (group, process) for process in processes ]
  257. startall = make_allfunc(processes, isNotRunning, self.startProcess,
  258. wait=wait)
  259. startall.delay = 0.05
  260. startall.rpcinterface = self
  261. return startall # deferred
  262. def startAllProcesses(self, wait=True):
  263. """ Start all processes listed in the configuration file
  264. @param boolean wait Wait for each process to be fully started
  265. @return array result An array of process status info structs
  266. """
  267. self._update('startAllProcesses')
  268. processes = self._getAllProcesses()
  269. startall = make_allfunc(processes, isNotRunning, self.startProcess,
  270. wait=wait)
  271. startall.delay = 0.05
  272. startall.rpcinterface = self
  273. return startall # deferred
  274. def stopProcess(self, name, wait=True):
  275. """ Stop a process named by name
  276. @param string name The name of the process to stop (or 'group:name')
  277. @param boolean wait Wait for the process to be fully stopped
  278. @return boolean result Always return True unless error
  279. """
  280. self._update('stopProcess')
  281. group, process = self._getGroupAndProcess(name)
  282. if process is None:
  283. group_name, process_name = split_namespec(name)
  284. return self.stopProcessGroup(group_name, wait)
  285. stopped = []
  286. called = []
  287. def killit():
  288. if not called:
  289. if process.get_state() not in RUNNING_STATES:
  290. raise RPCError(Faults.NOT_RUNNING)
  291. # use a mutable for lexical scoping; see startProcess
  292. called.append(1)
  293. if not stopped:
  294. msg = process.stop()
  295. if msg is not None:
  296. raise RPCError(Faults.FAILED, msg)
  297. stopped.append(1)
  298. if wait:
  299. return NOT_DONE_YET
  300. else:
  301. return True
  302. if process.get_state() not in (ProcessStates.STOPPED,
  303. ProcessStates.EXITED):
  304. return NOT_DONE_YET
  305. else:
  306. return True
  307. killit.delay = 0.2
  308. killit.rpcinterface = self
  309. return killit # deferred
  310. def stopProcessGroup(self, name, wait=True):
  311. """ Stop all processes in the process group named 'name'
  312. @param string name The group name
  313. @param boolean wait Wait for each process to be fully stopped
  314. @return array result An array of process status info structs
  315. """
  316. self._update('stopProcessGroup')
  317. group = self.supervisord.process_groups.get(name)
  318. if group is None:
  319. raise RPCError(Faults.BAD_NAME, name)
  320. processes = list(group.processes.values())
  321. processes.sort()
  322. processes = [ (group, process) for process in processes ]
  323. killall = make_allfunc(processes, isRunning, self.stopProcess,
  324. wait=wait)
  325. killall.delay = 0.05
  326. killall.rpcinterface = self
  327. return killall # deferred
  328. def stopAllProcesses(self, wait=True):
  329. """ Stop all processes in the process list
  330. @param boolean wait Wait for each process to be fully stopped
  331. @return array result An array of process status info structs
  332. """
  333. self._update('stopAllProcesses')
  334. processes = self._getAllProcesses()
  335. killall = make_allfunc(processes, isRunning, self.stopProcess,
  336. wait=wait)
  337. killall.delay = 0.05
  338. killall.rpcinterface = self
  339. return killall # deferred
  340. def getAllConfigInfo(self):
  341. """ Get info about all available process configurations. Each struct
  342. represents a single process (i.e. groups get flattened).
  343. @return array result An array of process config info structs
  344. """
  345. self._update('getAllConfigInfo')
  346. configinfo = []
  347. for gconfig in self.supervisord.options.process_group_configs:
  348. inuse = gconfig.name in self.supervisord.process_groups
  349. for pconfig in gconfig.process_configs:
  350. configinfo.append(
  351. { 'name': pconfig.name,
  352. 'group': gconfig.name,
  353. 'inuse': inuse,
  354. 'autostart': pconfig.autostart,
  355. 'group_prio': gconfig.priority,
  356. 'process_prio': pconfig.priority })
  357. configinfo.sort(key=lambda r: r['name'])
  358. return configinfo
  359. def _interpretProcessInfo(self, info):
  360. state = info['state']
  361. if state == ProcessStates.RUNNING:
  362. start = info['start']
  363. now = info['now']
  364. start_dt = datetime.datetime(*time.gmtime(start)[:6])
  365. now_dt = datetime.datetime(*time.gmtime(now)[:6])
  366. uptime = now_dt - start_dt
  367. desc = 'pid %s, uptime %s' % (info['pid'], uptime)
  368. elif state in (ProcessStates.FATAL, ProcessStates.BACKOFF):
  369. desc = info['spawnerr']
  370. if not desc:
  371. desc = 'unknown error (try "tail %s")' % info['name']
  372. elif state in (ProcessStates.STOPPED, ProcessStates.EXITED):
  373. if info['start']:
  374. stop = info['stop']
  375. stop_dt = datetime.datetime(*time.localtime(stop)[:7])
  376. desc = stop_dt.strftime('%b %d %I:%M %p')
  377. else:
  378. desc = 'Not started'
  379. else:
  380. desc = ''
  381. return desc
  382. def getProcessInfo(self, name):
  383. """ Get info about a process named name
  384. @param string name The name of the process (or 'group:name')
  385. @return struct result A structure containing data about the process
  386. """
  387. self._update('getProcessInfo')
  388. group, process = self._getGroupAndProcess(name)
  389. if process is None:
  390. raise RPCError(Faults.BAD_NAME, name)
  391. start = int(process.laststart)
  392. stop = int(process.laststop)
  393. now = int(time.time())
  394. state = process.get_state()
  395. spawnerr = process.spawnerr or ''
  396. exitstatus = process.exitstatus or 0
  397. stdout_logfile = process.config.stdout_logfile or ''
  398. stderr_logfile = process.config.stderr_logfile or ''
  399. info = {
  400. 'name':process.config.name,
  401. 'group':group.config.name,
  402. 'start':start,
  403. 'stop':stop,
  404. 'now':now,
  405. 'state':state,
  406. 'statename':getProcessStateDescription(state),
  407. 'spawnerr':spawnerr,
  408. 'exitstatus':exitstatus,
  409. 'logfile':stdout_logfile, # b/c alias
  410. 'stdout_logfile':stdout_logfile,
  411. 'stderr_logfile':stderr_logfile,
  412. 'pid':process.pid,
  413. }
  414. description = self._interpretProcessInfo(info)
  415. info['description'] = description
  416. return info
  417. def getAllProcessInfo(self):
  418. """ Get info about all processes
  419. @return array result An array of process status results
  420. """
  421. self._update('getAllProcessInfo')
  422. all_processes = self._getAllProcesses(lexical=True)
  423. output = []
  424. for group, process in all_processes:
  425. name = make_namespec(group.config.name, process.config.name)
  426. output.append(self.getProcessInfo(name))
  427. return output
  428. def _readProcessLog(self, name, offset, length, channel):
  429. group, process = self._getGroupAndProcess(name)
  430. logfile = getattr(process.config, '%s_logfile' % channel)
  431. if logfile is None or not os.path.exists(logfile):
  432. raise RPCError(Faults.NO_FILE, logfile)
  433. try:
  434. return as_string(readFile(logfile, int(offset), int(length)))
  435. except ValueError as inst:
  436. why = inst.args[0]
  437. raise RPCError(getattr(Faults, why))
  438. def readProcessStdoutLog(self, name, offset, length):
  439. """ Read length bytes from name's stdout log starting at offset
  440. @param string name the name of the process (or 'group:name')
  441. @param int offset offset to start reading from.
  442. @param int length number of bytes to read from the log.
  443. @return string result Bytes of log
  444. """
  445. self._update('readProcessStdoutLog')
  446. return self._readProcessLog(name, offset, length, 'stdout')
  447. readProcessLog = readProcessStdoutLog # b/c alias
  448. def readProcessStderrLog(self, name, offset, length):
  449. """ Read length bytes from name's stderr log starting at offset
  450. @param string name the name of the process (or 'group:name')
  451. @param int offset offset to start reading from.
  452. @param int length number of bytes to read from the log.
  453. @return string result Bytes of log
  454. """
  455. self._update('readProcessStderrLog')
  456. return self._readProcessLog(name, offset, length, 'stderr')
  457. def _tailProcessLog(self, name, offset, length, channel):
  458. group, process = self._getGroupAndProcess(name)
  459. logfile = getattr(process.config, '%s_logfile' % channel)
  460. if logfile is None or not os.path.exists(logfile):
  461. return ['', 0, False]
  462. return tailFile(logfile, int(offset), int(length))
  463. def tailProcessStdoutLog(self, name, offset, length):
  464. """
  465. Provides a more efficient way to tail the (stdout) log than
  466. readProcessStdoutLog(). Use readProcessStdoutLog() to read
  467. chunks and tailProcessStdoutLog() to tail.
  468. Requests (length) bytes from the (name)'s log, starting at
  469. (offset). If the total log size is greater than (offset +
  470. length), the overflow flag is set and the (offset) is
  471. automatically increased to position the buffer at the end of
  472. the log. If less than (length) bytes are available, the
  473. maximum number of available bytes will be returned. (offset)
  474. returned is always the last offset in the log +1.
  475. @param string name the name of the process (or 'group:name')
  476. @param int offset offset to start reading from
  477. @param int length maximum number of bytes to return
  478. @return array result [string bytes, int offset, bool overflow]
  479. """
  480. self._update('tailProcessStdoutLog')
  481. return self._tailProcessLog(name, offset, length, 'stdout')
  482. tailProcessLog = tailProcessStdoutLog # b/c alias
  483. def tailProcessStderrLog(self, name, offset, length):
  484. """
  485. Provides a more efficient way to tail the (stderr) log than
  486. readProcessStderrLog(). Use readProcessStderrLog() to read
  487. chunks and tailProcessStderrLog() to tail.
  488. Requests (length) bytes from the (name)'s log, starting at
  489. (offset). If the total log size is greater than (offset +
  490. length), the overflow flag is set and the (offset) is
  491. automatically increased to position the buffer at the end of
  492. the log. If less than (length) bytes are available, the
  493. maximum number of available bytes will be returned. (offset)
  494. returned is always the last offset in the log +1.
  495. @param string name the name of the process (or 'group:name')
  496. @param int offset offset to start reading from
  497. @param int length maximum number of bytes to return
  498. @return array result [string bytes, int offset, bool overflow]
  499. """
  500. self._update('tailProcessStderrLog')
  501. return self._tailProcessLog(name, offset, length, 'stderr')
  502. def clearProcessLogs(self, name):
  503. """ Clear the stdout and stderr logs for the named process and
  504. reopen them.
  505. @param string name The name of the process (or 'group:name')
  506. @return boolean result Always True unless error
  507. """
  508. self._update('clearProcessLogs')
  509. group, process = self._getGroupAndProcess(name)
  510. if process is None:
  511. raise RPCError(Faults.BAD_NAME, name)
  512. try:
  513. # implies a reopen
  514. process.removelogs()
  515. except (IOError, OSError):
  516. raise RPCError(Faults.FAILED, name)
  517. return True
  518. clearProcessLog = clearProcessLogs # b/c alias
  519. def clearAllProcessLogs(self):
  520. """ Clear all process log files
  521. @return array result An array of process status info structs
  522. """
  523. self._update('clearAllProcessLogs')
  524. results = []
  525. callbacks = []
  526. all_processes = self._getAllProcesses()
  527. for group, process in all_processes:
  528. callbacks.append((group, process, self.clearProcessLog))
  529. def clearall():
  530. if not callbacks:
  531. return results
  532. group, process, callback = callbacks.pop(0)
  533. name = make_namespec(group.config.name, process.config.name)
  534. try:
  535. callback(name)
  536. except RPCError as e:
  537. results.append(
  538. {'name':process.config.name,
  539. 'group':group.config.name,
  540. 'status':e.code,
  541. 'description':e.text})
  542. else:
  543. results.append(
  544. {'name':process.config.name,
  545. 'group':group.config.name,
  546. 'status':Faults.SUCCESS,
  547. 'description':'OK'}
  548. )
  549. if callbacks:
  550. return NOT_DONE_YET
  551. return results
  552. clearall.delay = 0.05
  553. clearall.rpcinterface = self
  554. return clearall # deferred
  555. def sendProcessStdin(self, name, chars):
  556. """ Send a string of chars to the stdin of the process name.
  557. If non-7-bit data is sent (unicode), it is encoded to utf-8
  558. before being sent to the process' stdin. If chars is not a
  559. string or is not unicode, raise INCORRECT_PARAMETERS. If the
  560. process is not running, raise NOT_RUNNING. If the process'
  561. stdin cannot accept input (e.g. it was closed by the child
  562. process), raise NO_FILE.
  563. @param string name The process name to send to (or 'group:name')
  564. @param string chars The character data to send to the process
  565. @return boolean result Always return True unless error
  566. """
  567. self._update('sendProcessStdin')
  568. if isinstance(chars, unicode):
  569. chars = chars.encode('utf-8')
  570. if not isinstance(chars, basestring):
  571. raise RPCError(Faults.INCORRECT_PARAMETERS, chars)
  572. group, process = self._getGroupAndProcess(name)
  573. if process is None:
  574. raise RPCError(Faults.BAD_NAME, name)
  575. if not process.pid or process.killing:
  576. raise RPCError(Faults.NOT_RUNNING, name)
  577. try:
  578. process.write(chars)
  579. except OSError as why:
  580. if why.args[0] == errno.EPIPE:
  581. raise RPCError(Faults.NO_FILE, name)
  582. else:
  583. raise
  584. return True
  585. def sendRemoteCommEvent(self, type, data):
  586. """ Send an event that will be received by event listener
  587. subprocesses subscribing to the RemoteCommunicationEvent.
  588. @param string type String for the "type" key in the event header
  589. @param string data Data for the event body
  590. @return boolean Always return True unless error
  591. """
  592. if isinstance(type, unicode):
  593. type = type.encode('utf-8')
  594. if isinstance(data, unicode):
  595. data = data.encode('utf-8')
  596. notify(
  597. RemoteCommunicationEvent(type, data)
  598. )
  599. return True
  600. def make_allfunc(processes, predicate, func, **extra_kwargs):
  601. """ Return a closure representing a function that calls a
  602. function for every process, and returns a result """
  603. callbacks = []
  604. results = []
  605. def allfunc(processes=processes, predicate=predicate, func=func,
  606. extra_kwargs=extra_kwargs, callbacks=callbacks,
  607. results=results):
  608. if not callbacks:
  609. for group, process in processes:
  610. name = make_namespec(group.config.name, process.config.name)
  611. if predicate(process):
  612. try:
  613. callback = func(name, **extra_kwargs)
  614. callbacks.append((group, process, callback))
  615. except RPCError as e:
  616. results.append({'name':process.config.name,
  617. 'group':group.config.name,
  618. 'status':e.code,
  619. 'description':e.text})
  620. continue
  621. if not callbacks:
  622. return results
  623. group, process, callback = callbacks.pop(0)
  624. try:
  625. value = callback()
  626. except RPCError as e:
  627. results.append(
  628. {'name':process.config.name,
  629. 'group':group.config.name,
  630. 'status':e.code,
  631. 'description':e.text})
  632. return NOT_DONE_YET
  633. if value is NOT_DONE_YET:
  634. # push it back into the queue; it will finish eventually
  635. callbacks.append((group, process, callback))
  636. else:
  637. results.append(
  638. {'name':process.config.name,
  639. 'group':group.config.name,
  640. 'status':Faults.SUCCESS,
  641. 'description':'OK'}
  642. )
  643. if callbacks:
  644. return NOT_DONE_YET
  645. return results
  646. # XXX the above implementation has a weakness inasmuch as the
  647. # first call into each individual process callback will always
  648. # return NOT_DONE_YET, so they need to be called twice. The
  649. # symptom of this is that calling this method causes the
  650. # client to block for much longer than it actually requires to
  651. # kill all of the running processes. After the first call to
  652. # the killit callback, the process is actually dead, but the
  653. # above killall method processes the callbacks one at a time
  654. # during the select loop, which, because there is no output
  655. # from child processes after e.g. stopAllProcesses is called,
  656. # is not busy, so hits the timeout for each callback. I
  657. # attempted to make this better, but the only way to make it
  658. # better assumes totally synchronous reaping of child
  659. # processes, which requires infrastructure changes to
  660. # supervisord that are scary at the moment as it could take a
  661. # while to pin down all of the platform differences and might
  662. # require a C extension to the Python signal module to allow
  663. # the setting of ignore flags to signals.
  664. return allfunc
  665. def isRunning(process):
  666. if process.get_state() in RUNNING_STATES:
  667. return True
  668. def isNotRunning(process):
  669. return not isRunning(process)
  670. # this is not used in code but referenced via an entry point in the conf file
  671. def make_main_rpcinterface(supervisord):
  672. return SupervisorNamespaceRPCInterface(supervisord)