xmlrpc.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
  1. import types
  2. import re
  3. import traceback
  4. import socket
  5. import sys
  6. import datetime
  7. import time
  8. from xml.etree.ElementTree import iterparse
  9. from supervisor.compat import xmlrpclib
  10. from supervisor.compat import func_attribute
  11. from supervisor.compat import StringIO
  12. from supervisor.compat import urllib
  13. from supervisor.compat import as_bytes
  14. from supervisor.compat import as_string
  15. from supervisor.compat import encodestring
  16. from supervisor.compat import decodestring
  17. from supervisor.compat import httplib
  18. from supervisor.medusa.http_server import get_header
  19. from supervisor.medusa.xmlrpc_handler import xmlrpc_handler
  20. from supervisor.medusa import producers
  21. from supervisor.medusa import text_socket
  22. from supervisor.http import NOT_DONE_YET
  23. class Faults:
  24. UNKNOWN_METHOD = 1
  25. INCORRECT_PARAMETERS = 2
  26. BAD_ARGUMENTS = 3
  27. SIGNATURE_UNSUPPORTED = 4
  28. SHUTDOWN_STATE = 6
  29. BAD_NAME = 10
  30. BAD_SIGNAL = 11
  31. NO_FILE = 20
  32. NOT_EXECUTABLE = 21
  33. FAILED = 30
  34. ABNORMAL_TERMINATION = 40
  35. SPAWN_ERROR = 50
  36. ALREADY_STARTED = 60
  37. NOT_RUNNING = 70
  38. SUCCESS = 80
  39. ALREADY_ADDED = 90
  40. STILL_RUNNING = 91
  41. CANT_REREAD = 92
  42. DEAD_PROGRAM_FAULTS = (Faults.SPAWN_ERROR,
  43. Faults.ABNORMAL_TERMINATION,
  44. Faults.NOT_RUNNING)
  45. def getFaultDescription(code):
  46. for faultname in Faults.__dict__:
  47. if getattr(Faults, faultname) == code:
  48. return faultname
  49. return 'UNKNOWN'
  50. class RPCError(Exception):
  51. def __init__(self, code, extra=None):
  52. self.code = code
  53. self.text = getFaultDescription(code)
  54. if extra is not None:
  55. self.text = '%s: %s' % (self.text, extra)
  56. def __str__(self):
  57. return 'code=%r, text=%r' % (self.code, self.text)
  58. class DeferredXMLRPCResponse:
  59. """ A medusa producer that implements a deferred callback; requires
  60. a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
  61. CONNECTION = re.compile ('Connection: (.*)', re.IGNORECASE)
  62. def __init__(self, request, callback):
  63. self.callback = callback
  64. self.request = request
  65. self.finished = False
  66. self.delay = float(callback.delay)
  67. def more(self):
  68. if self.finished:
  69. return ''
  70. try:
  71. try:
  72. value = self.callback()
  73. if value is NOT_DONE_YET:
  74. return NOT_DONE_YET
  75. except RPCError as err:
  76. value = xmlrpclib.Fault(err.code, err.text)
  77. body = xmlrpc_marshal(value)
  78. self.finished = True
  79. return self.getresponse(body)
  80. except:
  81. tb = traceback.format_exc()
  82. self.request.channel.server.logger.log(
  83. "XML-RPC response callback error", tb
  84. )
  85. self.finished = True
  86. self.request.error(500)
  87. def getresponse(self, body):
  88. self.request['Content-Type'] = 'text/xml'
  89. self.request['Content-Length'] = len(body)
  90. self.request.push(body)
  91. connection = get_header(self.CONNECTION, self.request.header)
  92. close_it = 0
  93. if self.request.version == '1.0':
  94. if connection == 'keep-alive':
  95. self.request['Connection'] = 'Keep-Alive'
  96. else:
  97. close_it = 1
  98. elif self.request.version == '1.1':
  99. if connection == 'close':
  100. close_it = 1
  101. elif self.request.version is None:
  102. close_it = 1
  103. outgoing_header = producers.simple_producer (
  104. self.request.build_reply_header())
  105. if close_it:
  106. self.request['Connection'] = 'close'
  107. # prepend the header
  108. self.request.outgoing.insert(0, outgoing_header)
  109. outgoing_producer = producers.composite_producer(self.request.outgoing)
  110. # apply a few final transformations to the output
  111. self.request.channel.push_with_producer (
  112. # globbing gives us large packets
  113. producers.globbing_producer (
  114. # hooking lets us log the number of bytes sent
  115. producers.hooked_producer (
  116. outgoing_producer,
  117. self.request.log
  118. )
  119. )
  120. )
  121. self.request.channel.current_request = None
  122. if close_it:
  123. self.request.channel.close_when_done()
  124. def xmlrpc_marshal(value):
  125. ismethodresponse = not isinstance(value, xmlrpclib.Fault)
  126. if ismethodresponse:
  127. if not isinstance(value, tuple):
  128. value = (value,)
  129. body = xmlrpclib.dumps(value, methodresponse=ismethodresponse)
  130. else:
  131. body = xmlrpclib.dumps(value)
  132. return body
  133. class SystemNamespaceRPCInterface:
  134. def __init__(self, namespaces):
  135. self.namespaces = {}
  136. for name, inst in namespaces:
  137. self.namespaces[name] = inst
  138. self.namespaces['system'] = self
  139. def _listMethods(self):
  140. methods = {}
  141. for ns_name in self.namespaces:
  142. namespace = self.namespaces[ns_name]
  143. for method_name in namespace.__class__.__dict__:
  144. # introspect; any methods that don't start with underscore
  145. # are published
  146. func = getattr(namespace, method_name)
  147. meth = getattr(func, func_attribute, None)
  148. if meth is not None:
  149. if not method_name.startswith('_'):
  150. sig = '%s.%s' % (ns_name, method_name)
  151. methods[sig] = str(func.__doc__)
  152. return methods
  153. def listMethods(self):
  154. """ Return an array listing the available method names
  155. @return array result An array of method names available (strings).
  156. """
  157. methods = self._listMethods()
  158. keys = list(methods.keys())
  159. keys.sort()
  160. return keys
  161. def methodHelp(self, name):
  162. """ Return a string showing the method's documentation
  163. @param string name The name of the method.
  164. @return string result The documentation for the method name.
  165. """
  166. methods = self._listMethods()
  167. for methodname in methods.keys():
  168. if methodname == name:
  169. return methods[methodname]
  170. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  171. def methodSignature(self, name):
  172. """ Return an array describing the method signature in the
  173. form [rtype, ptype, ptype...] where rtype is the return data type
  174. of the method, and ptypes are the parameter data types that the
  175. method accepts in method argument order.
  176. @param string name The name of the method.
  177. @return array result The result.
  178. """
  179. methods = self._listMethods()
  180. for method in methods:
  181. if method == name:
  182. rtype = None
  183. ptypes = []
  184. parsed = gettags(methods[method])
  185. for thing in parsed:
  186. if thing[1] == 'return': # tag name
  187. rtype = thing[2] # datatype
  188. elif thing[1] == 'param': # tag name
  189. ptypes.append(thing[2]) # datatype
  190. if rtype is None:
  191. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  192. return [rtype] + ptypes
  193. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  194. def multicall(self, calls):
  195. """Process an array of calls, and return an array of
  196. results. Calls should be structs of the form {'methodName':
  197. string, 'params': array}. Each result will either be a
  198. single-item array containing the result value, or a struct of
  199. the form {'faultCode': int, 'faultString': string}. This is
  200. useful when you need to make lots of small calls without lots
  201. of round trips.
  202. @param array calls An array of call requests
  203. @return array result An array of results
  204. """
  205. remaining_calls = calls[:] # [{'methodName':x, 'params':x}, ...]
  206. callbacks = [] # always empty or 1 callback function only
  207. results = [] # results of completed calls
  208. # args are only to fool scoping and are never passed by caller
  209. def multi(remaining_calls=remaining_calls,
  210. callbacks=callbacks,
  211. results=results):
  212. # if waiting on a callback, call it, then remove it if it's done
  213. if callbacks:
  214. try:
  215. value = callbacks[0]()
  216. except RPCError as exc:
  217. value = {'faultCode': exc.code,
  218. 'faultString': exc.text}
  219. except:
  220. info = sys.exc_info()
  221. errmsg = "%s:%s" % (info[0], info[1])
  222. value = {'faultCode': Faults.FAILED,
  223. 'faultString': 'FAILED: ' + errmsg}
  224. if value is not NOT_DONE_YET:
  225. callbacks.pop(0)
  226. results.append(value)
  227. # if we don't have a callback now, pop calls and call them in
  228. # order until one returns a callback.
  229. while (not callbacks) and remaining_calls:
  230. call = remaining_calls.pop(0)
  231. name = call.get('methodName', None)
  232. params = call.get('params', [])
  233. try:
  234. if name is None:
  235. raise RPCError(Faults.INCORRECT_PARAMETERS,
  236. 'No methodName')
  237. if name == 'system.multicall':
  238. raise RPCError(Faults.INCORRECT_PARAMETERS,
  239. 'Recursive system.multicall forbidden')
  240. # make the call, may return a callback or not
  241. root = AttrDict(self.namespaces)
  242. value = traverse(root, name, params)
  243. except RPCError as exc:
  244. value = {'faultCode': exc.code,
  245. 'faultString': exc.text}
  246. except:
  247. info = sys.exc_info()
  248. errmsg = "%s:%s" % (info[0], info[1])
  249. value = {'faultCode': Faults.FAILED,
  250. 'faultString': 'FAILED: ' + errmsg}
  251. if isinstance(value, types.FunctionType):
  252. callbacks.append(value)
  253. else:
  254. results.append(value)
  255. # we are done when there's no callback and no more calls queued
  256. if callbacks or remaining_calls:
  257. return NOT_DONE_YET
  258. else:
  259. return results
  260. multi.delay = 0.05
  261. # optimization: multi() is called here instead of just returning
  262. # multi in case all calls complete and we can return with no delay.
  263. value = multi()
  264. if value is NOT_DONE_YET:
  265. return multi
  266. else:
  267. return value
  268. class AttrDict(dict):
  269. # hack to make a dict's getattr equivalent to its getitem
  270. def __getattr__(self, name):
  271. return self.get(name)
  272. class RootRPCInterface:
  273. def __init__(self, subinterfaces):
  274. for name, rpcinterface in subinterfaces:
  275. setattr(self, name, rpcinterface)
  276. def capped_int(value):
  277. i = int(value)
  278. if i < xmlrpclib.MININT:
  279. i = xmlrpclib.MININT
  280. elif i > xmlrpclib.MAXINT:
  281. i = xmlrpclib.MAXINT
  282. return i
  283. def make_datetime(text):
  284. return datetime.datetime(
  285. *time.strptime(text, "%Y%m%dT%H:%M:%S")[:6]
  286. )
  287. class supervisor_xmlrpc_handler(xmlrpc_handler):
  288. path = '/RPC2'
  289. IDENT = 'Supervisor XML-RPC Handler'
  290. unmarshallers = {
  291. "int": lambda x: int(x.text),
  292. "i4": lambda x: int(x.text),
  293. "boolean": lambda x: x.text == "1",
  294. "string": lambda x: x.text or "",
  295. "double": lambda x: float(x.text),
  296. "dateTime.iso8601": lambda x: make_datetime(x.text),
  297. "array": lambda x: x[0].text,
  298. "data": lambda x: [v.text for v in x],
  299. "struct": lambda x: dict([(k.text or "", v.text) for k, v in x]),
  300. "base64": lambda x: as_string(decodestring(as_bytes(x.text or ""))),
  301. "param": lambda x: x[0].text,
  302. }
  303. def __init__(self, supervisord, subinterfaces):
  304. self.rpcinterface = RootRPCInterface(subinterfaces)
  305. self.supervisord = supervisord
  306. def loads(self, data):
  307. params = method = None
  308. for action, elem in iterparse(StringIO(data)):
  309. unmarshall = self.unmarshallers.get(elem.tag)
  310. if unmarshall:
  311. data = unmarshall(elem)
  312. elem.clear()
  313. elem.text = data
  314. elif elem.tag == "value":
  315. try:
  316. data = elem[0].text
  317. except IndexError:
  318. data = elem.text or ""
  319. elem.clear()
  320. elem.text = data
  321. elif elem.tag == "methodName":
  322. method = elem.text
  323. elif elem.tag == "params":
  324. params = tuple([v.text for v in elem])
  325. return params, method
  326. def match(self, request):
  327. return request.uri.startswith(self.path)
  328. def continue_request(self, data, request):
  329. logger = self.supervisord.options.logger
  330. try:
  331. try:
  332. params, method = self.loads(data)
  333. except:
  334. logger.error(
  335. 'XML-RPC request data %r is invalid: unmarshallable' %
  336. (data,)
  337. )
  338. request.error(400)
  339. return
  340. # no <methodName> in the request or name is an empty string
  341. if not method:
  342. logger.error(
  343. 'XML-RPC request data %r is invalid: no method name' %
  344. (data,)
  345. )
  346. request.error(400)
  347. return
  348. # we allow xml-rpc clients that do not send empty <params>
  349. # when there are no parameters for the method call
  350. if params is None:
  351. params = ()
  352. try:
  353. logger.trace('XML-RPC method called: %s()' % method)
  354. value = self.call(method, params)
  355. logger.trace('XML-RPC method %s() returned successfully' %
  356. method)
  357. except RPCError as err:
  358. # turn RPCError reported by method into a Fault instance
  359. value = xmlrpclib.Fault(err.code, err.text)
  360. logger.trace('XML-RPC method %s() returned fault: [%d] %s' % (
  361. method,
  362. err.code, err.text))
  363. if isinstance(value, types.FunctionType):
  364. # returning a function from an RPC method implies that
  365. # this needs to be a deferred response (it needs to block).
  366. pushproducer = request.channel.push_with_producer
  367. pushproducer(DeferredXMLRPCResponse(request, value))
  368. else:
  369. # if we get anything but a function, it implies that this
  370. # response doesn't need to be deferred, we can service it
  371. # right away.
  372. body = xmlrpc_marshal(value)
  373. request['Content-Type'] = 'text/xml'
  374. request['Content-Length'] = len(body)
  375. request.push(body)
  376. request.done()
  377. except:
  378. tb = traceback.format_exc()
  379. logger.critical(
  380. "Handling XML-RPC request with data %r raised an unexpected "
  381. "exception: %s" % (data, tb)
  382. )
  383. # internal error, report as HTTP server error
  384. request.error(500)
  385. def call(self, method, params):
  386. return traverse(self.rpcinterface, method, params)
  387. def traverse(ob, method, params):
  388. path = method.split('.')
  389. for name in path:
  390. if name.startswith('_'):
  391. # security (don't allow things that start with an underscore to
  392. # be called remotely)
  393. raise RPCError(Faults.UNKNOWN_METHOD)
  394. ob = getattr(ob, name, None)
  395. if ob is None:
  396. raise RPCError(Faults.UNKNOWN_METHOD)
  397. try:
  398. return ob(*params)
  399. except TypeError:
  400. raise RPCError(Faults.INCORRECT_PARAMETERS)
  401. class SupervisorTransport(xmlrpclib.Transport):
  402. """
  403. Provides a Transport for xmlrpclib that uses
  404. httplib.HTTPConnection in order to support persistent
  405. connections. Also support basic auth and UNIX domain socket
  406. servers.
  407. """
  408. connection = None
  409. def __init__(self, username=None, password=None, serverurl=None):
  410. xmlrpclib.Transport.__init__(self)
  411. self.username = username
  412. self.password = password
  413. self.verbose = False
  414. self.serverurl = serverurl
  415. if serverurl.startswith('http://'):
  416. type, uri = urllib.splittype(serverurl)
  417. host, path = urllib.splithost(uri)
  418. host, port = urllib.splitport(host)
  419. if port is None:
  420. port = 80
  421. else:
  422. port = int(port)
  423. def get_connection(host=host, port=port):
  424. return httplib.HTTPConnection(host, port)
  425. self._get_connection = get_connection
  426. elif serverurl.startswith('unix://'):
  427. def get_connection(serverurl=serverurl):
  428. # we use 'localhost' here because domain names must be
  429. # < 64 chars (or we'd use the serverurl filename)
  430. conn = UnixStreamHTTPConnection('localhost')
  431. conn.socketfile = serverurl[7:]
  432. return conn
  433. self._get_connection = get_connection
  434. else:
  435. raise ValueError('Unknown protocol for serverurl %s' % serverurl)
  436. def request(self, host, handler, request_body, verbose=0):
  437. if not self.connection:
  438. self.connection = self._get_connection()
  439. self.headers = {
  440. "User-Agent" : self.user_agent,
  441. "Content-Type" : "text/xml",
  442. "Accept": "text/xml"
  443. }
  444. # basic auth
  445. if self.username is not None and self.password is not None:
  446. unencoded = "%s:%s" % (self.username, self.password)
  447. encoded = as_string(encodestring(as_bytes(unencoded)))
  448. encoded = encoded.replace('\n', '')
  449. encoded = encoded.replace('\012', '')
  450. self.headers["Authorization"] = "Basic %s" % encoded
  451. self.headers["Content-Length"] = str(len(request_body))
  452. self.connection.request('POST', handler, request_body, self.headers)
  453. r = self.connection.getresponse()
  454. if r.status != 200:
  455. self.connection.close()
  456. self.connection = None
  457. raise xmlrpclib.ProtocolError(host + handler,
  458. r.status,
  459. r.reason,
  460. '' )
  461. data = r.read()
  462. p, u = self.getparser()
  463. p.feed(data)
  464. p.close()
  465. return u.close()
  466. class UnixStreamHTTPConnection(httplib.HTTPConnection):
  467. def connect(self): # pragma: no cover
  468. self.sock = text_socket.text_socket(socket.AF_UNIX, socket.SOCK_STREAM)
  469. # we abuse the host parameter as the socketname
  470. self.sock.connect(self.socketfile)
  471. def gettags(comment):
  472. """ Parse documentation strings into JavaDoc-like tokens """
  473. tags = []
  474. tag = None
  475. datatype = None
  476. name = None
  477. tag_lineno = lineno = 0
  478. tag_text = []
  479. for line in comment.split('\n'):
  480. line = line.strip()
  481. if line.startswith("@"):
  482. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  483. parts = line.split(None, 3)
  484. if len(parts) == 1:
  485. datatype = ''
  486. name = ''
  487. tag_text = []
  488. elif len(parts) == 2:
  489. datatype = parts[1]
  490. name = ''
  491. tag_text = []
  492. elif len(parts) == 3:
  493. datatype = parts[1]
  494. name = parts[2]
  495. tag_text = []
  496. elif len(parts) == 4:
  497. datatype = parts[1]
  498. name = parts[2]
  499. tag_text = [parts[3].lstrip()]
  500. tag = parts[0][1:]
  501. tag_lineno = lineno
  502. else:
  503. if line:
  504. tag_text.append(line)
  505. lineno += 1
  506. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  507. return tags