xmlrpc.py 20 KB


  1. import types
  2. import socket
  3. import xmlrpclib
  4. import httplib
  5. import urllib
  6. import re
  7. from cStringIO import StringIO
  8. import traceback
  9. import sys
  10. from supervisor.medusa.http_server import get_header
  11. from supervisor.medusa.xmlrpc_handler import xmlrpc_handler
  12. from supervisor.medusa import producers
  13. from supervisor.http import NOT_DONE_YET
  14. class Faults:
  15. UNKNOWN_METHOD = 1
  16. INCORRECT_PARAMETERS = 2
  17. BAD_ARGUMENTS = 3
  18. SIGNATURE_UNSUPPORTED = 4
  19. SHUTDOWN_STATE = 6
  20. BAD_NAME = 10
  21. NO_FILE = 20
  22. NOT_EXECUTABLE = 21
  23. FAILED = 30
  24. ABNORMAL_TERMINATION = 40
  25. SPAWN_ERROR = 50
  26. ALREADY_STARTED = 60
  27. NOT_RUNNING = 70
  28. SUCCESS = 80
  29. ALREADY_ADDED = 90
  30. STILL_RUNNING = 91
  31. CANT_REREAD = 92
  32. def getFaultDescription(code):
  33. for faultname in Faults.__dict__:
  34. if getattr(Faults, faultname) == code:
  35. return faultname
  36. return 'UNKNOWN'
  37. class RPCError(Exception):
  38. def __init__(self, code, extra=None):
  39. self.code = code
  40. self.text = getFaultDescription(code)
  41. if extra is not None:
  42. self.text = '%s: %s' % (self.text, extra)
  43. class DeferredXMLRPCResponse:
  44. """ A medusa producer that implements a deferred callback; requires
  45. a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
  46. CONNECTION = re.compile ('Connection: (.*)', re.IGNORECASE)
  47. def __init__(self, request, callback):
  48. self.callback = callback
  49. self.request = request
  50. self.finished = False
  51. self.delay = float(callback.delay)
  52. def more(self):
  53. if self.finished:
  54. return ''
  55. try:
  56. try:
  57. value = self.callback()
  58. if value is NOT_DONE_YET:
  59. return NOT_DONE_YET
  60. except RPCError, err:
  61. value = xmlrpclib.Fault(err.code, err.text)
  62. body = xmlrpc_marshal(value)
  63. self.finished = True
  64. return self.getresponse(body)
  65. except:
  66. # report unexpected exception back to server
  67. traceback.print_exc()
  68. self.finished = True
  69. self.request.error(500)
  70. def getresponse(self, body):
  71. self.request['Content-Type'] = 'text/xml'
  72. self.request['Content-Length'] = len(body)
  73. self.request.push(body)
  74. connection = get_header(self.CONNECTION, self.request.header)
  75. close_it = 0
  76. wrap_in_chunking = 0
  77. if self.request.version == '1.0':
  78. if connection == 'keep-alive':
  79. if not self.request.has_key ('Content-Length'):
  80. close_it = 1
  81. else:
  82. self.request['Connection'] = 'Keep-Alive'
  83. else:
  84. close_it = 1
  85. elif self.request.version == '1.1':
  86. if connection == 'close':
  87. close_it = 1
  88. elif not self.request.has_key ('Content-Length'):
  89. if self.request.has_key ('Transfer-Encoding'):
  90. if not self.request['Transfer-Encoding'] == 'chunked':
  91. close_it = 1
  92. elif self.request.use_chunked:
  93. self.request['Transfer-Encoding'] = 'chunked'
  94. wrap_in_chunking = 1
  95. else:
  96. close_it = 1
  97. elif self.request.version is None:
  98. close_it = 1
  99. outgoing_header = producers.simple_producer (
  100. self.request.build_reply_header())
  101. if close_it:
  102. self.request['Connection'] = 'close'
  103. if wrap_in_chunking:
  104. outgoing_producer = producers.chunked_producer (
  105. producers.composite_producer (self.request.outgoing)
  106. )
  107. # prepend the header
  108. outgoing_producer = producers.composite_producer(
  109. [outgoing_header, outgoing_producer]
  110. )
  111. else:
  112. # prepend the header
  113. self.request.outgoing.insert(0, outgoing_header)
  114. outgoing_producer = producers.composite_producer (
  115. self.request.outgoing)
  116. # apply a few final transformations to the output
  117. self.request.channel.push_with_producer (
  118. # globbing gives us large packets
  119. producers.globbing_producer (
  120. # hooking lets us log the number of bytes sent
  121. producers.hooked_producer (
  122. outgoing_producer,
  123. self.request.log
  124. )
  125. )
  126. )
  127. self.request.channel.current_request = None
  128. if close_it:
  129. self.request.channel.close_when_done()
  130. def xmlrpc_marshal(value):
  131. ismethodresponse = not isinstance(value, xmlrpclib.Fault)
  132. if ismethodresponse:
  133. if not isinstance(value, tuple):
  134. value = (value,)
  135. body = xmlrpclib.dumps(value, methodresponse=ismethodresponse)
  136. else:
  137. body = xmlrpclib.dumps(value)
  138. return body
  139. class SystemNamespaceRPCInterface:
  140. def __init__(self, namespaces):
  141. self.namespaces = {}
  142. for name, inst in namespaces:
  143. self.namespaces[name] = inst
  144. self.namespaces['system'] = self
  145. def _listMethods(self):
  146. methods = {}
  147. for ns_name in self.namespaces:
  148. namespace = self.namespaces[ns_name]
  149. for method_name in namespace.__class__.__dict__:
  150. # introspect; any methods that don't start with underscore
  151. # are published
  152. func = getattr(namespace, method_name)
  153. meth = getattr(func, 'im_func', None)
  154. if meth is not None:
  155. if not method_name.startswith('_'):
  156. sig = '%s.%s' % (ns_name, method_name)
  157. methods[sig] = str(func.__doc__)
  158. return methods
  159. def listMethods(self):
  160. """ Return an array listing the available method names
  161. @return array result An array of method names available (strings).
  162. """
  163. methods = self._listMethods()
  164. keys = methods.keys()
  165. keys.sort()
  166. return keys
  167. def methodHelp(self, name):
  168. """ Return a string showing the method's documentation
  169. @param string name The name of the method.
  170. @return string result The documentation for the method name.
  171. """
  172. methods = self._listMethods()
  173. for methodname in methods.keys():
  174. if methodname == name:
  175. return methods[methodname]
  176. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  177. def methodSignature(self, name):
  178. """ Return an array describing the method signature in the
  179. form [rtype, ptype, ptype...] where rtype is the return data type
  180. of the method, and ptypes are the parameter data types that the
  181. method accepts in method argument order.
  182. @param string name The name of the method.
  183. @return array result The result.
  184. """
  185. methods = self._listMethods()
  186. for method in methods:
  187. if method == name:
  188. rtype = None
  189. ptypes = []
  190. parsed = gettags(methods[method])
  191. for thing in parsed:
  192. if thing[1] == 'return': # tag name
  193. rtype = thing[2] # datatype
  194. elif thing[1] == 'param': # tag name
  195. ptypes.append(thing[2]) # datatype
  196. if rtype is None:
  197. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  198. return [rtype] + ptypes
  199. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  200. def multicall(self, calls):
  201. """Process an array of calls, and return an array of
  202. results. Calls should be structs of the form {'methodName':
  203. string, 'params': array}. Each result will either be a
  204. single-item array containg the result value, or a struct of
  205. the form {'faultCode': int, 'faultString': string}. This is
  206. useful when you need to make lots of small calls without lots
  207. of round trips.
  208. @param array calls An array of call requests
  209. @return array result An array of results
  210. """
  211. producers = []
  212. for call in calls:
  213. try:
  214. name = call['methodName']
  215. params = call.get('params', [])
  216. if name == 'system.multicall':
  217. # Recursive system.multicall forbidden
  218. raise RPCError(Faults.INCORRECT_PARAMETERS)
  219. root = AttrDict(self.namespaces)
  220. value = traverse(root, name, params)
  221. except RPCError, inst:
  222. value = {'faultCode': inst.code,
  223. 'faultString': inst.text}
  224. except:
  225. errmsg = "%s:%s" % (sys.exc_type, sys.exc_value)
  226. value = {'faultCode': 1, 'faultString': errmsg}
  227. producers.append(value)
  228. results = []
  229. def multiproduce():
  230. """ Run through all the producers in order """
  231. if not producers:
  232. return []
  233. callback = producers.pop(0)
  234. if isinstance(callback, types.FunctionType):
  235. try:
  236. value = callback()
  237. except RPCError, inst:
  238. value = {'faultCode':inst.code, 'faultString':inst.text}
  239. if value is NOT_DONE_YET:
  240. # push it back in the front of the queue because we
  241. # need to finish the calls in requested order
  242. producers.insert(0, callback)
  243. return NOT_DONE_YET
  244. else:
  245. value = callback
  246. results.append(value)
  247. if producers:
  248. # only finish when all producers are finished
  249. return NOT_DONE_YET
  250. return results
  251. multiproduce.delay = .05
  252. return multiproduce
  253. class AttrDict(dict):
  254. # hack to make a dict's getattr equivalent to its getitem
  255. def __getattr__(self, name):
  256. return self[name]
  257. class RootRPCInterface:
  258. def __init__(self, subinterfaces):
  259. for name, rpcinterface in subinterfaces:
  260. setattr(self, name, rpcinterface)
  261. class supervisor_xmlrpc_handler(xmlrpc_handler):
  262. path = '/RPC2'
  263. IDENT = 'Supervisor XML-RPC Handler'
  264. def __init__(self, supervisord, subinterfaces):
  265. self.rpcinterface = RootRPCInterface(subinterfaces)
  266. self.supervisord = supervisord
  267. if loads:
  268. self.loads = loads
  269. else:
  270. self.supervisord.options.logger.warn(
  271. 'cElementTree not installed, using slower XML parser for '
  272. 'XML-RPC'
  273. )
  274. self.loads = xmlrpclib.loads
  275. def match(self, request):
  276. return request.uri.startswith(self.path)
  277. def continue_request (self, data, request):
  278. logger = self.supervisord.options.logger
  279. try:
  280. params, method = self.loads(data)
  281. # no <methodName> in the request or name is an empty string
  282. if not method:
  283. logger.trace('XML-RPC request received with no method name')
  284. request.error(400)
  285. return
  286. # we allow xml-rpc clients that do not send empty <params>
  287. # when there are no parameters for the method call
  288. if params is None:
  289. params = ()
  290. try:
  291. logger.trace('XML-RPC method called: %s()' % method)
  292. value = self.call(method, params)
  293. # application-specific: instead of we never want to
  294. # marshal None (even though we could by saying allow_none=True
  295. # in dumps within xmlrpc_marshall), this is meant as
  296. # a debugging fixture, see issue 223.
  297. assert value is not None, (
  298. 'return value from method %r with params %r is None' %
  299. (method, params)
  300. )
  301. logger.trace('XML-RPC method %s() returned successfully' %
  302. method)
  303. except RPCError, err:
  304. # turn RPCError reported by method into a Fault instance
  305. value = xmlrpclib.Fault(err.code, err.text)
  306. logger.trace('XML-RPC method %s() returned fault: [%d] %s' % (
  307. method,
  308. err.code, err.text))
  309. if isinstance(value, types.FunctionType):
  310. # returning a function from an RPC method implies that
  311. # this needs to be a deferred response (it needs to block).
  312. pushproducer = request.channel.push_with_producer
  313. pushproducer(DeferredXMLRPCResponse(request, value))
  314. else:
  315. # if we get anything but a function, it implies that this
  316. # response doesn't need to be deferred, we can service it
  317. # right away.
  318. body = xmlrpc_marshal(value)
  319. request['Content-Type'] = 'text/xml'
  320. request['Content-Length'] = len(body)
  321. request.push(body)
  322. request.done()
  323. except:
  324. io = StringIO()
  325. traceback.print_exc(file=io)
  326. val = io.getvalue()
  327. logger.critical(val)
  328. # internal error, report as HTTP server error
  329. request.error(500)
  330. def call(self, method, params):
  331. return traverse(self.rpcinterface, method, params)
  332. def traverse(ob, method, params):
  333. path = method.split('.')
  334. for name in path:
  335. if name.startswith('_'):
  336. # security (don't allow things that start with an underscore to
  337. # be called remotely)
  338. raise RPCError(Faults.UNKNOWN_METHOD)
  339. ob = getattr(ob, name, None)
  340. if ob is None:
  341. raise RPCError(Faults.UNKNOWN_METHOD)
  342. try:
  343. return ob(*params)
  344. except TypeError:
  345. raise RPCError(Faults.INCORRECT_PARAMETERS)
  346. class SupervisorTransport(xmlrpclib.Transport):
  347. """
  348. Provides a Transport for xmlrpclib that uses
  349. httplib.HTTPConnection in order to support persistent
  350. connections. Also support basic auth and UNIX domain socket
  351. servers.
  352. """
  353. connection = None
  354. _use_datetime = 0 # python 2.5 fwd compatibility
  355. def __init__(self, username=None, password=None, serverurl=None):
  356. self.username = username
  357. self.password = password
  358. self.verbose = False
  359. self.serverurl = serverurl
  360. if serverurl.startswith('http://'):
  361. type, uri = urllib.splittype(serverurl)
  362. host, path = urllib.splithost(uri)
  363. host, port = urllib.splitport(host)
  364. if port is None:
  365. port = 80
  366. else:
  367. port = int(port)
  368. def get_connection(host=host, port=port):
  369. return httplib.HTTPConnection(host, port)
  370. self._get_connection = get_connection
  371. elif serverurl.startswith('unix://'):
  372. def get_connection(serverurl=serverurl):
  373. # we use 'localhost' here because domain names must be
  374. # < 64 chars (or we'd use the serverurl filename)
  375. conn = UnixStreamHTTPConnection('localhost')
  376. conn.socketfile = serverurl[7:]
  377. return conn
  378. self._get_connection = get_connection
  379. else:
  380. raise ValueError('Unknown protocol for serverurl %s' % serverurl)
  381. def request(self, host, handler, request_body, verbose=0):
  382. if not self.connection:
  383. self.connection = self._get_connection()
  384. self.headers = {
  385. "User-Agent" : self.user_agent,
  386. "Content-Type" : "text/xml",
  387. "Accept": "text/xml"
  388. }
  389. # basic auth
  390. if self.username is not None and self.password is not None:
  391. unencoded = "%s:%s" % (self.username, self.password)
  392. encoded = unencoded.encode('base64')
  393. encoded = encoded.replace('\012', '')
  394. self.headers["Authorization"] = "Basic %s" % encoded
  395. self.headers["Content-Length"] = str(len(request_body))
  396. self.connection.request('POST', handler, request_body, self.headers)
  397. r = self.connection.getresponse()
  398. if r.status != 200:
  399. self.connection.close()
  400. self.connection = None
  401. raise xmlrpclib.ProtocolError(host + handler,
  402. r.status,
  403. r.reason,
  404. '' )
  405. data = r.read()
  406. p, u = self.getparser()
  407. p.feed(data)
  408. p.close()
  409. return u.close()
  410. class UnixStreamHTTPConnection(httplib.HTTPConnection):
  411. def connect(self):
  412. self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  413. # we abuse the host parameter as the socketname
  414. self.sock.connect(self.socketfile)
  415. def gettags(comment):
  416. """ Parse documentation strings into JavaDoc-like tokens """
  417. tags = []
  418. tag = None
  419. datatype = None
  420. name = None
  421. tag_lineno = lineno = 0
  422. tag_text = []
  423. for line in comment.split('\n'):
  424. line = line.strip()
  425. if line.startswith("@"):
  426. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  427. parts = line.split(None, 3)
  428. if len(parts) == 1:
  429. datatype = ''
  430. name = ''
  431. tag_text = []
  432. elif len(parts) == 2:
  433. datatype = parts[1]
  434. name = ''
  435. tag_text = []
  436. elif len(parts) == 3:
  437. datatype = parts[1]
  438. name = parts[2]
  439. tag_text = []
  440. elif len(parts) == 4:
  441. datatype = parts[1]
  442. name = parts[2]
  443. tag_text = [parts[3].lstrip()]
  444. tag = parts[0][1:]
  445. tag_lineno = lineno
  446. else:
  447. if line:
  448. tag_text.append(line)
  449. lineno = lineno + 1
  450. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  451. return tags
  452. try:
  453. # Python 2.6 contains a version of cElementTree inside it.
  454. from xml.etree.ElementTree import iterparse
  455. except ImportError:
  456. try:
  457. # Failing that, try cElementTree instead.
  458. from cElementTree import iterparse
  459. except ImportError:
  460. iterparse = None
  461. if iterparse is not None:
  462. import datetime, time
  463. from base64 import decodestring
  464. def make_datetime(text):
  465. return datetime.datetime(
  466. *time.strptime(text, "%Y%m%dT%H:%M:%S")[:6]
  467. )
  468. unmarshallers = {
  469. "int": lambda x: int(x.text),
  470. "i4": lambda x: int(x.text),
  471. "boolean": lambda x: x.text == "1",
  472. "string": lambda x: x.text or "",
  473. "double": lambda x: float(x.text),
  474. "dateTime.iso8601": lambda x: make_datetime(x.text),
  475. "array": lambda x: [v.text for v in x],
  476. "data": lambda x: x[0].text,
  477. "struct": lambda x: dict([(k.text or "", v.text) for k, v in x]),
  478. "base64": lambda x: decodestring(x.text or ""),
  479. "value": lambda x: x[0].text,
  480. "param": lambda x: x[0].text,
  481. }
  482. def loads(data):
  483. params = method = None
  484. for action, elem in iterparse(StringIO(data)):
  485. unmarshal = unmarshallers.get(elem.tag)
  486. if unmarshal:
  487. data = unmarshal(elem)
  488. elem.clear()
  489. elem.text = data
  490. elif elem.tag == "methodName":
  491. method = elem.text
  492. elif elem.tag == "params":
  493. params = tuple([v.text for v in elem])
  494. return params, method
  495. else:
  496. loads = None