rpc.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842
  1. import xmlrpclib
  2. import os
  3. from supervisord import ProcessStates
  4. from supervisord import SupervisorStates
  5. from supervisord import getSupervisorStateDescription
  6. import doctags
  7. import signal
  8. import time
  9. from medusa.xmlrpc_handler import xmlrpc_handler
  10. from medusa.http_server import get_header
  11. from medusa import producers
  12. import sys
  13. import types
  14. import re
  15. import traceback
  16. import StringIO
  17. import tempfile
  18. import errno
  19. from http import NOT_DONE_YET
  20. RPC_VERSION = 1.0
  21. FAULTS = {
  22. 'SUPER_READ_NO_FILE':1000,
  23. 'SUPER_READ_BAD_ARGUMENTS':1001,
  24. 'SUPER_CLEAR_FAILED':1010,
  25. 'SUPER_CLEAR_NO_FILE':1011,
  26. 'START_BAD_NAME':2002,
  27. 'START_ABNORMAL_TERMINATION':2003,
  28. 'START_SPAWN_ERROR':2004,
  29. 'START_ALREADY_STARTED':2006,
  30. 'STOP_BAD_NAME':2010,
  31. 'STOP_UNSUCCESSFUL':2011,
  32. 'STOP_NOT_RUNNING':2012,
  33. 'INFO_BAD_NAME':2020,
  34. 'READ_BAD_NAME':2040,
  35. 'READ_BAD_ARGUMENTS':2041,
  36. 'READ_NO_FILE':2042,
  37. 'CLEAR_BAD_NAME':2050,
  38. 'CLEAR_FAILED':2051,
  39. 'UNKNOWN_METHOD':1,
  40. 'INCORRECT_PARAMETERS':2,
  41. 'SIGNATURE_UNSUPPORTED':4,
  42. 'SHUTDOWN_STATE':6,
  43. }
  44. RFAULTS = dict([(v, k) for k, v in FAULTS.items()])
  45. class RPCError(Exception):
  46. def __init__(self, text, code=None):
  47. self.text = text
  48. if code is None:
  49. self.code = FAULTS[text]
  50. else:
  51. self.code = code
  52. class DeferredXMLRPCResponse:
  53. """ A medusa producer that implements a deferred callback; requires
  54. a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
  55. CONNECTION = re.compile ('Connection: (.*)', re.IGNORECASE)
  56. def __init__(self, request, callback):
  57. self.callback = callback
  58. self.request = request
  59. self.finished = False
  60. self.delay = float(callback.delay)
  61. def more(self):
  62. if self.finished:
  63. return ''
  64. try:
  65. try:
  66. value = self.callback()
  67. if value is NOT_DONE_YET:
  68. return NOT_DONE_YET
  69. except RPCError, err:
  70. value = xmlrpclib.Fault(err.code, err.text)
  71. body = xmlrpc_marshal(value)
  72. self.finished = True
  73. return self.getresponse(body)
  74. except:
  75. # report unexpected exception back to server
  76. traceback.print_exc()
  77. self.finished = True
  78. self.request.error(500)
  79. def getresponse(self, body):
  80. self.request['Content-Type'] = 'text/xml'
  81. self.request['Content-Length'] = len(body)
  82. self.request.push(body)
  83. connection = get_header(self.CONNECTION, self.request.header)
  84. close_it = 0
  85. wrap_in_chunking = 0
  86. if self.request.version == '1.0':
  87. if connection == 'keep-alive':
  88. if not self.request.has_key ('Content-Length'):
  89. close_it = 1
  90. else:
  91. self.request['Connection'] = 'Keep-Alive'
  92. else:
  93. close_it = 1
  94. elif self.request.version == '1.1':
  95. if connection == 'close':
  96. close_it = 1
  97. elif not self.request.has_key ('Content-Length'):
  98. if self.request.has_key ('Transfer-Encoding'):
  99. if not self.request['Transfer-Encoding'] == 'chunked':
  100. close_it = 1
  101. elif self.request.use_chunked:
  102. self.request['Transfer-Encoding'] = 'chunked'
  103. wrap_in_chunking = 1
  104. else:
  105. close_it = 1
  106. elif self.request.version is None:
  107. close_it = 1
  108. outgoing_header = producers.simple_producer (
  109. self.request.build_reply_header())
  110. if close_it:
  111. self.request['Connection'] = 'close'
  112. if wrap_in_chunking:
  113. outgoing_producer = producers.chunked_producer (
  114. producers.composite_producer (self.request.outgoing)
  115. )
  116. # prepend the header
  117. outgoing_producer = producers.composite_producer(
  118. [outgoing_header, outgoing_producer]
  119. )
  120. else:
  121. # prepend the header
  122. self.request.outgoing.insert(0, outgoing_header)
  123. outgoing_producer = producers.composite_producer (
  124. self.request.outgoing)
  125. # apply a few final transformations to the output
  126. self.request.channel.push_with_producer (
  127. # globbing gives us large packets
  128. producers.globbing_producer (
  129. # hooking lets us log the number of bytes sent
  130. producers.hooked_producer (
  131. outgoing_producer,
  132. self.request.log
  133. )
  134. )
  135. )
  136. self.request.channel.current_request = None
  137. if close_it:
  138. self.request.channel.close_when_done()
  139. def xmlrpc_marshal(value):
  140. ismethodresponse = not isinstance(value, xmlrpclib.Fault)
  141. if ismethodresponse:
  142. if not isinstance(value, tuple):
  143. value = (value,)
  144. body = xmlrpclib.dumps(value, methodresponse=ismethodresponse)
  145. else:
  146. body = xmlrpclib.dumps(value)
  147. return body
  148. class SupervisorNamespaceRPCInterface:
  149. def __init__(self, supervisord):
  150. self.supervisord = supervisord
  151. self.cache = {}
  152. self._update('__init__')
  153. if not self.supervisord.options.nocleanup:
  154. self._clear_childlogdir()
  155. def _update(self, text):
  156. self.update_text = text # for unit tests, mainly
  157. state = self.supervisord.get_state()
  158. if state == SupervisorStates.SHUTDOWN:
  159. raise RPCError('SHUTDOWN_STATE')
  160. def _clear_childlogdir(self):
  161. options = self.supervisord.options
  162. childlogdir = options.childlogdir
  163. fnre = re.compile(r'.+?---\d{1,11}-%s-\S+?\.xlog' % options.identifier)
  164. try:
  165. filenames = os.listdir(childlogdir)
  166. except (IOError, OSError):
  167. options.logger.info('Could not clear childlog dir')
  168. return
  169. for filename in filenames:
  170. if fnre.match(filename):
  171. pathname = os.path.join(childlogdir, filename)
  172. try:
  173. os.remove(pathname)
  174. except (os.error, IOError):
  175. options.logger.info('Failed to clean up %r' % pathname)
  176. def _readFile(self, filename, offset, length):
  177. """ Read length bytes from the file named by filename starting at
  178. offset """
  179. absoffset = abs(offset)
  180. abslength = abs(length)
  181. try:
  182. f = open(filename, 'rb')
  183. if absoffset != offset:
  184. # negative offset returns offset bytes from tail of the file
  185. if length:
  186. raise ValueError('BAD_ARGUMENTS')
  187. f.seek(0, 2)
  188. sz = f.tell()
  189. pos = int(sz - absoffset)
  190. f.seek(pos)
  191. data = f.read(absoffset)
  192. else:
  193. if abslength != length:
  194. raise ValueError('BAD_ARGUMENTS')
  195. if length == 0:
  196. f.seek(offset)
  197. data = f.read()
  198. else:
  199. sz = f.seek(offset)
  200. data = f.read(length)
  201. except (os.error, IOError):
  202. # XXX don't set fatal state, bad offset or length
  203. # shouldn't cause fatal
  204. #msg = 'FATAL: failed reading log file %r' % filename
  205. raise ValueError('FAILED')
  206. return data
  207. # RPC API methods
  208. def getVersion(self):
  209. """ Return the version of the RPC API used by supervisord
  210. @return int version version id
  211. """
  212. self._update('getVersion')
  213. return RPC_VERSION
  214. def getIdentification(self):
  215. """ Return identifiying string of supervisord
  216. @return string identifier identifying string
  217. """
  218. self._update('getIdentification')
  219. return self.supervisord.options.identifier
  220. def getState(self):
  221. """ Return current state of supervisord as a struct
  222. @return struct A struct with keys string statecode, int statename
  223. """
  224. self._update('getState')
  225. state = self.supervisord.get_state()
  226. statename = getSupervisorStateDescription(state)
  227. data = {
  228. 'statecode':state,
  229. 'statename':statename,
  230. }
  231. return data
  232. def readLog(self, offset, length):
  233. """ Read length bytes from the main log starting at offset.
  234. @param int offset offset to start reading from.
  235. @param int length number of bytes to read from the log.
  236. @return struct data a struct with keys 'log' (value of 'log' is string).
  237. """
  238. self._update('readLog')
  239. logfile = self.supervisord.options.logfile
  240. if logfile is None or not os.path.exists(logfile):
  241. raise RPCError('SUPER_READ_NO_FILE')
  242. try:
  243. return self._readFile(logfile, offset, length)
  244. except ValueError, inst:
  245. why = inst.args[0]
  246. raise RPCError('SUPER_READ_' + why)
  247. def clearLog(self):
  248. """ Clear the main log.
  249. @return boolean result always returns True unless error
  250. """
  251. self._update('clearLog')
  252. logfile = self.supervisord.options.logfile
  253. if logfile is None or not os.path.exists(logfile):
  254. raise RPCError('SUPER_CLEAR_NO_FILE')
  255. try:
  256. os.remove(logfile) # there is a race condition here, but ignore it.
  257. except (os.error, IOError):
  258. raise RPCError('SUPER_CLEAR_FAILED')
  259. for handler in self.supervisord.options.logger.handlers:
  260. if hasattr(handler, 'reopen'):
  261. self.supervisord.options.logger.info('reopening log file')
  262. handler.reopen()
  263. return True
  264. def shutdown(self):
  265. """ Shut down the supervisor process
  266. @return boolean result always returns True unless error
  267. """
  268. # we don't want to call _update here.
  269. if self.supervisord.get_state() == SupervisorStates.SHUTDOWN:
  270. raise RPCError('SHUTDOWN_STATE')
  271. self.supervisord.mood = -1
  272. return True
  273. def restart(self):
  274. """ Restart the supervisor process
  275. @return boolean result always return True unless error
  276. """
  277. if self.supervisord.get_state() == SupervisorStates.SHUTDOWN:
  278. raise RPCError('SHUTDOWN_STATE')
  279. self.supervisord.mood = 0
  280. return True
  281. def startProcess(self, name, timeout=500):
  282. """ Start a process
  283. @param string name Process name
  284. @param int timeout Number of milliseconds to wait for process start
  285. @return boolean result Always true unless error
  286. """
  287. self._update('startProcess')
  288. processes = self.supervisord.processes
  289. process = processes.get(name)
  290. if process is None:
  291. raise RPCError('START_BAD_NAME')
  292. if process.pid:
  293. raise RPCError('START_ALREADY_STARTED')
  294. process.spawn()
  295. if process.spawnerr:
  296. raise RPCError('START_SPAWN_ERROR')
  297. if not timeout:
  298. return True
  299. milliseconds = timeout / 1000.0
  300. start = time.time()
  301. def check_still_running(done=False): # done arg is only for unit testing
  302. t = time.time()
  303. runtime = (t - start)
  304. if not done and runtime < milliseconds:
  305. return NOT_DONE_YET
  306. pid = processes[name].pid
  307. if pid:
  308. return True
  309. raise RPCError('START_ABNORMAL_TERMINATION')
  310. check_still_running.delay = milliseconds
  311. check_still_running.rpcinterface = self
  312. return check_still_running # deferred
  313. def startAllProcesses(self, timeout=500):
  314. """ Start all processes listed in the configuration file
  315. @param int timeout Number of milliseconds to wait for each process start
  316. @return boolean result Always true unless error
  317. """
  318. self._update('startAllProcesses')
  319. processes = self.supervisord.processes
  320. callbacks = []
  321. processnames = processes.keys()
  322. processnames.sort()
  323. for processname in processnames:
  324. process = processes[processname]
  325. if process.get_state() != ProcessStates.RUNNING:
  326. # only start nonrunning processes
  327. callbacks.append(self.startProcess(processname, timeout))
  328. def startall(done=False): # done arg is for unit testing
  329. if not callbacks:
  330. return True
  331. callback = callbacks.pop(0)
  332. value = callback(done)
  333. if value is NOT_DONE_YET:
  334. # push it back into the queue; it will finish eventually
  335. callbacks.append(callback)
  336. if callbacks:
  337. return NOT_DONE_YET
  338. return True
  339. # XXX the above implementation has a weakness inasmuch as the
  340. # first call into each individual process callback will always
  341. # return NOT_DONE_YET, so they need to be called twice. The
  342. # symptom of this is that calling this method causes the
  343. # client to block for much longer than it actually requires to
  344. # start all of the nonrunning processes. See stopAllProcesses
  345. startall.delay = 0.05
  346. startall.rpcinterface = self
  347. return startall # deferred
  348. def stopProcess(self, name):
  349. """ Stop a process named by name
  350. @param string name The name of the process to stop
  351. @return boolean result Always return True unless error
  352. """
  353. self._update('stopProcess')
  354. process = self.supervisord.processes.get(name)
  355. if process is None:
  356. raise RPCError('STOP_BAD_NAME')
  357. if process.get_state() != ProcessStates.RUNNING:
  358. raise RPCEror('STOP_NOT_RUNNING')
  359. def killit():
  360. if process.killing:
  361. return NOT_DONE_YET
  362. elif process.pid:
  363. msg = process.stop()
  364. if msg is not None:
  365. raise RPCError('STOP_UNSUCCESSFUL')
  366. return NOT_DONE_YET
  367. else:
  368. return True
  369. killit.delay = 0.2
  370. killit.rpcinterface = self
  371. return killit # deferred
  372. def stopAllProcesses(self):
  373. """ Stop all processes in the process list
  374. @return boolean result Always return true unless error.
  375. """
  376. self._update('stopAllProcesses')
  377. processes = self.supervisord.processes
  378. callbacks = []
  379. processnames = processes.keys()
  380. processnames.sort()
  381. for processname in processnames:
  382. process = processes[processname]
  383. if process.pid:
  384. # only stop running processes
  385. callbacks.append(self.stopProcess(processname))
  386. def killall():
  387. if not callbacks:
  388. return True
  389. callback = callbacks.pop(0)
  390. try:
  391. value = callback()
  392. except RPCError, inst:
  393. if inst.text == 'STOP_UNSUCCESSFUL':
  394. raise RPCError('STOP_ALL_UNSUCCESSFUL')
  395. if value is NOT_DONE_YET:
  396. # push it back into the queue; it will finish eventually
  397. callbacks.append(callback)
  398. if callbacks:
  399. return NOT_DONE_YET
  400. return True
  401. # XXX the above implementation has a weakness inasmuch as the
  402. # first call into each individual process callback will always
  403. # return NOT_DONE_YET, so they need to be called twice. The
  404. # symptom of this is that calling this method causes the
  405. # client to block for much longer than it actually requires to
  406. # kill all of the running processes. After the first call to
  407. # the killit callback, the process is actually dead, but the
  408. # above killall method processes the callbacks one at a time
  409. # during the select loop, which, because there is no output
  410. # from child processes after stopAllProcesses is called, is
  411. # not busy, so hits the timeout for each callback. I
  412. # attempted to make this better, but the only way to make it
  413. # better assumes totally synchronous reaping of child
  414. # processes, which requires infrastructure changes to
  415. # supervisord that are scary at the moment as it could take a
  416. # while to pin down all of the platform differences and might
  417. # require a C extension to the Python signal module to allow
  418. # the setting of ignore flags to signals.
  419. killall.delay = 0.05
  420. killall.rpcinterface = self
  421. return killall # deferred
  422. def getProcessInfo(self, name):
  423. """ Get info about a process named name
  424. @param string name The name of the process
  425. @return struct result A structure containing data about the process
  426. """
  427. self._update('getProcessInfo')
  428. process = self.supervisord.processes.get(name)
  429. if process is None:
  430. raise RPCError('INFO_BAD_NAME')
  431. start = int(process.laststart)
  432. stop = int(process.laststop)
  433. now = int(time.time())
  434. state = process.get_state()
  435. spawnerr = process.spawnerr or ''
  436. exitstatus = process.exitstatus or 0
  437. reportstatusmsg = process.reportstatusmsg or ''
  438. return {
  439. 'name':name,
  440. 'start':start,
  441. 'stop':stop,
  442. 'now':now,
  443. 'state':state,
  444. 'spawnerr':spawnerr,
  445. 'exitstatus':exitstatus,
  446. 'reportstatusmsg':reportstatusmsg,
  447. 'logfile':process.config.logfile,
  448. 'pid':process.pid
  449. }
  450. def getAllProcessInfo(self):
  451. """ Get info about all processes
  452. @return array result An array of process status results
  453. """
  454. self._update('getAllProcessInfo')
  455. processnames = self.supervisord.processes.keys()
  456. processnames.sort()
  457. output = []
  458. for processname in processnames:
  459. output.append(self.getProcessInfo(processname))
  460. return output
  461. def readProcessLog(self, processName, offset, length):
  462. """ Read length bytes from processName's log starting at offset
  463. @param string processName The name of the process
  464. @param int offset offset to start reading from.
  465. @param int length number of bytes to read from the log.
  466. @return string result Bytes of log
  467. """
  468. self._update('readProcessLog')
  469. process = self.supervisord.processes.get(processName)
  470. if process is None:
  471. raise RPCError('READ_BAD_NAME')
  472. logfile = process.config.logfile
  473. if logfile is None or not os.path.exists(logfile):
  474. # XXX problematic: processes that don't start won't have a log
  475. # file and we probably don't want to go into fatal state if we try
  476. # to read the log of a process that did not start.
  477. raise RPCError('READ_NO_FILE')
  478. try:
  479. return self._readFile(logfile, offset, length)
  480. except ValueError, inst:
  481. why = inst.args[0]
  482. raise RPCError('READ_' + why)
  483. def clearProcessLog(self, processName):
  484. """ Clear the log for processName and reopen it
  485. @param string processName The name of the process
  486. @return boolean result Always True unless error
  487. """
  488. self._update('clearProcessLog')
  489. process = self.supervisord.processes.get(processName)
  490. if process is None:
  491. raise RPCError('CLEAR_BAD_NAME')
  492. try:
  493. # implies a reopen
  494. process.removelogs()
  495. except (IOError, os.error):
  496. msg = 'FATAL: failed clearing log file %r' % logfile
  497. raise RPCError('CLEAR_FAILED')
  498. return True
  499. def _rotateMainLog(self):
  500. """ Rotate the main supervisord log (for debugging/testing) """
  501. self._update('_rotateMainLog')
  502. for handler in self.supervisord.options.logger.handlers:
  503. if hasattr(handler, 'doRollover'):
  504. handler.doRollover()
  505. return True
  506. class SystemNamespaceRPCInterface:
  507. def __init__(self, namespaces):
  508. self.namespaces = {}
  509. for name, inst in namespaces:
  510. self.namespaces[name] = inst
  511. self.namespaces['system'] = self
  512. def _listMethods(self):
  513. methods = {}
  514. for ns_name in self.namespaces:
  515. namespace = self.namespaces[ns_name]
  516. for method_name in namespace.__class__.__dict__:
  517. # introspect; any methods that don't start with underscore
  518. # are published
  519. func = getattr(namespace, method_name)
  520. meth = getattr(func, 'im_func', None)
  521. if meth is not None:
  522. if not method_name.startswith('_'):
  523. sig = '%s.%s' % (ns_name, method_name)
  524. methods[sig] = str(func.__doc__)
  525. return methods
  526. def listMethods(self):
  527. """ Return an array listing the available method names
  528. @return array result An array of method names available (strings).
  529. """
  530. methods = self._listMethods()
  531. keys = methods.keys()
  532. keys.sort()
  533. return keys
  534. def methodHelp(self, name):
  535. """ Return a string showing the method's documentation
  536. @param string name The name of the method.
  537. @return string result The documentation for the method name.
  538. """
  539. methods = self._listMethods()
  540. for methodname in methods.keys():
  541. if methodname == name:
  542. return methods[methodname]
  543. raise RPCError('SIGNATURE_UNSUPPORTED')
  544. def methodSignature(self, name):
  545. """ Return an array describing the method signature in the
  546. form [rtype, ptype, ptype...] where rtype is the return data type
  547. of the method, and ptypes are the parameter data types that the
  548. method accepts in method argument order.
  549. @param string name The name of the method.
  550. @return array result The result.
  551. """
  552. methods = self._listMethods()
  553. L = []
  554. for method in methods:
  555. if method == name:
  556. rtype = None
  557. ptypes = []
  558. parsed = doctags.gettags(methods[method])
  559. for thing in parsed:
  560. if thing[1] == 'return': # tag name
  561. rtype = thing[2] # datatype
  562. elif thing[1] == 'param': # tag name
  563. ptypes.append(thing[2]) # datatype
  564. if rtype is None:
  565. raise RPCError('SIGNATURE_UNSUPPORTED')
  566. return [rtype] + ptypes
  567. raise RPCError('SIGNATURE_UNSUPPORTED')
  568. def multicall(self, calls):
  569. """Process an array of calls, and return an array of
  570. results. Calls should be structs of the form {'methodName':
  571. string, 'params': array}. Each result will either be a
  572. single-item array containg the result value, or a struct of
  573. the form {'faultCode': int, 'faultString': string}. This is
  574. useful when you need to make lots of small calls without lots
  575. of round trips.
  576. @param array calls An array of call requests
  577. @return array result An array of results
  578. """
  579. producers = []
  580. for call in calls:
  581. try:
  582. name = call['methodName']
  583. params = call.get('params', [])
  584. if name == 'system.multicall':
  585. # Recursive system.multicall forbidden
  586. error = 'INCORRECT_PARAMETERS'
  587. raise xmlrpclib.Fault(FAULTS[error], error)
  588. root = AttrDict(self.namespaces)
  589. value = traverse(root, name, params)
  590. except RPCError, inst:
  591. value = {'faultCode': inst.code,
  592. 'faultString': inst.text}
  593. except:
  594. errmsg = "%s:%s" % (sys.exc_type, sys.exc_value)
  595. value = {'faultCode': 1, 'faultString': errmsg}
  596. producers.append(value)
  597. results = []
  598. def multiproduce():
  599. """ Run through all the producers in order """
  600. if not producers:
  601. return []
  602. callback = producers.pop(0)
  603. if isinstance(callback, types.FunctionType):
  604. try:
  605. value = callback()
  606. except RPCError, inst:
  607. value = {'faultCode':inst.code, 'faultString':inst.text}
  608. if value is NOT_DONE_YET:
  609. # push it back in the front of the queue because we
  610. # need to finish the calls in requested order
  611. producers.insert(0, callback)
  612. return NOT_DONE_YET
  613. else:
  614. value = callback
  615. results.append(value)
  616. if producers:
  617. # only finish when all producers are finished
  618. return NOT_DONE_YET
  619. return results
  620. multiproduce.delay = .05
  621. return multiproduce
  622. class AttrDict(dict):
  623. # hack to make a dict's getattr equivalent to its getitem
  624. def __getattr__(self, name):
  625. return self[name]
  626. class RPCInterface:
  627. def __init__(self, supervisord):
  628. self.supervisord = supervisord
  629. self.supervisor = SupervisorNamespaceRPCInterface(supervisord)
  630. self.system = SystemNamespaceRPCInterface(
  631. [('supervisor', self.supervisor)]
  632. )
  633. class supervisor_xmlrpc_handler(xmlrpc_handler):
  634. def __init__(self, supervisord):
  635. self.rpcinterface = RPCInterface(supervisord)
  636. self.supervisord = supervisord
  637. def continue_request (self, data, request):
  638. logger = self.supervisord.options.logger
  639. try:
  640. params, method = xmlrpclib.loads(data)
  641. try:
  642. # 5 is 'trace' level
  643. logger.log(5, 'XML-RPC method called: %s()' % method)
  644. value = self.call(method, params)
  645. logger.log(5, 'XML-RPC method %s() returned successfully' %
  646. method)
  647. except RPCError, err:
  648. # turn RPCError reported by method into a Fault instance
  649. value = xmlrpclib.Fault(err.code, err.text)
  650. logger.warn('XML-RPC method %s() returned fault: [%d] %s' % (
  651. method,
  652. err.code, err.text))
  653. if isinstance(value, types.FunctionType):
  654. # returning a function from an RPC method implies that
  655. # this needs to be a deferred response (it needs to block).
  656. pushproducer = request.channel.push_with_producer
  657. pushproducer(DeferredXMLRPCResponse(request, value))
  658. else:
  659. # if we get anything but a function, it implies that this
  660. # response doesn't need to be deferred, we can service it
  661. # right away.
  662. body = xmlrpc_marshal(value)
  663. request['Content-Type'] = 'text/xml'
  664. request['Content-Length'] = len(body)
  665. request.push(body)
  666. request.done()
  667. except:
  668. io = StringIO.StringIO()
  669. traceback.print_exc(file=io)
  670. val = io.getvalue()
  671. logger.critical(val)
  672. # internal error, report as HTTP server error
  673. request.error(500)
  674. def call(self, method, params):
  675. return traverse(self.rpcinterface, method, params)
  676. def traverse(ob, method, params):
  677. path = method.split('.')
  678. for name in path:
  679. if name.startswith('_'):
  680. # security (don't allow things like _readFile to be called
  681. # remotely)
  682. raise RPCError('UNKNOWN_METHOD')
  683. ob = getattr(ob, name, None)
  684. if ob is None:
  685. raise RPCError('UNKNOWN_METHOD')
  686. try:
  687. return ob(*params)
  688. except TypeError:
  689. raise RPCError('INCORRECT_PARAMETERS')