xmlrpc.py 21 KB


  1. import types
  2. import socket
  3. import xmlrpclib
  4. import httplib
  5. import urllib
  6. import re
  7. from cStringIO import StringIO
  8. import traceback
  9. import sys
  10. import base64
  11. from supervisor.medusa.http_server import get_header
  12. from supervisor.medusa.xmlrpc_handler import xmlrpc_handler
  13. from supervisor.medusa import producers
  14. from supervisor.http import NOT_DONE_YET
  15. class Faults:
  16. UNKNOWN_METHOD = 1
  17. INCORRECT_PARAMETERS = 2
  18. BAD_ARGUMENTS = 3
  19. SIGNATURE_UNSUPPORTED = 4
  20. SHUTDOWN_STATE = 6
  21. BAD_NAME = 10
  22. BAD_SIGNAL = 11
  23. NO_FILE = 20
  24. NOT_EXECUTABLE = 21
  25. FAILED = 30
  26. ABNORMAL_TERMINATION = 40
  27. SPAWN_ERROR = 50
  28. ALREADY_STARTED = 60
  29. NOT_RUNNING = 70
  30. SUCCESS = 80
  31. ALREADY_ADDED = 90
  32. STILL_RUNNING = 91
  33. CANT_REREAD = 92
  34. def getFaultDescription(code):
  35. for faultname in Faults.__dict__:
  36. if getattr(Faults, faultname) == code:
  37. return faultname
  38. return 'UNKNOWN'
  39. class RPCError(Exception):
  40. def __init__(self, code, extra=None):
  41. self.code = code
  42. self.text = getFaultDescription(code)
  43. if extra is not None:
  44. self.text = '%s: %s' % (self.text, extra)
  45. def __str__(self):
  46. return 'code=%r, text=%r' % (self.code, self.text)
  47. class DeferredXMLRPCResponse:
  48. """ A medusa producer that implements a deferred callback; requires
  49. a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
  50. CONNECTION = re.compile ('Connection: (.*)', re.IGNORECASE)
  51. def __init__(self, request, callback):
  52. self.callback = callback
  53. self.request = request
  54. self.finished = False
  55. self.delay = float(callback.delay)
  56. def more(self):
  57. if self.finished:
  58. return ''
  59. try:
  60. try:
  61. value = self.callback()
  62. if value is NOT_DONE_YET:
  63. return NOT_DONE_YET
  64. except RPCError, err:
  65. value = xmlrpclib.Fault(err.code, err.text)
  66. body = xmlrpc_marshal(value)
  67. self.finished = True
  68. return self.getresponse(body)
  69. except:
  70. tb = traceback.format_exc()
  71. self.request.channel.server.logger.log(
  72. "XML-RPC response callback error", tb
  73. )
  74. self.finished = True
  75. self.request.error(500)
  76. def getresponse(self, body):
  77. self.request['Content-Type'] = 'text/xml'
  78. self.request['Content-Length'] = len(body)
  79. self.request.push(body)
  80. connection = get_header(self.CONNECTION, self.request.header)
  81. close_it = 0
  82. wrap_in_chunking = 0
  83. if self.request.version == '1.0':
  84. if connection == 'keep-alive':
  85. if not self.request.has_key ('Content-Length'):
  86. close_it = 1
  87. else:
  88. self.request['Connection'] = 'Keep-Alive'
  89. else:
  90. close_it = 1
  91. elif self.request.version == '1.1':
  92. if connection == 'close':
  93. close_it = 1
  94. elif not self.request.has_key ('Content-Length'):
  95. if self.request.has_key ('Transfer-Encoding'):
  96. if not self.request['Transfer-Encoding'] == 'chunked':
  97. close_it = 1
  98. elif self.request.use_chunked:
  99. self.request['Transfer-Encoding'] = 'chunked'
  100. wrap_in_chunking = 1
  101. else:
  102. close_it = 1
  103. elif self.request.version is None:
  104. close_it = 1
  105. outgoing_header = producers.simple_producer (
  106. self.request.build_reply_header())
  107. if close_it:
  108. self.request['Connection'] = 'close'
  109. if wrap_in_chunking:
  110. outgoing_producer = producers.chunked_producer (
  111. producers.composite_producer (self.request.outgoing)
  112. )
  113. # prepend the header
  114. outgoing_producer = producers.composite_producer(
  115. [outgoing_header, outgoing_producer]
  116. )
  117. else:
  118. # prepend the header
  119. self.request.outgoing.insert(0, outgoing_header)
  120. outgoing_producer = producers.composite_producer (
  121. self.request.outgoing)
  122. # apply a few final transformations to the output
  123. self.request.channel.push_with_producer (
  124. # globbing gives us large packets
  125. producers.globbing_producer (
  126. # hooking lets us log the number of bytes sent
  127. producers.hooked_producer (
  128. outgoing_producer,
  129. self.request.log
  130. )
  131. )
  132. )
  133. self.request.channel.current_request = None
  134. if close_it:
  135. self.request.channel.close_when_done()
  136. def xmlrpc_marshal(value):
  137. ismethodresponse = not isinstance(value, xmlrpclib.Fault)
  138. if ismethodresponse:
  139. if not isinstance(value, tuple):
  140. value = (value,)
  141. body = xmlrpclib.dumps(value, methodresponse=ismethodresponse)
  142. else:
  143. body = xmlrpclib.dumps(value)
  144. return body
  145. class SystemNamespaceRPCInterface:
  146. def __init__(self, namespaces):
  147. self.namespaces = {}
  148. for name, inst in namespaces:
  149. self.namespaces[name] = inst
  150. self.namespaces['system'] = self
  151. def _listMethods(self):
  152. methods = {}
  153. for ns_name in self.namespaces:
  154. namespace = self.namespaces[ns_name]
  155. for method_name in namespace.__class__.__dict__:
  156. # introspect; any methods that don't start with underscore
  157. # are published
  158. func = getattr(namespace, method_name)
  159. meth = getattr(func, 'im_func', None)
  160. if meth is not None:
  161. if not method_name.startswith('_'):
  162. sig = '%s.%s' % (ns_name, method_name)
  163. methods[sig] = str(func.__doc__)
  164. return methods
  165. def listMethods(self):
  166. """ Return an array listing the available method names
  167. @return array result An array of method names available (strings).
  168. """
  169. methods = self._listMethods()
  170. keys = methods.keys()
  171. keys.sort()
  172. return keys
  173. def methodHelp(self, name):
  174. """ Return a string showing the method's documentation
  175. @param string name The name of the method.
  176. @return string result The documentation for the method name.
  177. """
  178. methods = self._listMethods()
  179. for methodname in methods.keys():
  180. if methodname == name:
  181. return methods[methodname]
  182. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  183. def methodSignature(self, name):
  184. """ Return an array describing the method signature in the
  185. form [rtype, ptype, ptype...] where rtype is the return data type
  186. of the method, and ptypes are the parameter data types that the
  187. method accepts in method argument order.
  188. @param string name The name of the method.
  189. @return array result The result.
  190. """
  191. methods = self._listMethods()
  192. for method in methods:
  193. if method == name:
  194. rtype = None
  195. ptypes = []
  196. parsed = gettags(methods[method])
  197. for thing in parsed:
  198. if thing[1] == 'return': # tag name
  199. rtype = thing[2] # datatype
  200. elif thing[1] == 'param': # tag name
  201. ptypes.append(thing[2]) # datatype
  202. if rtype is None:
  203. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  204. return [rtype] + ptypes
  205. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  206. def multicall(self, calls):
  207. """Process an array of calls, and return an array of
  208. results. Calls should be structs of the form {'methodName':
  209. string, 'params': array}. Each result will either be a
  210. single-item array containing the result value, or a struct of
  211. the form {'faultCode': int, 'faultString': string}. This is
  212. useful when you need to make lots of small calls without lots
  213. of round trips.
  214. @param array calls An array of call requests
  215. @return array result An array of results
  216. """
  217. remaining_calls = calls[:] # [{'methodName':x, 'params':x}, ...]
  218. callbacks = [] # always empty or 1 callback function only
  219. results = [] # results of completed calls
  220. # args are only to fool scoping and are never passed by caller
  221. def multi(remaining_calls=remaining_calls,
  222. callbacks=callbacks,
  223. results=results):
  224. # if waiting on a callback, call it, then remove it if it's done
  225. if callbacks:
  226. try:
  227. value = callbacks[0]()
  228. except RPCError, exc:
  229. value = {'faultCode': exc.code,
  230. 'faultString': exc.text}
  231. except:
  232. info = sys.exc_info()
  233. errmsg = "%s:%s" % (info[0], info[1])
  234. value = {'faultCode': Faults.FAILED,
  235. 'faultString': 'FAILED: ' + errmsg}
  236. if value is not NOT_DONE_YET:
  237. callbacks.pop(0)
  238. results.append(value)
  239. # if we don't have a callback now, pop calls and call them in
  240. # order until one returns a callback.
  241. while (not callbacks) and remaining_calls:
  242. call = remaining_calls.pop(0)
  243. name = call.get('methodName', None)
  244. params = call.get('params', [])
  245. try:
  246. if name is None:
  247. raise RPCError(Faults.INCORRECT_PARAMETERS,
  248. 'No methodName')
  249. if name == 'system.multicall':
  250. raise RPCError(Faults.INCORRECT_PARAMETERS,
  251. 'Recursive system.multicall forbidden')
  252. # make the call, may return a callback or not
  253. root = AttrDict(self.namespaces)
  254. value = traverse(root, name, params)
  255. except RPCError, exc:
  256. value = {'faultCode': exc.code,
  257. 'faultString': exc.text}
  258. except:
  259. info = sys.exc_info()
  260. errmsg = "%s:%s" % (info[0], info[1])
  261. value = {'faultCode': Faults.FAILED,
  262. 'faultString': 'FAILED: ' + errmsg}
  263. if isinstance(value, types.FunctionType):
  264. callbacks.append(value)
  265. else:
  266. results.append(value)
  267. # we are done when there's no callback and no more calls queued
  268. if callbacks or remaining_calls:
  269. return NOT_DONE_YET
  270. else:
  271. return results
  272. multi.delay = 0.05
  273. # optimization: multi() is called here instead of just returning
  274. # multi in case all calls complete and we can return with no delay.
  275. value = multi()
  276. if value is NOT_DONE_YET:
  277. return multi
  278. else:
  279. return value
  280. class AttrDict(dict):
  281. # hack to make a dict's getattr equivalent to its getitem
  282. def __getattr__(self, name):
  283. return self.get(name)
  284. class RootRPCInterface:
  285. def __init__(self, subinterfaces):
  286. for name, rpcinterface in subinterfaces:
  287. setattr(self, name, rpcinterface)
  288. class supervisor_xmlrpc_handler(xmlrpc_handler):
  289. path = '/RPC2'
  290. IDENT = 'Supervisor XML-RPC Handler'
  291. def __init__(self, supervisord, subinterfaces):
  292. self.rpcinterface = RootRPCInterface(subinterfaces)
  293. self.supervisord = supervisord
  294. if loads:
  295. self.loads = loads
  296. else:
  297. self.supervisord.options.logger.warn(
  298. 'cElementTree not installed, using slower XML parser for '
  299. 'XML-RPC'
  300. )
  301. self.loads = xmlrpclib.loads
  302. def match(self, request):
  303. return request.uri.startswith(self.path)
  304. def continue_request (self, data, request):
  305. logger = self.supervisord.options.logger
  306. try:
  307. params, method = self.loads(data)
  308. # no <methodName> in the request or name is an empty string
  309. if not method:
  310. logger.trace('XML-RPC request received with no method name')
  311. request.error(400)
  312. return
  313. # we allow xml-rpc clients that do not send empty <params>
  314. # when there are no parameters for the method call
  315. if params is None:
  316. params = ()
  317. try:
  318. logger.trace('XML-RPC method called: %s()' % method)
  319. value = self.call(method, params)
  320. # application-specific: instead of we never want to
  321. # marshal None (even though we could by saying allow_none=True
  322. # in dumps within xmlrpc_marshall), this is meant as
  323. # a debugging fixture, see issue 223.
  324. assert value is not None, (
  325. 'return value from method %r with params %r is None' %
  326. (method, params)
  327. )
  328. logger.trace('XML-RPC method %s() returned successfully' %
  329. method)
  330. except RPCError, err:
  331. # turn RPCError reported by method into a Fault instance
  332. value = xmlrpclib.Fault(err.code, err.text)
  333. logger.trace('XML-RPC method %s() returned fault: [%d] %s' % (
  334. method,
  335. err.code, err.text))
  336. if isinstance(value, types.FunctionType):
  337. # returning a function from an RPC method implies that
  338. # this needs to be a deferred response (it needs to block).
  339. pushproducer = request.channel.push_with_producer
  340. pushproducer(DeferredXMLRPCResponse(request, value))
  341. else:
  342. # if we get anything but a function, it implies that this
  343. # response doesn't need to be deferred, we can service it
  344. # right away.
  345. body = xmlrpc_marshal(value)
  346. request['Content-Type'] = 'text/xml'
  347. request['Content-Length'] = len(body)
  348. request.push(body)
  349. request.done()
  350. except:
  351. tb = traceback.format_exc()
  352. logger.critical(tb)
  353. # internal error, report as HTTP server error
  354. request.error(500)
  355. def call(self, method, params):
  356. return traverse(self.rpcinterface, method, params)
  357. def traverse(ob, method, params):
  358. path = method.split('.')
  359. for name in path:
  360. if name.startswith('_'):
  361. # security (don't allow things that start with an underscore to
  362. # be called remotely)
  363. raise RPCError(Faults.UNKNOWN_METHOD)
  364. ob = getattr(ob, name, None)
  365. if ob is None:
  366. raise RPCError(Faults.UNKNOWN_METHOD)
  367. try:
  368. return ob(*params)
  369. except TypeError:
  370. raise RPCError(Faults.INCORRECT_PARAMETERS)
  371. class SupervisorTransport(xmlrpclib.Transport):
  372. """
  373. Provides a Transport for xmlrpclib that uses
  374. httplib.HTTPConnection in order to support persistent
  375. connections. Also support basic auth and UNIX domain socket
  376. servers.
  377. """
  378. connection = None
  379. _use_datetime = 0 # python 2.5 fwd compatibility
  380. def __init__(self, username=None, password=None, serverurl=None):
  381. self.username = username
  382. self.password = password
  383. self.verbose = False
  384. self.serverurl = serverurl
  385. if serverurl.startswith('http://'):
  386. type, uri = urllib.splittype(serverurl)
  387. host, path = urllib.splithost(uri)
  388. host, port = urllib.splitport(host)
  389. if port is None:
  390. port = 80
  391. else:
  392. port = int(port)
  393. def get_connection(host=host, port=port):
  394. return httplib.HTTPConnection(host, port)
  395. self._get_connection = get_connection
  396. elif serverurl.startswith('unix://'):
  397. def get_connection(serverurl=serverurl):
  398. # we use 'localhost' here because domain names must be
  399. # < 64 chars (or we'd use the serverurl filename)
  400. conn = UnixStreamHTTPConnection('localhost')
  401. conn.socketfile = serverurl[7:]
  402. return conn
  403. self._get_connection = get_connection
  404. else:
  405. raise ValueError('Unknown protocol for serverurl %s' % serverurl)
  406. def request(self, host, handler, request_body, verbose=0):
  407. if not self.connection:
  408. self.connection = self._get_connection()
  409. self.headers = {
  410. "User-Agent" : self.user_agent,
  411. "Content-Type" : "text/xml",
  412. "Accept": "text/xml"
  413. }
  414. # basic auth
  415. if self.username is not None and self.password is not None:
  416. unencoded = "%s:%s" % (self.username, self.password)
  417. encoded = base64.encodestring(unencoded).replace('\n', '')
  418. self.headers["Authorization"] = "Basic %s" % encoded
  419. self.headers["Content-Length"] = str(len(request_body))
  420. self.connection.request('POST', handler, request_body, self.headers)
  421. r = self.connection.getresponse()
  422. if r.status != 200:
  423. self.connection.close()
  424. self.connection = None
  425. raise xmlrpclib.ProtocolError(host + handler,
  426. r.status,
  427. r.reason,
  428. '' )
  429. data = r.read()
  430. p, u = self.getparser()
  431. p.feed(data)
  432. p.close()
  433. return u.close()
  434. class UnixStreamHTTPConnection(httplib.HTTPConnection):
  435. def connect(self):
  436. self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  437. # we abuse the host parameter as the socketname
  438. self.sock.connect(self.socketfile)
  439. def gettags(comment):
  440. """ Parse documentation strings into JavaDoc-like tokens """
  441. tags = []
  442. tag = None
  443. datatype = None
  444. name = None
  445. tag_lineno = lineno = 0
  446. tag_text = []
  447. for line in comment.split('\n'):
  448. line = line.strip()
  449. if line.startswith("@"):
  450. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  451. parts = line.split(None, 3)
  452. if len(parts) == 1:
  453. datatype = ''
  454. name = ''
  455. tag_text = []
  456. elif len(parts) == 2:
  457. datatype = parts[1]
  458. name = ''
  459. tag_text = []
  460. elif len(parts) == 3:
  461. datatype = parts[1]
  462. name = parts[2]
  463. tag_text = []
  464. elif len(parts) == 4:
  465. datatype = parts[1]
  466. name = parts[2]
  467. tag_text = [parts[3].lstrip()]
  468. tag = parts[0][1:]
  469. tag_lineno = lineno
  470. else:
  471. if line:
  472. tag_text.append(line)
  473. lineno = lineno + 1
  474. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  475. return tags
  476. try:
  477. # Python 2.6 contains a version of cElementTree inside it.
  478. from xml.etree.ElementTree import iterparse
  479. except ImportError:
  480. try:
  481. # Failing that, try cElementTree instead.
  482. from cElementTree import iterparse
  483. except ImportError:
  484. iterparse = None
  485. if iterparse is not None:
  486. import datetime, time
  487. from base64 import decodestring
  488. def make_datetime(text):
  489. return datetime.datetime(
  490. *time.strptime(text, "%Y%m%dT%H:%M:%S")[:6]
  491. )
  492. unmarshallers = {
  493. "int": lambda x: int(x.text),
  494. "i4": lambda x: int(x.text),
  495. "boolean": lambda x: x.text == "1",
  496. "string": lambda x: x.text or "",
  497. "double": lambda x: float(x.text),
  498. "dateTime.iso8601": lambda x: make_datetime(x.text),
  499. "array": lambda x: x[0].text,
  500. "data": lambda x: [v.text for v in x],
  501. "struct": lambda x: dict([(k.text or "", v.text) for k, v in x]),
  502. "base64": lambda x: decodestring(x.text or ""),
  503. "param": lambda x: x[0].text,
  504. }
  505. def loads(data):
  506. params = method = None
  507. for action, elem in iterparse(StringIO(data)):
  508. unmarshal = unmarshallers.get(elem.tag)
  509. if unmarshal:
  510. data = unmarshal(elem)
  511. elem.clear()
  512. elem.text = data
  513. elif elem.tag == "value":
  514. try:
  515. data = elem[0].text
  516. except IndexError:
  517. data = elem.text or ""
  518. elem.clear()
  519. elem.text = data
  520. elif elem.tag == "methodName":
  521. method = elem.text
  522. elif elem.tag == "params":
  523. params = tuple([v.text for v in elem])
  524. return params, method
  525. else:
  526. loads = None