rpc.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878
  1. import xmlrpclib
  2. import os
  3. from supervisord import ProcessStates
  4. from supervisord import SupervisorStates
  5. from supervisord import getSupervisorStateDescription
  6. import doctags
  7. import signal
  8. import time
  9. from medusa.xmlrpc_handler import xmlrpc_handler
  10. from medusa.http_server import get_header
  11. from medusa import producers
  12. import sys
  13. import types
  14. import re
  15. import traceback
  16. import StringIO
  17. import tempfile
  18. import errno
  19. from http import NOT_DONE_YET
# Version of the XML-RPC API; this is the value getVersion() returns.
RPC_VERSION = 1.0
  21. class Faults:
  22. UNKNOWN_METHOD = 1
  23. INCORRECT_PARAMETERS = 2
  24. BAD_ARGUMENTS = 3
  25. SIGNATURE_UNSUPPORTED = 4
  26. SHUTDOWN_STATE = 6
  27. BAD_NAME = 10
  28. NO_FILE = 20
  29. FAILED = 30
  30. ABNORMAL_TERMINATION = 40
  31. SPAWN_ERROR = 50
  32. ALREADY_STARTED = 60
  33. NOT_RUNNING = 70
  34. SUCCESS = 80
  35. def getFaultDescription(code):
  36. for faultname in Faults.__dict__:
  37. if getattr(Faults, faultname) == code:
  38. return faultname
  39. return 'UNKNOWN'
  40. class RPCError(Exception):
  41. def __init__(self, code, extra=None):
  42. self.code = code
  43. self.text = getFaultDescription(code)
  44. if extra is not None:
  45. self.text = '%s: %s' % (self.text, extra)
  46. class DeferredXMLRPCResponse:
  47. """ A medusa producer that implements a deferred callback; requires
  48. a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
  49. CONNECTION = re.compile ('Connection: (.*)', re.IGNORECASE)
  50. def __init__(self, request, callback):
  51. self.callback = callback
  52. self.request = request
  53. self.finished = False
  54. self.delay = float(callback.delay)
  55. def more(self):
  56. if self.finished:
  57. return ''
  58. try:
  59. try:
  60. value = self.callback()
  61. if value is NOT_DONE_YET:
  62. return NOT_DONE_YET
  63. except RPCError, err:
  64. value = xmlrpclib.Fault(err.code, err.text)
  65. body = xmlrpc_marshal(value)
  66. self.finished = True
  67. return self.getresponse(body)
  68. except:
  69. # report unexpected exception back to server
  70. traceback.print_exc()
  71. self.finished = True
  72. self.request.error(500)
  73. def getresponse(self, body):
  74. self.request['Content-Type'] = 'text/xml'
  75. self.request['Content-Length'] = len(body)
  76. self.request.push(body)
  77. connection = get_header(self.CONNECTION, self.request.header)
  78. close_it = 0
  79. wrap_in_chunking = 0
  80. if self.request.version == '1.0':
  81. if connection == 'keep-alive':
  82. if not self.request.has_key ('Content-Length'):
  83. close_it = 1
  84. else:
  85. self.request['Connection'] = 'Keep-Alive'
  86. else:
  87. close_it = 1
  88. elif self.request.version == '1.1':
  89. if connection == 'close':
  90. close_it = 1
  91. elif not self.request.has_key ('Content-Length'):
  92. if self.request.has_key ('Transfer-Encoding'):
  93. if not self.request['Transfer-Encoding'] == 'chunked':
  94. close_it = 1
  95. elif self.request.use_chunked:
  96. self.request['Transfer-Encoding'] = 'chunked'
  97. wrap_in_chunking = 1
  98. else:
  99. close_it = 1
  100. elif self.request.version is None:
  101. close_it = 1
  102. outgoing_header = producers.simple_producer (
  103. self.request.build_reply_header())
  104. if close_it:
  105. self.request['Connection'] = 'close'
  106. if wrap_in_chunking:
  107. outgoing_producer = producers.chunked_producer (
  108. producers.composite_producer (self.request.outgoing)
  109. )
  110. # prepend the header
  111. outgoing_producer = producers.composite_producer(
  112. [outgoing_header, outgoing_producer]
  113. )
  114. else:
  115. # prepend the header
  116. self.request.outgoing.insert(0, outgoing_header)
  117. outgoing_producer = producers.composite_producer (
  118. self.request.outgoing)
  119. # apply a few final transformations to the output
  120. self.request.channel.push_with_producer (
  121. # globbing gives us large packets
  122. producers.globbing_producer (
  123. # hooking lets us log the number of bytes sent
  124. producers.hooked_producer (
  125. outgoing_producer,
  126. self.request.log
  127. )
  128. )
  129. )
  130. self.request.channel.current_request = None
  131. if close_it:
  132. self.request.channel.close_when_done()
  133. def xmlrpc_marshal(value):
  134. ismethodresponse = not isinstance(value, xmlrpclib.Fault)
  135. if ismethodresponse:
  136. if not isinstance(value, tuple):
  137. value = (value,)
  138. body = xmlrpclib.dumps(value, methodresponse=ismethodresponse)
  139. else:
  140. body = xmlrpclib.dumps(value)
  141. return body
  142. class SupervisorNamespaceRPCInterface:
  143. def __init__(self, supervisord):
  144. self.supervisord = supervisord
  145. def _update(self, text):
  146. self.update_text = text # for unit tests, mainly
  147. state = self.supervisord.get_state()
  148. if state == SupervisorStates.SHUTDOWN:
  149. raise RPCError(Faults.SHUTDOWN_STATE)
  150. # RPC API methods
  151. def getVersion(self):
  152. """ Return the version of the RPC API used by supervisord
  153. @return int version version id
  154. """
  155. self._update('getVersion')
  156. return RPC_VERSION
  157. def getIdentification(self):
  158. """ Return identifiying string of supervisord
  159. @return string identifier identifying string
  160. """
  161. self._update('getIdentification')
  162. return self.supervisord.options.identifier
  163. def getState(self):
  164. """ Return current state of supervisord as a struct
  165. @return struct A struct with keys string statecode, int statename
  166. """
  167. self._update('getState')
  168. state = self.supervisord.get_state()
  169. statename = getSupervisorStateDescription(state)
  170. data = {
  171. 'statecode':state,
  172. 'statename':statename,
  173. }
  174. return data
  175. def readLog(self, offset, length):
  176. """ Read length bytes from the main log starting at offset.
  177. @param int offset offset to start reading from.
  178. @param int length number of bytes to read from the log.
  179. @return struct data a struct with keys 'log' (value of 'log' is string).
  180. """
  181. self._update('readLog')
  182. logfile = self.supervisord.options.logfile
  183. if logfile is None or not os.path.exists(logfile):
  184. raise RPCError(Faults.NO_FILE)
  185. try:
  186. return _readFile(logfile, offset, length)
  187. except ValueError, inst:
  188. why = inst.args[0]
  189. raise RPCError(getattr(Faults, why))
  190. def clearLog(self):
  191. """ Clear the main log.
  192. @return boolean result always returns True unless error
  193. """
  194. self._update('clearLog')
  195. logfile = self.supervisord.options.logfile
  196. if logfile is None or not os.path.exists(logfile):
  197. raise RPCError(Faults.NO_FILE)
  198. try:
  199. os.remove(logfile) # there is a race condition here, but ignore it.
  200. except (os.error, IOError):
  201. raise RPCError(Faults.FAILED)
  202. for handler in self.supervisord.options.logger.handlers:
  203. if hasattr(handler, 'reopen'):
  204. self.supervisord.options.logger.info('reopening log file')
  205. handler.reopen()
  206. return True
  207. def shutdown(self):
  208. """ Shut down the supervisor process
  209. @return boolean result always returns True unless error
  210. """
  211. self._update('shutdown')
  212. self.supervisord.mood = -1
  213. return True
  214. def restart(self):
  215. """ Restart the supervisor process
  216. @return boolean result always return True unless error
  217. """
  218. self._update('restart')
  219. self.supervisord.mood = 0
  220. return True
  221. def startProcess(self, name, timeout=500):
  222. """ Start a process
  223. @param string name Process name
  224. @param int timeout Number of milliseconds to wait for process start
  225. @return boolean result Always true unless error
  226. """
  227. self._update('startProcess')
  228. processes = self.supervisord.processes
  229. process = processes.get(name)
  230. try:
  231. timeout = int(timeout)
  232. except:
  233. raise RPCError(Faults.BAD_ARGUMENTS, 'timeout: %s' % timeout)
  234. if process is None:
  235. raise RPCError(Faults.BAD_NAME, name)
  236. if process.pid:
  237. raise RPCError(Faults.ALREADY_STARTED, name)
  238. process.spawn()
  239. if process.spawnerr:
  240. raise RPCError(Faults.SPAWN_ERROR, name)
  241. if not timeout:
  242. timeout = 0
  243. milliseconds = timeout / 1000.0
  244. start = time.time()
  245. def check_still_running(done=False): # done arg is only for unit testing
  246. t = time.time()
  247. runtime = (t - start)
  248. if not done and runtime < milliseconds:
  249. return NOT_DONE_YET
  250. pid = processes[name].pid
  251. if pid:
  252. return True
  253. raise RPCError(Faults.ABNORMAL_TERMINATION, name)
  254. check_still_running.delay = milliseconds
  255. check_still_running.rpcinterface = self
  256. return check_still_running # deferred
  257. def startAllProcesses(self, timeout=500):
  258. """ Start all processes listed in the configuration file
  259. @param int timeout Number of milliseconds to wait for each process start
  260. @return struct result A structure containing start statuses
  261. """
  262. self._update('startAllProcesses')
  263. try:
  264. timeout = int(timeout)
  265. except:
  266. raise RPCError(Faults.BAD_ARGUMENTS, 'timeout: %s' % timeout)
  267. processes = self.supervisord.processes.values()
  268. processes.sort() # asc by priority
  269. results = []
  270. callbacks = []
  271. for process in processes:
  272. if process.get_state() != ProcessStates.RUNNING:
  273. # only start nonrunning processes
  274. try:
  275. callbacks.append((process.config.name,
  276. self.startProcess(process.config.name, timeout)))
  277. except RPCError, e:
  278. results.append({'name':process.config.name, 'status':e.code,
  279. 'description':e.text})
  280. continue
  281. def startall(done=False): # done arg is for unit testing
  282. if not callbacks:
  283. return results
  284. name, callback = callbacks.pop(0)
  285. try:
  286. value = callback(done)
  287. except RPCError, e:
  288. results.append({'name':name, 'status':e.code,
  289. 'description':e.text})
  290. return NOT_DONE_YET
  291. if value is NOT_DONE_YET:
  292. # push it back into the queue; it will finish eventually
  293. callbacks.append((name,callback))
  294. else:
  295. results.append({'name':name, 'status':Faults.SUCCESS,
  296. 'description':'OK'})
  297. if callbacks:
  298. return NOT_DONE_YET
  299. return results
  300. # XXX the above implementation has a weakness inasmuch as the
  301. # first call into each individual process callback will always
  302. # return NOT_DONE_YET, so they need to be called twice. The
  303. # symptom of this is that calling this method causes the
  304. # client to block for much longer than it actually requires to
  305. # start all of the nonrunning processes. See stopAllProcesses
  306. startall.delay = 0.05
  307. startall.rpcinterface = self
  308. return startall # deferred
  309. def stopProcess(self, name):
  310. """ Stop a process named by name
  311. @param string name The name of the process to stop
  312. @return boolean result Always return True unless error
  313. """
  314. self._update('stopProcess')
  315. process = self.supervisord.processes.get(name)
  316. if process is None:
  317. raise RPCError(Faults.BAD_NAME, name)
  318. if process.get_state() != ProcessStates.RUNNING:
  319. raise RPCError(Faults.NOT_RUNNING)
  320. def killit():
  321. if process.killing:
  322. return NOT_DONE_YET
  323. elif process.pid:
  324. msg = process.stop()
  325. if msg is not None:
  326. raise RPCError(Faults.FAILED, name)
  327. return NOT_DONE_YET
  328. else:
  329. return True
  330. killit.delay = 0.2
  331. killit.rpcinterface = self
  332. return killit # deferred
  333. def stopAllProcesses(self):
  334. """ Stop all processes in the process list
  335. @return boolean result Always return true unless error.
  336. """
  337. self._update('stopAllProcesses')
  338. processes = self.supervisord.processes.values()
  339. processes.sort()
  340. processes.reverse() # stop in reverse priority order
  341. callbacks = []
  342. results = []
  343. for process in processes:
  344. if process.get_state() == ProcessStates.RUNNING:
  345. # only stop running processes
  346. try:
  347. callbacks.append((process.config.name,
  348. self.stopProcess(process.config.name)))
  349. except RPCError, e:
  350. results.append({'name':name, 'status':e.code,
  351. 'description':e.text})
  352. continue
  353. def killall():
  354. if not callbacks:
  355. return results
  356. name, callback = callbacks.pop(0)
  357. try:
  358. value = callback()
  359. except RPCError, e:
  360. results.append({'name':name, 'status':e.code,
  361. 'description':e.text})
  362. return NOT_DONE_YET
  363. if value is NOT_DONE_YET:
  364. # push it back into the queue; it will finish eventually
  365. callbacks.append((name, callback))
  366. else:
  367. results.append({'name':name, 'status':Faults.SUCCESS,
  368. 'description':'OK'})
  369. if callbacks:
  370. return NOT_DONE_YET
  371. return results
  372. # XXX the above implementation has a weakness inasmuch as the
  373. # first call into each individual process callback will always
  374. # return NOT_DONE_YET, so they need to be called twice. The
  375. # symptom of this is that calling this method causes the
  376. # client to block for much longer than it actually requires to
  377. # kill all of the running processes. After the first call to
  378. # the killit callback, the process is actually dead, but the
  379. # above killall method processes the callbacks one at a time
  380. # during the select loop, which, because there is no output
  381. # from child processes after stopAllProcesses is called, is
  382. # not busy, so hits the timeout for each callback. I
  383. # attempted to make this better, but the only way to make it
  384. # better assumes totally synchronous reaping of child
  385. # processes, which requires infrastructure changes to
  386. # supervisord that are scary at the moment as it could take a
  387. # while to pin down all of the platform differences and might
  388. # require a C extension to the Python signal module to allow
  389. # the setting of ignore flags to signals.
  390. killall.delay = 0.05
  391. killall.rpcinterface = self
  392. return killall # deferred
  393. def getProcessInfo(self, name):
  394. """ Get info about a process named name
  395. @param string name The name of the process
  396. @return struct result A structure containing data about the process
  397. """
  398. self._update('getProcessInfo')
  399. process = self.supervisord.processes.get(name)
  400. if process is None:
  401. raise RPCError(Faults.BAD_NAME, name)
  402. start = int(process.laststart)
  403. stop = int(process.laststop)
  404. now = int(time.time())
  405. state = process.get_state()
  406. spawnerr = process.spawnerr or ''
  407. exitstatus = process.exitstatus or 0
  408. reportstatusmsg = process.reportstatusmsg or ''
  409. return {
  410. 'name':name,
  411. 'start':start,
  412. 'stop':stop,
  413. 'now':now,
  414. 'state':state,
  415. 'spawnerr':spawnerr,
  416. 'exitstatus':exitstatus,
  417. 'reportstatusmsg':reportstatusmsg,
  418. 'logfile':process.config.logfile,
  419. 'pid':process.pid
  420. }
  421. def getAllProcessInfo(self):
  422. """ Get info about all processes
  423. @return array result An array of process status results
  424. """
  425. self._update('getAllProcessInfo')
  426. processnames = self.supervisord.processes.keys()
  427. processnames.sort()
  428. output = []
  429. for processname in processnames:
  430. output.append(self.getProcessInfo(processname))
  431. return output
  432. def readProcessLog(self, name, offset, length):
  433. """ Read length bytes from processName's log starting at offset
  434. @param string name The name of the process
  435. @param int offset offset to start reading from.
  436. @param int length number of bytes to read from the log.
  437. @return string result Bytes of log
  438. """
  439. self._update('readProcessLog')
  440. process = self.supervisord.processes.get(name)
  441. if process is None:
  442. raise RPCError(Faults.BAD_NAME, name)
  443. logfile = process.config.logfile
  444. if logfile is None or not os.path.exists(logfile):
  445. raise RPCError(Faults.NO_FILE, logfile)
  446. try:
  447. return _readFile(logfile, offset, length)
  448. except ValueError, inst:
  449. why = inst.args[0]
  450. raise RPCError(getattr(Faults, why))
  451. def clearProcessLog(self, name):
  452. """ Clear the log for processName and reopen it
  453. @param string name The name of the process
  454. @return boolean result Always True unless error
  455. """
  456. self._update('clearProcessLog')
  457. process = self.supervisord.processes.get(name)
  458. if process is None:
  459. raise RPCError(Faults.BAD_NAME, name)
  460. try:
  461. # implies a reopen
  462. process.removelogs()
  463. except (IOError, os.error):
  464. raise RPCError(Faults.FAILED, name)
  465. return True
  466. def clearAllProcessLogs(self):
  467. """ Clear all process log files
  468. @return boolean result Always return true
  469. """
  470. self._update('clearAllProcessLogs')
  471. results = []
  472. callbacks = []
  473. processnames = self.supervisord.processes.keys()
  474. processnames.sort()
  475. for processname in processnames:
  476. callbacks.append((processname, self.clearProcessLog))
  477. def clearall():
  478. if not callbacks:
  479. return results
  480. name, callback = callbacks.pop(0)
  481. try:
  482. callback(name)
  483. except RPCError, e:
  484. results.append({'name':name, 'status':e.code,
  485. 'description':e.text})
  486. else:
  487. results.append({'name':name, 'status':Faults.SUCCESS,
  488. 'description':'OK'})
  489. if callbacks:
  490. return NOT_DONE_YET
  491. return results
  492. clearall.delay = 0.05
  493. clearall.rpcinterface = self
  494. return clearall # deferred
  495. def _rotateMainLog(self):
  496. """ Rotate the main supervisord log (for debugging/testing) """
  497. self._update('_rotateMainLog')
  498. for handler in self.supervisord.options.logger.handlers:
  499. if hasattr(handler, 'doRollover'):
  500. handler.doRollover()
  501. return True
  502. class SystemNamespaceRPCInterface:
  503. def __init__(self, namespaces):
  504. self.namespaces = {}
  505. for name, inst in namespaces:
  506. self.namespaces[name] = inst
  507. self.namespaces['system'] = self
  508. def _listMethods(self):
  509. methods = {}
  510. for ns_name in self.namespaces:
  511. namespace = self.namespaces[ns_name]
  512. for method_name in namespace.__class__.__dict__:
  513. # introspect; any methods that don't start with underscore
  514. # are published
  515. func = getattr(namespace, method_name)
  516. meth = getattr(func, 'im_func', None)
  517. if meth is not None:
  518. if not method_name.startswith('_'):
  519. sig = '%s.%s' % (ns_name, method_name)
  520. methods[sig] = str(func.__doc__)
  521. return methods
  522. def listMethods(self):
  523. """ Return an array listing the available method names
  524. @return array result An array of method names available (strings).
  525. """
  526. methods = self._listMethods()
  527. keys = methods.keys()
  528. keys.sort()
  529. return keys
  530. def methodHelp(self, name):
  531. """ Return a string showing the method's documentation
  532. @param string name The name of the method.
  533. @return string result The documentation for the method name.
  534. """
  535. methods = self._listMethods()
  536. for methodname in methods.keys():
  537. if methodname == name:
  538. return methods[methodname]
  539. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  540. def methodSignature(self, name):
  541. """ Return an array describing the method signature in the
  542. form [rtype, ptype, ptype...] where rtype is the return data type
  543. of the method, and ptypes are the parameter data types that the
  544. method accepts in method argument order.
  545. @param string name The name of the method.
  546. @return array result The result.
  547. """
  548. methods = self._listMethods()
  549. L = []
  550. for method in methods:
  551. if method == name:
  552. rtype = None
  553. ptypes = []
  554. parsed = doctags.gettags(methods[method])
  555. for thing in parsed:
  556. if thing[1] == 'return': # tag name
  557. rtype = thing[2] # datatype
  558. elif thing[1] == 'param': # tag name
  559. ptypes.append(thing[2]) # datatype
  560. if rtype is None:
  561. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  562. return [rtype] + ptypes
  563. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  564. def multicall(self, calls):
  565. """Process an array of calls, and return an array of
  566. results. Calls should be structs of the form {'methodName':
  567. string, 'params': array}. Each result will either be a
  568. single-item array containg the result value, or a struct of
  569. the form {'faultCode': int, 'faultString': string}. This is
  570. useful when you need to make lots of small calls without lots
  571. of round trips.
  572. @param array calls An array of call requests
  573. @return array result An array of results
  574. """
  575. producers = []
  576. for call in calls:
  577. try:
  578. name = call['methodName']
  579. params = call.get('params', [])
  580. if name == 'system.multicall':
  581. # Recursive system.multicall forbidden
  582. error = 'INCORRECT_PARAMETERS'
  583. raise xmlrpclib.Fault(Faults.INCORRECT_PARAMETERS,
  584. error)
  585. root = AttrDict(self.namespaces)
  586. value = traverse(root, name, params)
  587. except RPCError, inst:
  588. value = {'faultCode': inst.code,
  589. 'faultString': inst.text}
  590. except:
  591. errmsg = "%s:%s" % (sys.exc_type, sys.exc_value)
  592. value = {'faultCode': 1, 'faultString': errmsg}
  593. producers.append(value)
  594. results = []
  595. def multiproduce():
  596. """ Run through all the producers in order """
  597. if not producers:
  598. return []
  599. callback = producers.pop(0)
  600. if isinstance(callback, types.FunctionType):
  601. try:
  602. value = callback()
  603. except RPCError, inst:
  604. value = {'faultCode':inst.code, 'faultString':inst.text}
  605. if value is NOT_DONE_YET:
  606. # push it back in the front of the queue because we
  607. # need to finish the calls in requested order
  608. producers.insert(0, callback)
  609. return NOT_DONE_YET
  610. else:
  611. value = callback
  612. results.append(value)
  613. if producers:
  614. # only finish when all producers are finished
  615. return NOT_DONE_YET
  616. return results
  617. multiproduce.delay = .05
  618. return multiproduce
  619. class AttrDict(dict):
  620. # hack to make a dict's getattr equivalent to its getitem
  621. def __getattr__(self, name):
  622. return self[name]
  623. class RPCInterface:
  624. def __init__(self, supervisord):
  625. self.supervisord = supervisord
  626. self.supervisor = SupervisorNamespaceRPCInterface(supervisord)
  627. self.system = SystemNamespaceRPCInterface(
  628. [('supervisor', self.supervisor)]
  629. )
  630. class supervisor_xmlrpc_handler(xmlrpc_handler):
  631. def __init__(self, supervisord):
  632. self.rpcinterface = RPCInterface(supervisord)
  633. self.supervisord = supervisord
  634. def continue_request (self, data, request):
  635. logger = self.supervisord.options.logger
  636. try:
  637. params, method = xmlrpclib.loads(data)
  638. try:
  639. # 5 is 'trace' level
  640. logger.log(5, 'XML-RPC method called: %s()' % method)
  641. value = self.call(method, params)
  642. logger.log(5, 'XML-RPC method %s() returned successfully' %
  643. method)
  644. except RPCError, err:
  645. # turn RPCError reported by method into a Fault instance
  646. value = xmlrpclib.Fault(err.code, err.text)
  647. logger.warn('XML-RPC method %s() returned fault: [%d] %s' % (
  648. method,
  649. err.code, err.text))
  650. if isinstance(value, types.FunctionType):
  651. # returning a function from an RPC method implies that
  652. # this needs to be a deferred response (it needs to block).
  653. pushproducer = request.channel.push_with_producer
  654. pushproducer(DeferredXMLRPCResponse(request, value))
  655. else:
  656. # if we get anything but a function, it implies that this
  657. # response doesn't need to be deferred, we can service it
  658. # right away.
  659. body = xmlrpc_marshal(value)
  660. request['Content-Type'] = 'text/xml'
  661. request['Content-Length'] = len(body)
  662. request.push(body)
  663. request.done()
  664. except:
  665. io = StringIO.StringIO()
  666. traceback.print_exc(file=io)
  667. val = io.getvalue()
  668. logger.critical(val)
  669. # internal error, report as HTTP server error
  670. request.error(500)
  671. def call(self, method, params):
  672. return traverse(self.rpcinterface, method, params)
  673. def traverse(ob, method, params):
  674. path = method.split('.')
  675. for name in path:
  676. if name.startswith('_'):
  677. # security (don't allow things that start with an underscore to
  678. # be called remotely)
  679. raise RPCError(Faults.UNKNOWN_METHOD)
  680. ob = getattr(ob, name, None)
  681. if ob is None:
  682. raise RPCError(Faults.UNKNOWN_METHOD)
  683. try:
  684. return ob(*params)
  685. except TypeError:
  686. raise RPCError(Faults.INCORRECT_PARAMETERS)
  687. def _readFile(filename, offset, length):
  688. """ Read length bytes from the file named by filename starting at
  689. offset """
  690. absoffset = abs(offset)
  691. abslength = abs(length)
  692. try:
  693. f = open(filename, 'rb')
  694. if absoffset != offset:
  695. # negative offset returns offset bytes from tail of the file
  696. if length:
  697. raise ValueError('BAD_ARGUMENTS')
  698. f.seek(0, 2)
  699. sz = f.tell()
  700. pos = int(sz - absoffset)
  701. if pos < 0:
  702. pos = 0
  703. f.seek(pos)
  704. data = f.read(absoffset)
  705. else:
  706. if abslength != length:
  707. raise ValueError('BAD_ARGUMENTS')
  708. if length == 0:
  709. f.seek(offset)
  710. data = f.read()
  711. else:
  712. sz = f.seek(offset)
  713. data = f.read(length)
  714. except (os.error, IOError):
  715. raise ValueError('FAILED')
  716. return data