rpc.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846
  1. import xmlrpclib
  2. import os
  3. from supervisord import ProcessStates
  4. from supervisord import SupervisorStates
  5. from supervisord import getSupervisorStateDescription
  6. import doctags
  7. import signal
  8. import time
  9. from medusa.xmlrpc_handler import xmlrpc_handler
  10. from medusa.http_server import get_header
  11. from medusa import producers
  12. import sys
  13. import types
  14. import re
  15. import traceback
  16. import StringIO
  17. import tempfile
  18. import errno
  19. from http import NOT_DONE_YET
  20. from options import readFile
# Version of the RPC API implemented by this module; it is the value
# returned to clients by SupervisorNamespaceRPCInterface.getVersion().
RPC_VERSION = 1.0
  22. class Faults:
  23. UNKNOWN_METHOD = 1
  24. INCORRECT_PARAMETERS = 2
  25. BAD_ARGUMENTS = 3
  26. SIGNATURE_UNSUPPORTED = 4
  27. SHUTDOWN_STATE = 6
  28. BAD_NAME = 10
  29. NO_FILE = 20
  30. FAILED = 30
  31. ABNORMAL_TERMINATION = 40
  32. SPAWN_ERROR = 50
  33. ALREADY_STARTED = 60
  34. NOT_RUNNING = 70
  35. SUCCESS = 80
  36. def getFaultDescription(code):
  37. for faultname in Faults.__dict__:
  38. if getattr(Faults, faultname) == code:
  39. return faultname
  40. return 'UNKNOWN'
  41. class RPCError(Exception):
  42. def __init__(self, code, extra=None):
  43. self.code = code
  44. self.text = getFaultDescription(code)
  45. if extra is not None:
  46. self.text = '%s: %s' % (self.text, extra)
  47. class DeferredXMLRPCResponse:
  48. """ A medusa producer that implements a deferred callback; requires
  49. a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
  50. CONNECTION = re.compile ('Connection: (.*)', re.IGNORECASE)
  51. def __init__(self, request, callback):
  52. self.callback = callback
  53. self.request = request
  54. self.finished = False
  55. self.delay = float(callback.delay)
  56. def more(self):
  57. if self.finished:
  58. return ''
  59. try:
  60. try:
  61. value = self.callback()
  62. if value is NOT_DONE_YET:
  63. return NOT_DONE_YET
  64. except RPCError, err:
  65. value = xmlrpclib.Fault(err.code, err.text)
  66. body = xmlrpc_marshal(value)
  67. self.finished = True
  68. return self.getresponse(body)
  69. except:
  70. # report unexpected exception back to server
  71. traceback.print_exc()
  72. self.finished = True
  73. self.request.error(500)
  74. def getresponse(self, body):
  75. self.request['Content-Type'] = 'text/xml'
  76. self.request['Content-Length'] = len(body)
  77. self.request.push(body)
  78. connection = get_header(self.CONNECTION, self.request.header)
  79. close_it = 0
  80. wrap_in_chunking = 0
  81. if self.request.version == '1.0':
  82. if connection == 'keep-alive':
  83. if not self.request.has_key ('Content-Length'):
  84. close_it = 1
  85. else:
  86. self.request['Connection'] = 'Keep-Alive'
  87. else:
  88. close_it = 1
  89. elif self.request.version == '1.1':
  90. if connection == 'close':
  91. close_it = 1
  92. elif not self.request.has_key ('Content-Length'):
  93. if self.request.has_key ('Transfer-Encoding'):
  94. if not self.request['Transfer-Encoding'] == 'chunked':
  95. close_it = 1
  96. elif self.request.use_chunked:
  97. self.request['Transfer-Encoding'] = 'chunked'
  98. wrap_in_chunking = 1
  99. else:
  100. close_it = 1
  101. elif self.request.version is None:
  102. close_it = 1
  103. outgoing_header = producers.simple_producer (
  104. self.request.build_reply_header())
  105. if close_it:
  106. self.request['Connection'] = 'close'
  107. if wrap_in_chunking:
  108. outgoing_producer = producers.chunked_producer (
  109. producers.composite_producer (self.request.outgoing)
  110. )
  111. # prepend the header
  112. outgoing_producer = producers.composite_producer(
  113. [outgoing_header, outgoing_producer]
  114. )
  115. else:
  116. # prepend the header
  117. self.request.outgoing.insert(0, outgoing_header)
  118. outgoing_producer = producers.composite_producer (
  119. self.request.outgoing)
  120. # apply a few final transformations to the output
  121. self.request.channel.push_with_producer (
  122. # globbing gives us large packets
  123. producers.globbing_producer (
  124. # hooking lets us log the number of bytes sent
  125. producers.hooked_producer (
  126. outgoing_producer,
  127. self.request.log
  128. )
  129. )
  130. )
  131. self.request.channel.current_request = None
  132. if close_it:
  133. self.request.channel.close_when_done()
  134. def xmlrpc_marshal(value):
  135. ismethodresponse = not isinstance(value, xmlrpclib.Fault)
  136. if ismethodresponse:
  137. if not isinstance(value, tuple):
  138. value = (value,)
  139. body = xmlrpclib.dumps(value, methodresponse=ismethodresponse)
  140. else:
  141. body = xmlrpclib.dumps(value)
  142. return body
  143. class SupervisorNamespaceRPCInterface:
  144. def __init__(self, supervisord):
  145. self.supervisord = supervisord
  146. def _update(self, text):
  147. self.update_text = text # for unit tests, mainly
  148. state = self.supervisord.get_state()
  149. if state == SupervisorStates.SHUTDOWN:
  150. raise RPCError(Faults.SHUTDOWN_STATE)
  151. # RPC API methods
  152. def getVersion(self):
  153. """ Return the version of the RPC API used by supervisord
  154. @return int version version id
  155. """
  156. self._update('getVersion')
  157. return RPC_VERSION
  158. def getIdentification(self):
  159. """ Return identifiying string of supervisord
  160. @return string identifier identifying string
  161. """
  162. self._update('getIdentification')
  163. return self.supervisord.options.identifier
  164. def getState(self):
  165. """ Return current state of supervisord as a struct
  166. @return struct A struct with keys string statecode, int statename
  167. """
  168. self._update('getState')
  169. state = self.supervisord.get_state()
  170. statename = getSupervisorStateDescription(state)
  171. data = {
  172. 'statecode':state,
  173. 'statename':statename,
  174. }
  175. return data
  176. def readLog(self, offset, length):
  177. """ Read length bytes from the main log starting at offset.
  178. @param int offset offset to start reading from.
  179. @param int length number of bytes to read from the log.
  180. @return struct data a struct with keys 'log' (value of 'log' is string).
  181. """
  182. self._update('readLog')
  183. logfile = self.supervisord.options.logfile
  184. if logfile is None or not os.path.exists(logfile):
  185. raise RPCError(Faults.NO_FILE)
  186. try:
  187. return readFile(logfile, offset, length)
  188. except ValueError, inst:
  189. why = inst.args[0]
  190. raise RPCError(getattr(Faults, why))
  191. def clearLog(self):
  192. """ Clear the main log.
  193. @return boolean result always returns True unless error
  194. """
  195. self._update('clearLog')
  196. logfile = self.supervisord.options.logfile
  197. if logfile is None or not os.path.exists(logfile):
  198. raise RPCError(Faults.NO_FILE)
  199. try:
  200. os.remove(logfile) # there is a race condition here, but ignore it.
  201. except (os.error, IOError):
  202. raise RPCError(Faults.FAILED)
  203. for handler in self.supervisord.options.logger.handlers:
  204. if hasattr(handler, 'reopen'):
  205. self.supervisord.options.logger.info('reopening log file')
  206. handler.reopen()
  207. return True
  208. def shutdown(self):
  209. """ Shut down the supervisor process
  210. @return boolean result always returns True unless error
  211. """
  212. self._update('shutdown')
  213. self.supervisord.mood = -1
  214. return True
  215. def restart(self):
  216. """ Restart the supervisor process
  217. @return boolean result always return True unless error
  218. """
  219. self._update('restart')
  220. self.supervisord.mood = 0
  221. return True
  222. def startProcess(self, name, timeout=500):
  223. """ Start a process
  224. @param string name Process name
  225. @param int timeout Number of milliseconds to wait for process start
  226. @return boolean result Always true unless error
  227. """
  228. self._update('startProcess')
  229. processes = self.supervisord.processes
  230. process = processes.get(name)
  231. try:
  232. timeout = int(timeout)
  233. except:
  234. raise RPCError(Faults.BAD_ARGUMENTS, 'timeout: %s' % timeout)
  235. if process is None:
  236. raise RPCError(Faults.BAD_NAME, name)
  237. if process.pid:
  238. raise RPCError(Faults.ALREADY_STARTED, name)
  239. process.spawn()
  240. if process.spawnerr:
  241. raise RPCError(Faults.SPAWN_ERROR, name)
  242. if not timeout:
  243. timeout = 0
  244. milliseconds = timeout / 1000.0
  245. start = time.time()
  246. def check_still_running(done=False): # done arg is only for unit testing
  247. t = time.time()
  248. runtime = (t - start)
  249. if not done and runtime < milliseconds:
  250. return NOT_DONE_YET
  251. pid = processes[name].pid
  252. if pid:
  253. return True
  254. raise RPCError(Faults.ABNORMAL_TERMINATION, name)
  255. check_still_running.delay = milliseconds
  256. check_still_running.rpcinterface = self
  257. return check_still_running # deferred
  258. def startAllProcesses(self, timeout=500):
  259. """ Start all processes listed in the configuration file
  260. @param int timeout Number of milliseconds to wait for each process start
  261. @return struct result A structure containing start statuses
  262. """
  263. self._update('startAllProcesses')
  264. try:
  265. timeout = int(timeout)
  266. except:
  267. raise RPCError(Faults.BAD_ARGUMENTS, 'timeout: %s' % timeout)
  268. processes = self.supervisord.processes.values()
  269. processes.sort() # asc by priority
  270. results = []
  271. callbacks = []
  272. for process in processes:
  273. if process.get_state() != ProcessStates.RUNNING:
  274. # only start nonrunning processes
  275. try:
  276. callbacks.append((process.config.name,
  277. self.startProcess(process.config.name, timeout)))
  278. except RPCError, e:
  279. results.append({'name':process.config.name, 'status':e.code,
  280. 'description':e.text})
  281. continue
  282. def startall(done=False): # done arg is for unit testing
  283. if not callbacks:
  284. return results
  285. name, callback = callbacks.pop(0)
  286. try:
  287. value = callback(done)
  288. except RPCError, e:
  289. results.append({'name':name, 'status':e.code,
  290. 'description':e.text})
  291. return NOT_DONE_YET
  292. if value is NOT_DONE_YET:
  293. # push it back into the queue; it will finish eventually
  294. callbacks.append((name,callback))
  295. else:
  296. results.append({'name':name, 'status':Faults.SUCCESS,
  297. 'description':'OK'})
  298. if callbacks:
  299. return NOT_DONE_YET
  300. return results
  301. # XXX the above implementation has a weakness inasmuch as the
  302. # first call into each individual process callback will always
  303. # return NOT_DONE_YET, so they need to be called twice. The
  304. # symptom of this is that calling this method causes the
  305. # client to block for much longer than it actually requires to
  306. # start all of the nonrunning processes. See stopAllProcesses
  307. startall.delay = 0.05
  308. startall.rpcinterface = self
  309. return startall # deferred
  310. def stopProcess(self, name):
  311. """ Stop a process named by name
  312. @param string name The name of the process to stop
  313. @return boolean result Always return True unless error
  314. """
  315. self._update('stopProcess')
  316. process = self.supervisord.processes.get(name)
  317. if process is None:
  318. raise RPCError(Faults.BAD_NAME, name)
  319. if process.get_state() != ProcessStates.RUNNING:
  320. raise RPCError(Faults.NOT_RUNNING)
  321. def killit():
  322. if process.killing:
  323. return NOT_DONE_YET
  324. elif process.pid:
  325. msg = process.stop()
  326. if msg is not None:
  327. raise RPCError(Faults.FAILED, name)
  328. return NOT_DONE_YET
  329. else:
  330. return True
  331. killit.delay = 0.2
  332. killit.rpcinterface = self
  333. return killit # deferred
  334. def stopAllProcesses(self):
  335. """ Stop all processes in the process list
  336. @return boolean result Always return true unless error.
  337. """
  338. self._update('stopAllProcesses')
  339. processes = self.supervisord.processes.values()
  340. processes.sort()
  341. processes.reverse() # stop in reverse priority order
  342. callbacks = []
  343. results = []
  344. for process in processes:
  345. if process.get_state() == ProcessStates.RUNNING:
  346. # only stop running processes
  347. try:
  348. callbacks.append((process.config.name,
  349. self.stopProcess(process.config.name)))
  350. except RPCError, e:
  351. results.append({'name':name, 'status':e.code,
  352. 'description':e.text})
  353. continue
  354. def killall():
  355. if not callbacks:
  356. return results
  357. name, callback = callbacks.pop(0)
  358. try:
  359. value = callback()
  360. except RPCError, e:
  361. results.append({'name':name, 'status':e.code,
  362. 'description':e.text})
  363. return NOT_DONE_YET
  364. if value is NOT_DONE_YET:
  365. # push it back into the queue; it will finish eventually
  366. callbacks.append((name, callback))
  367. else:
  368. results.append({'name':name, 'status':Faults.SUCCESS,
  369. 'description':'OK'})
  370. if callbacks:
  371. return NOT_DONE_YET
  372. return results
  373. # XXX the above implementation has a weakness inasmuch as the
  374. # first call into each individual process callback will always
  375. # return NOT_DONE_YET, so they need to be called twice. The
  376. # symptom of this is that calling this method causes the
  377. # client to block for much longer than it actually requires to
  378. # kill all of the running processes. After the first call to
  379. # the killit callback, the process is actually dead, but the
  380. # above killall method processes the callbacks one at a time
  381. # during the select loop, which, because there is no output
  382. # from child processes after stopAllProcesses is called, is
  383. # not busy, so hits the timeout for each callback. I
  384. # attempted to make this better, but the only way to make it
  385. # better assumes totally synchronous reaping of child
  386. # processes, which requires infrastructure changes to
  387. # supervisord that are scary at the moment as it could take a
  388. # while to pin down all of the platform differences and might
  389. # require a C extension to the Python signal module to allow
  390. # the setting of ignore flags to signals.
  391. killall.delay = 0.05
  392. killall.rpcinterface = self
  393. return killall # deferred
  394. def getProcessInfo(self, name):
  395. """ Get info about a process named name
  396. @param string name The name of the process
  397. @return struct result A structure containing data about the process
  398. """
  399. self._update('getProcessInfo')
  400. process = self.supervisord.processes.get(name)
  401. if process is None:
  402. raise RPCError(Faults.BAD_NAME, name)
  403. start = int(process.laststart)
  404. stop = int(process.laststop)
  405. now = int(time.time())
  406. state = process.get_state()
  407. spawnerr = process.spawnerr or ''
  408. exitstatus = process.exitstatus or 0
  409. reportstatusmsg = process.reportstatusmsg or ''
  410. return {
  411. 'name':name,
  412. 'start':start,
  413. 'stop':stop,
  414. 'now':now,
  415. 'state':state,
  416. 'spawnerr':spawnerr,
  417. 'exitstatus':exitstatus,
  418. 'reportstatusmsg':reportstatusmsg,
  419. 'logfile':process.config.logfile,
  420. 'pid':process.pid
  421. }
  422. def getAllProcessInfo(self):
  423. """ Get info about all processes
  424. @return array result An array of process status results
  425. """
  426. self._update('getAllProcessInfo')
  427. processnames = self.supervisord.processes.keys()
  428. processnames.sort()
  429. output = []
  430. for processname in processnames:
  431. output.append(self.getProcessInfo(processname))
  432. return output
  433. def readProcessLog(self, name, offset, length):
  434. """ Read length bytes from processName's log starting at offset
  435. @param string name The name of the process
  436. @param int offset offset to start reading from.
  437. @param int length number of bytes to read from the log.
  438. @return string result Bytes of log
  439. """
  440. self._update('readProcessLog')
  441. process = self.supervisord.processes.get(name)
  442. if process is None:
  443. raise RPCError(Faults.BAD_NAME, name)
  444. logfile = process.config.logfile
  445. if logfile is None or not os.path.exists(logfile):
  446. raise RPCError(Faults.NO_FILE, logfile)
  447. try:
  448. return readFile(logfile, offset, length)
  449. except ValueError, inst:
  450. why = inst.args[0]
  451. raise RPCError(getattr(Faults, why))
  452. def clearProcessLog(self, name):
  453. """ Clear the log for processName and reopen it
  454. @param string name The name of the process
  455. @return boolean result Always True unless error
  456. """
  457. self._update('clearProcessLog')
  458. process = self.supervisord.processes.get(name)
  459. if process is None:
  460. raise RPCError(Faults.BAD_NAME, name)
  461. try:
  462. # implies a reopen
  463. process.removelogs()
  464. except (IOError, os.error):
  465. raise RPCError(Faults.FAILED, name)
  466. return True
  467. def clearAllProcessLogs(self):
  468. """ Clear all process log files
  469. @return boolean result Always return true
  470. """
  471. self._update('clearAllProcessLogs')
  472. results = []
  473. callbacks = []
  474. processnames = self.supervisord.processes.keys()
  475. processnames.sort()
  476. for processname in processnames:
  477. callbacks.append((processname, self.clearProcessLog))
  478. def clearall():
  479. if not callbacks:
  480. return results
  481. name, callback = callbacks.pop(0)
  482. try:
  483. callback(name)
  484. except RPCError, e:
  485. results.append({'name':name, 'status':e.code,
  486. 'description':e.text})
  487. else:
  488. results.append({'name':name, 'status':Faults.SUCCESS,
  489. 'description':'OK'})
  490. if callbacks:
  491. return NOT_DONE_YET
  492. return results
  493. clearall.delay = 0.05
  494. clearall.rpcinterface = self
  495. return clearall # deferred
  496. def _rotateMainLog(self):
  497. """ Rotate the main supervisord log (for debugging/testing) """
  498. self._update('_rotateMainLog')
  499. for handler in self.supervisord.options.logger.handlers:
  500. if hasattr(handler, 'doRollover'):
  501. handler.doRollover()
  502. return True
  503. class SystemNamespaceRPCInterface:
  504. def __init__(self, namespaces):
  505. self.namespaces = {}
  506. for name, inst in namespaces:
  507. self.namespaces[name] = inst
  508. self.namespaces['system'] = self
  509. def _listMethods(self):
  510. methods = {}
  511. for ns_name in self.namespaces:
  512. namespace = self.namespaces[ns_name]
  513. for method_name in namespace.__class__.__dict__:
  514. # introspect; any methods that don't start with underscore
  515. # are published
  516. func = getattr(namespace, method_name)
  517. meth = getattr(func, 'im_func', None)
  518. if meth is not None:
  519. if not method_name.startswith('_'):
  520. sig = '%s.%s' % (ns_name, method_name)
  521. methods[sig] = str(func.__doc__)
  522. return methods
  523. def listMethods(self):
  524. """ Return an array listing the available method names
  525. @return array result An array of method names available (strings).
  526. """
  527. methods = self._listMethods()
  528. keys = methods.keys()
  529. keys.sort()
  530. return keys
  531. def methodHelp(self, name):
  532. """ Return a string showing the method's documentation
  533. @param string name The name of the method.
  534. @return string result The documentation for the method name.
  535. """
  536. methods = self._listMethods()
  537. for methodname in methods.keys():
  538. if methodname == name:
  539. return methods[methodname]
  540. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  541. def methodSignature(self, name):
  542. """ Return an array describing the method signature in the
  543. form [rtype, ptype, ptype...] where rtype is the return data type
  544. of the method, and ptypes are the parameter data types that the
  545. method accepts in method argument order.
  546. @param string name The name of the method.
  547. @return array result The result.
  548. """
  549. methods = self._listMethods()
  550. L = []
  551. for method in methods:
  552. if method == name:
  553. rtype = None
  554. ptypes = []
  555. parsed = doctags.gettags(methods[method])
  556. for thing in parsed:
  557. if thing[1] == 'return': # tag name
  558. rtype = thing[2] # datatype
  559. elif thing[1] == 'param': # tag name
  560. ptypes.append(thing[2]) # datatype
  561. if rtype is None:
  562. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  563. return [rtype] + ptypes
  564. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  565. def multicall(self, calls):
  566. """Process an array of calls, and return an array of
  567. results. Calls should be structs of the form {'methodName':
  568. string, 'params': array}. Each result will either be a
  569. single-item array containg the result value, or a struct of
  570. the form {'faultCode': int, 'faultString': string}. This is
  571. useful when you need to make lots of small calls without lots
  572. of round trips.
  573. @param array calls An array of call requests
  574. @return array result An array of results
  575. """
  576. producers = []
  577. for call in calls:
  578. try:
  579. name = call['methodName']
  580. params = call.get('params', [])
  581. if name == 'system.multicall':
  582. # Recursive system.multicall forbidden
  583. error = 'INCORRECT_PARAMETERS'
  584. raise xmlrpclib.Fault(Faults.INCORRECT_PARAMETERS,
  585. error)
  586. root = AttrDict(self.namespaces)
  587. value = traverse(root, name, params)
  588. except RPCError, inst:
  589. value = {'faultCode': inst.code,
  590. 'faultString': inst.text}
  591. except:
  592. errmsg = "%s:%s" % (sys.exc_type, sys.exc_value)
  593. value = {'faultCode': 1, 'faultString': errmsg}
  594. producers.append(value)
  595. results = []
  596. def multiproduce():
  597. """ Run through all the producers in order """
  598. if not producers:
  599. return []
  600. callback = producers.pop(0)
  601. if isinstance(callback, types.FunctionType):
  602. try:
  603. value = callback()
  604. except RPCError, inst:
  605. value = {'faultCode':inst.code, 'faultString':inst.text}
  606. if value is NOT_DONE_YET:
  607. # push it back in the front of the queue because we
  608. # need to finish the calls in requested order
  609. producers.insert(0, callback)
  610. return NOT_DONE_YET
  611. else:
  612. value = callback
  613. results.append(value)
  614. if producers:
  615. # only finish when all producers are finished
  616. return NOT_DONE_YET
  617. return results
  618. multiproduce.delay = .05
  619. return multiproduce
  620. class AttrDict(dict):
  621. # hack to make a dict's getattr equivalent to its getitem
  622. def __getattr__(self, name):
  623. return self[name]
  624. class RPCInterface:
  625. def __init__(self, supervisord):
  626. self.supervisord = supervisord
  627. self.supervisor = SupervisorNamespaceRPCInterface(supervisord)
  628. self.system = SystemNamespaceRPCInterface(
  629. [('supervisor', self.supervisor)]
  630. )
  631. class supervisor_xmlrpc_handler(xmlrpc_handler):
  632. def __init__(self, supervisord):
  633. self.rpcinterface = RPCInterface(supervisord)
  634. self.supervisord = supervisord
  635. def continue_request (self, data, request):
  636. logger = self.supervisord.options.logger
  637. try:
  638. params, method = xmlrpclib.loads(data)
  639. try:
  640. # 5 is 'trace' level
  641. logger.log(5, 'XML-RPC method called: %s()' % method)
  642. value = self.call(method, params)
  643. logger.log(5, 'XML-RPC method %s() returned successfully' %
  644. method)
  645. except RPCError, err:
  646. # turn RPCError reported by method into a Fault instance
  647. value = xmlrpclib.Fault(err.code, err.text)
  648. logger.warn('XML-RPC method %s() returned fault: [%d] %s' % (
  649. method,
  650. err.code, err.text))
  651. if isinstance(value, types.FunctionType):
  652. # returning a function from an RPC method implies that
  653. # this needs to be a deferred response (it needs to block).
  654. pushproducer = request.channel.push_with_producer
  655. pushproducer(DeferredXMLRPCResponse(request, value))
  656. else:
  657. # if we get anything but a function, it implies that this
  658. # response doesn't need to be deferred, we can service it
  659. # right away.
  660. body = xmlrpc_marshal(value)
  661. request['Content-Type'] = 'text/xml'
  662. request['Content-Length'] = len(body)
  663. request.push(body)
  664. request.done()
  665. except:
  666. io = StringIO.StringIO()
  667. traceback.print_exc(file=io)
  668. val = io.getvalue()
  669. logger.critical(val)
  670. # internal error, report as HTTP server error
  671. request.error(500)
  672. def call(self, method, params):
  673. return traverse(self.rpcinterface, method, params)
  674. def traverse(ob, method, params):
  675. path = method.split('.')
  676. for name in path:
  677. if name.startswith('_'):
  678. # security (don't allow things that start with an underscore to
  679. # be called remotely)
  680. raise RPCError(Faults.UNKNOWN_METHOD)
  681. ob = getattr(ob, name, None)
  682. if ob is None:
  683. raise RPCError(Faults.UNKNOWN_METHOD)
  684. try:
  685. return ob(*params)
  686. except TypeError:
  687. raise RPCError(Faults.INCORRECT_PARAMETERS)