xmlrpc.py 21 KB


  1. import types
  2. import socket
  3. import xmlrpclib
  4. import httplib
  5. import urllib
  6. import re
  7. from cStringIO import StringIO
  8. import traceback
  9. import sys
  10. import base64
  11. from supervisor.medusa.http_server import get_header
  12. from supervisor.medusa.xmlrpc_handler import xmlrpc_handler
  13. from supervisor.medusa import producers
  14. from supervisor.http import NOT_DONE_YET
  15. class Faults:
  16. UNKNOWN_METHOD = 1
  17. INCORRECT_PARAMETERS = 2
  18. BAD_ARGUMENTS = 3
  19. SIGNATURE_UNSUPPORTED = 4
  20. SHUTDOWN_STATE = 6
  21. BAD_NAME = 10
  22. BAD_SIGNAL = 11
  23. NO_FILE = 20
  24. NOT_EXECUTABLE = 21
  25. FAILED = 30
  26. ABNORMAL_TERMINATION = 40
  27. SPAWN_ERROR = 50
  28. ALREADY_STARTED = 60
  29. NOT_RUNNING = 70
  30. SUCCESS = 80
  31. ALREADY_ADDED = 90
  32. STILL_RUNNING = 91
  33. CANT_REREAD = 92
  34. def getFaultDescription(code):
  35. for faultname in Faults.__dict__:
  36. if getattr(Faults, faultname) == code:
  37. return faultname
  38. return 'UNKNOWN'
  39. class RPCError(Exception):
  40. def __init__(self, code, extra=None):
  41. self.code = code
  42. self.text = getFaultDescription(code)
  43. if extra is not None:
  44. self.text = '%s: %s' % (self.text, extra)
  45. def __str__(self):
  46. return 'code=%r, text=%r' % (self.code, self.text)
  47. class DeferredXMLRPCResponse:
  48. """ A medusa producer that implements a deferred callback; requires
  49. a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
  50. CONNECTION = re.compile ('Connection: (.*)', re.IGNORECASE)
  51. def __init__(self, request, callback):
  52. self.callback = callback
  53. self.request = request
  54. self.finished = False
  55. self.delay = float(callback.delay)
  56. def more(self):
  57. if self.finished:
  58. return ''
  59. try:
  60. try:
  61. value = self.callback()
  62. if value is NOT_DONE_YET:
  63. return NOT_DONE_YET
  64. except RPCError, err:
  65. value = xmlrpclib.Fault(err.code, err.text)
  66. body = xmlrpc_marshal(value)
  67. self.finished = True
  68. return self.getresponse(body)
  69. except:
  70. tb = traceback.format_exc()
  71. self.request.channel.server.logger.log(
  72. "XML-RPC response callback error", tb
  73. )
  74. self.finished = True
  75. self.request.error(500)
  76. def getresponse(self, body):
  77. self.request['Content-Type'] = 'text/xml'
  78. self.request['Content-Length'] = len(body)
  79. self.request.push(body)
  80. connection = get_header(self.CONNECTION, self.request.header)
  81. close_it = 0
  82. wrap_in_chunking = 0
  83. if self.request.version == '1.0':
  84. if connection == 'keep-alive':
  85. if not self.request.has_key ('Content-Length'):
  86. close_it = 1
  87. else:
  88. self.request['Connection'] = 'Keep-Alive'
  89. else:
  90. close_it = 1
  91. elif self.request.version == '1.1':
  92. if connection == 'close':
  93. close_it = 1
  94. elif not self.request.has_key ('Content-Length'):
  95. if self.request.has_key ('Transfer-Encoding'):
  96. if not self.request['Transfer-Encoding'] == 'chunked':
  97. close_it = 1
  98. elif self.request.use_chunked:
  99. self.request['Transfer-Encoding'] = 'chunked'
  100. wrap_in_chunking = 1
  101. else:
  102. close_it = 1
  103. elif self.request.version is None:
  104. close_it = 1
  105. outgoing_header = producers.simple_producer (
  106. self.request.build_reply_header())
  107. if close_it:
  108. self.request['Connection'] = 'close'
  109. if wrap_in_chunking:
  110. outgoing_producer = producers.chunked_producer (
  111. producers.composite_producer (self.request.outgoing)
  112. )
  113. # prepend the header
  114. outgoing_producer = producers.composite_producer(
  115. [outgoing_header, outgoing_producer]
  116. )
  117. else:
  118. # prepend the header
  119. self.request.outgoing.insert(0, outgoing_header)
  120. outgoing_producer = producers.composite_producer (
  121. self.request.outgoing)
  122. # apply a few final transformations to the output
  123. self.request.channel.push_with_producer (
  124. # globbing gives us large packets
  125. producers.globbing_producer (
  126. # hooking lets us log the number of bytes sent
  127. producers.hooked_producer (
  128. outgoing_producer,
  129. self.request.log
  130. )
  131. )
  132. )
  133. self.request.channel.current_request = None
  134. if close_it:
  135. self.request.channel.close_when_done()
  136. def xmlrpc_marshal(value):
  137. ismethodresponse = not isinstance(value, xmlrpclib.Fault)
  138. if ismethodresponse:
  139. if not isinstance(value, tuple):
  140. value = (value,)
  141. body = xmlrpclib.dumps(value, methodresponse=ismethodresponse)
  142. else:
  143. body = xmlrpclib.dumps(value)
  144. return body
  145. class SystemNamespaceRPCInterface:
  146. def __init__(self, namespaces):
  147. self.namespaces = {}
  148. for name, inst in namespaces:
  149. self.namespaces[name] = inst
  150. self.namespaces['system'] = self
  151. def _listMethods(self):
  152. methods = {}
  153. for ns_name in self.namespaces:
  154. namespace = self.namespaces[ns_name]
  155. for method_name in namespace.__class__.__dict__:
  156. # introspect; any methods that don't start with underscore
  157. # are published
  158. func = getattr(namespace, method_name)
  159. meth = getattr(func, 'im_func', None)
  160. if meth is not None:
  161. if not method_name.startswith('_'):
  162. sig = '%s.%s' % (ns_name, method_name)
  163. methods[sig] = str(func.__doc__)
  164. return methods
  165. def listMethods(self):
  166. """ Return an array listing the available method names
  167. @return array result An array of method names available (strings).
  168. """
  169. methods = self._listMethods()
  170. keys = methods.keys()
  171. keys.sort()
  172. return keys
  173. def methodHelp(self, name):
  174. """ Return a string showing the method's documentation
  175. @param string name The name of the method.
  176. @return string result The documentation for the method name.
  177. """
  178. methods = self._listMethods()
  179. for methodname in methods.keys():
  180. if methodname == name:
  181. return methods[methodname]
  182. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  183. def methodSignature(self, name):
  184. """ Return an array describing the method signature in the
  185. form [rtype, ptype, ptype...] where rtype is the return data type
  186. of the method, and ptypes are the parameter data types that the
  187. method accepts in method argument order.
  188. @param string name The name of the method.
  189. @return array result The result.
  190. """
  191. methods = self._listMethods()
  192. for method in methods:
  193. if method == name:
  194. rtype = None
  195. ptypes = []
  196. parsed = gettags(methods[method])
  197. for thing in parsed:
  198. if thing[1] == 'return': # tag name
  199. rtype = thing[2] # datatype
  200. elif thing[1] == 'param': # tag name
  201. ptypes.append(thing[2]) # datatype
  202. if rtype is None:
  203. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  204. return [rtype] + ptypes
  205. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  206. def multicall(self, calls):
  207. """Process an array of calls, and return an array of
  208. results. Calls should be structs of the form {'methodName':
  209. string, 'params': array}. Each result will either be a
  210. single-item array containing the result value, or a struct of
  211. the form {'faultCode': int, 'faultString': string}. This is
  212. useful when you need to make lots of small calls without lots
  213. of round trips.
  214. @param array calls An array of call requests
  215. @return array result An array of results
  216. """
  217. remaining_calls = calls[:] # [{'methodName':x, 'params':x}, ...]
  218. callbacks = [] # always empty or 1 callback function only
  219. results = [] # results of completed calls
  220. # args are only to fool scoping and are never passed by caller
  221. def multi(remaining_calls=remaining_calls,
  222. callbacks=callbacks,
  223. results=results):
  224. # if waiting on a callback, call it, then remove it if it's done
  225. if callbacks:
  226. try:
  227. value = callbacks[0]()
  228. except RPCError, exc:
  229. value = {'faultCode': exc.code,
  230. 'faultString': exc.text}
  231. except:
  232. info = sys.exc_info()
  233. errmsg = "%s:%s" % (info[0], info[1])
  234. value = {'faultCode': Faults.FAILED,
  235. 'faultString': 'FAILED: ' + errmsg}
  236. if value is not NOT_DONE_YET:
  237. callbacks.pop(0)
  238. results.append(value)
  239. # if we don't have a callback now, pop calls and call them in
  240. # order until one returns a callback.
  241. while (not callbacks) and remaining_calls:
  242. call = remaining_calls.pop(0)
  243. name = call.get('methodName', None)
  244. params = call.get('params', [])
  245. try:
  246. if name is None:
  247. raise RPCError(Faults.INCORRECT_PARAMETERS,
  248. 'No methodName')
  249. if name == 'system.multicall':
  250. raise RPCError(Faults.INCORRECT_PARAMETERS,
  251. 'Recursive system.multicall forbidden')
  252. # make the call, may return a callback or not
  253. root = AttrDict(self.namespaces)
  254. value = traverse(root, name, params)
  255. except RPCError, exc:
  256. value = {'faultCode': exc.code,
  257. 'faultString': exc.text}
  258. except:
  259. info = sys.exc_info()
  260. errmsg = "%s:%s" % (info[0], info[1])
  261. value = {'faultCode': Faults.FAILED,
  262. 'faultString': 'FAILED: ' + errmsg}
  263. if isinstance(value, types.FunctionType):
  264. callbacks.append(value)
  265. else:
  266. results.append(value)
  267. # we are done when there's no callback and no more calls queued
  268. if callbacks or remaining_calls:
  269. return NOT_DONE_YET
  270. else:
  271. return results
  272. multi.delay = 0.05
  273. # optimization: multi() is called here instead of just returning
  274. # multi in case all calls complete and we can return with no delay.
  275. value = multi()
  276. if value is NOT_DONE_YET:
  277. return multi
  278. else:
  279. return value
  280. class AttrDict(dict):
  281. # hack to make a dict's getattr equivalent to its getitem
  282. def __getattr__(self, name):
  283. return self.get(name)
  284. class RootRPCInterface:
  285. def __init__(self, subinterfaces):
  286. for name, rpcinterface in subinterfaces:
  287. setattr(self, name, rpcinterface)
  288. def capped_int(value):
  289. i = int(value)
  290. if i < xmlrpclib.MININT:
  291. i = xmlrpclib.MININT
  292. elif i > xmlrpclib.MAXINT:
  293. i = xmlrpclib.MAXINT
  294. return i
  295. def make_datetime(text):
  296. return datetime.datetime(
  297. *time.strptime(text, "%Y%m%dT%H:%M:%S")[:6]
  298. )
  299. class supervisor_xmlrpc_handler(xmlrpc_handler):
  300. path = '/RPC2'
  301. IDENT = 'Supervisor XML-RPC Handler'
  302. def __init__(self, supervisord, subinterfaces):
  303. self.rpcinterface = RootRPCInterface(subinterfaces)
  304. self.supervisord = supervisord
  305. if loads:
  306. self.loads = loads
  307. else:
  308. self.supervisord.options.logger.warn(
  309. 'cElementTree not installed, using slower XML parser for '
  310. 'XML-RPC'
  311. )
  312. self.loads = xmlrpclib.loads
  313. def match(self, request):
  314. return request.uri.startswith(self.path)
  315. def continue_request(self, data, request):
  316. logger = self.supervisord.options.logger
  317. try:
  318. try:
  319. params, method = self.loads(data)
  320. except:
  321. logger.error(
  322. 'XML-RPC request data %r is invalid: unmarshallable' %
  323. (data,)
  324. )
  325. request.error(400)
  326. return
  327. # no <methodName> in the request or name is an empty string
  328. if not method:
  329. logger.error(
  330. 'XML-RPC request data %r is invalid: no method name' %
  331. (data,)
  332. )
  333. request.error(400)
  334. return
  335. # we allow xml-rpc clients that do not send empty <params>
  336. # when there are no parameters for the method call
  337. if params is None:
  338. params = ()
  339. try:
  340. logger.trace('XML-RPC method called: %s()' % method)
  341. value = self.call(method, params)
  342. logger.trace('XML-RPC method %s() returned successfully' %
  343. method)
  344. except RPCError, err:
  345. # turn RPCError reported by method into a Fault instance
  346. value = xmlrpclib.Fault(err.code, err.text)
  347. logger.trace('XML-RPC method %s() returned fault: [%d] %s' % (
  348. method,
  349. err.code, err.text))
  350. if isinstance(value, types.FunctionType):
  351. # returning a function from an RPC method implies that
  352. # this needs to be a deferred response (it needs to block).
  353. pushproducer = request.channel.push_with_producer
  354. pushproducer(DeferredXMLRPCResponse(request, value))
  355. else:
  356. # if we get anything but a function, it implies that this
  357. # response doesn't need to be deferred, we can service it
  358. # right away.
  359. body = xmlrpc_marshal(value)
  360. request['Content-Type'] = 'text/xml'
  361. request['Content-Length'] = len(body)
  362. request.push(body)
  363. request.done()
  364. except:
  365. tb = traceback.format_exc()
  366. logger.critical(
  367. "Handling XML-RPC request with data %r raised an unexpected "
  368. "exception: %s" % (data, tb)
  369. )
  370. # internal error, report as HTTP server error
  371. request.error(500)
  372. def call(self, method, params):
  373. return traverse(self.rpcinterface, method, params)
  374. def traverse(ob, method, params):
  375. path = method.split('.')
  376. for name in path:
  377. if name.startswith('_'):
  378. # security (don't allow things that start with an underscore to
  379. # be called remotely)
  380. raise RPCError(Faults.UNKNOWN_METHOD)
  381. ob = getattr(ob, name, None)
  382. if ob is None:
  383. raise RPCError(Faults.UNKNOWN_METHOD)
  384. try:
  385. return ob(*params)
  386. except TypeError:
  387. raise RPCError(Faults.INCORRECT_PARAMETERS)
  388. class SupervisorTransport(xmlrpclib.Transport):
  389. """
  390. Provides a Transport for xmlrpclib that uses
  391. httplib.HTTPConnection in order to support persistent
  392. connections. Also support basic auth and UNIX domain socket
  393. servers.
  394. """
  395. connection = None
  396. _use_datetime = 0 # python 2.5 fwd compatibility
  397. def __init__(self, username=None, password=None, serverurl=None):
  398. self.username = username
  399. self.password = password
  400. self.verbose = False
  401. self.serverurl = serverurl
  402. if serverurl.startswith('http://'):
  403. type, uri = urllib.splittype(serverurl)
  404. host, path = urllib.splithost(uri)
  405. host, port = urllib.splitport(host)
  406. if port is None:
  407. port = 80
  408. else:
  409. port = int(port)
  410. def get_connection(host=host, port=port):
  411. return httplib.HTTPConnection(host, port)
  412. self._get_connection = get_connection
  413. elif serverurl.startswith('unix://'):
  414. def get_connection(serverurl=serverurl):
  415. # we use 'localhost' here because domain names must be
  416. # < 64 chars (or we'd use the serverurl filename)
  417. conn = UnixStreamHTTPConnection('localhost')
  418. conn.socketfile = serverurl[7:]
  419. return conn
  420. self._get_connection = get_connection
  421. else:
  422. raise ValueError('Unknown protocol for serverurl %s' % serverurl)
  423. def request(self, host, handler, request_body, verbose=0):
  424. if not self.connection:
  425. self.connection = self._get_connection()
  426. self.headers = {
  427. "User-Agent" : self.user_agent,
  428. "Content-Type" : "text/xml",
  429. "Accept": "text/xml"
  430. }
  431. # basic auth
  432. if self.username is not None and self.password is not None:
  433. unencoded = "%s:%s" % (self.username, self.password)
  434. encoded = base64.encodestring(unencoded).replace('\n', '')
  435. self.headers["Authorization"] = "Basic %s" % encoded
  436. self.headers["Content-Length"] = str(len(request_body))
  437. self.connection.request('POST', handler, request_body, self.headers)
  438. r = self.connection.getresponse()
  439. if r.status != 200:
  440. self.connection.close()
  441. self.connection = None
  442. raise xmlrpclib.ProtocolError(host + handler,
  443. r.status,
  444. r.reason,
  445. '' )
  446. data = r.read()
  447. p, u = self.getparser()
  448. p.feed(data)
  449. p.close()
  450. return u.close()
  451. class UnixStreamHTTPConnection(httplib.HTTPConnection):
  452. def connect(self):
  453. self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  454. # we abuse the host parameter as the socketname
  455. self.sock.connect(self.socketfile)
  456. def gettags(comment):
  457. """ Parse documentation strings into JavaDoc-like tokens """
  458. tags = []
  459. tag = None
  460. datatype = None
  461. name = None
  462. tag_lineno = lineno = 0
  463. tag_text = []
  464. for line in comment.split('\n'):
  465. line = line.strip()
  466. if line.startswith("@"):
  467. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  468. parts = line.split(None, 3)
  469. if len(parts) == 1:
  470. datatype = ''
  471. name = ''
  472. tag_text = []
  473. elif len(parts) == 2:
  474. datatype = parts[1]
  475. name = ''
  476. tag_text = []
  477. elif len(parts) == 3:
  478. datatype = parts[1]
  479. name = parts[2]
  480. tag_text = []
  481. elif len(parts) == 4:
  482. datatype = parts[1]
  483. name = parts[2]
  484. tag_text = [parts[3].lstrip()]
  485. tag = parts[0][1:]
  486. tag_lineno = lineno
  487. else:
  488. if line:
  489. tag_text.append(line)
  490. lineno = lineno + 1
  491. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  492. return tags
  493. try:
  494. # Python 2.6 contains a version of cElementTree inside it.
  495. from xml.etree.ElementTree import iterparse
  496. except ImportError:
  497. try:
  498. # Failing that, try cElementTree instead.
  499. from cElementTree import iterparse
  500. except ImportError:
  501. iterparse = None
  502. if iterparse is not None:
  503. import datetime, time
  504. from base64 import decodestring
  505. unmarshallers = {
  506. "int": lambda x: int(x.text),
  507. "i4": lambda x: int(x.text),
  508. "boolean": lambda x: x.text == "1",
  509. "string": lambda x: x.text or "",
  510. "double": lambda x: float(x.text),
  511. "dateTime.iso8601": lambda x: make_datetime(x.text),
  512. "array": lambda x: x[0].text,
  513. "data": lambda x: [v.text for v in x],
  514. "struct": lambda x: dict([(k.text or "", v.text) for k, v in x]),
  515. "base64": lambda x: decodestring(x.text or ""),
  516. "param": lambda x: x[0].text,
  517. }
  518. def loads(data):
  519. params = method = None
  520. for action, elem in iterparse(StringIO(data)):
  521. unmarshal = unmarshallers.get(elem.tag)
  522. if unmarshal:
  523. data = unmarshal(elem)
  524. elem.clear()
  525. elem.text = data
  526. elif elem.tag == "value":
  527. try:
  528. data = elem[0].text
  529. except IndexError:
  530. data = elem.text or ""
  531. elem.clear()
  532. elem.text = data
  533. elif elem.tag == "methodName":
  534. method = elem.text
  535. elif elem.tag == "params":
  536. params = tuple([v.text for v in elem])
  537. return params, method
  538. else:
  539. loads = None