xmlrpc.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. import types
  2. import re
  3. import traceback
  4. import sys
  5. import datetime
  6. import time
  7. from xml.etree.ElementTree import iterparse
  8. from supervisor.compat import xmlrpclib
  9. from supervisor.compat import func_attribute
  10. from supervisor.compat import StringIO
  11. from supervisor.compat import urllib
  12. from supervisor.compat import as_bytes
  13. from supervisor.compat import as_string
  14. from supervisor.compat import encodestring
  15. from supervisor.compat import decodestring
  16. from supervisor.compat import httplib
  17. import supervisor.medusa.text_socket as socket
  18. from supervisor.medusa.http_server import get_header
  19. from supervisor.medusa.xmlrpc_handler import xmlrpc_handler
  20. from supervisor.medusa import producers
  21. from supervisor.http import NOT_DONE_YET
class Faults:
    # Numeric fault codes used when reporting XML-RPC errors.
    # getFaultDescription() maps a code back to the attribute name defined
    # here, and RPCError uses that name as the start of its fault text.
    # NOTE: this class intentionally has no docstring; getFaultDescription()
    # scans the class namespace, so comments are used instead.
    UNKNOWN_METHOD = 1
    INCORRECT_PARAMETERS = 2
    BAD_ARGUMENTS = 3
    SIGNATURE_UNSUPPORTED = 4
    SHUTDOWN_STATE = 6
    BAD_NAME = 10
    BAD_SIGNAL = 11
    NO_FILE = 20
    NOT_EXECUTABLE = 21
    FAILED = 30
    ABNORMAL_TERMINATION = 40
    SPAWN_ERROR = 50
    ALREADY_STARTED = 60
    NOT_RUNNING = 70
    SUCCESS = 80
    ALREADY_ADDED = 90
    STILL_RUNNING = 91
    CANT_REREAD = 92
  41. def getFaultDescription(code):
  42. for faultname in Faults.__dict__:
  43. if getattr(Faults, faultname) == code:
  44. return faultname
  45. return 'UNKNOWN'
  46. class RPCError(Exception):
  47. def __init__(self, code, extra=None):
  48. self.code = code
  49. self.text = getFaultDescription(code)
  50. if extra is not None:
  51. self.text = '%s: %s' % (self.text, extra)
  52. class DeferredXMLRPCResponse:
  53. """ A medusa producer that implements a deferred callback; requires
  54. a subclass of asynchat.async_chat that handles NOT_DONE_YET sentinel """
  55. CONNECTION = re.compile ('Connection: (.*)', re.IGNORECASE)
  56. traceback = traceback # for testing override
  57. def __init__(self, request, callback):
  58. self.callback = callback
  59. self.request = request
  60. self.finished = False
  61. self.delay = float(callback.delay)
  62. def more(self):
  63. if self.finished:
  64. return ''
  65. try:
  66. try:
  67. value = self.callback()
  68. if value is NOT_DONE_YET:
  69. return NOT_DONE_YET
  70. except RPCError as err:
  71. value = xmlrpclib.Fault(err.code, err.text)
  72. body = xmlrpc_marshal(value)
  73. self.finished = True
  74. return self.getresponse(body)
  75. except:
  76. # report unexpected exception back to server
  77. self.traceback.print_exc()
  78. self.finished = True
  79. self.request.error(500)
  80. def getresponse(self, body):
  81. self.request['Content-Type'] = 'text/xml'
  82. self.request['Content-Length'] = len(body)
  83. self.request.push(body)
  84. connection = get_header(self.CONNECTION, self.request.header)
  85. close_it = 0
  86. if self.request.version == '1.0':
  87. if connection == 'keep-alive':
  88. self.request['Connection'] = 'Keep-Alive'
  89. else:
  90. close_it = 1
  91. elif self.request.version == '1.1':
  92. if connection == 'close':
  93. close_it = 1
  94. elif self.request.version is None:
  95. close_it = 1
  96. outgoing_header = producers.simple_producer (
  97. self.request.build_reply_header())
  98. if close_it:
  99. self.request['Connection'] = 'close'
  100. # prepend the header
  101. self.request.outgoing.insert(0, outgoing_header)
  102. outgoing_producer = producers.composite_producer(self.request.outgoing)
  103. # apply a few final transformations to the output
  104. self.request.channel.push_with_producer (
  105. # globbing gives us large packets
  106. producers.globbing_producer (
  107. # hooking lets us log the number of bytes sent
  108. producers.hooked_producer (
  109. outgoing_producer,
  110. self.request.log
  111. )
  112. )
  113. )
  114. self.request.channel.current_request = None
  115. if close_it:
  116. self.request.channel.close_when_done()
  117. def xmlrpc_marshal(value):
  118. ismethodresponse = not isinstance(value, xmlrpclib.Fault)
  119. if ismethodresponse:
  120. if not isinstance(value, tuple):
  121. value = (value,)
  122. body = xmlrpclib.dumps(value, methodresponse=ismethodresponse)
  123. else:
  124. body = xmlrpclib.dumps(value)
  125. return body
  126. class SystemNamespaceRPCInterface:
  127. def __init__(self, namespaces):
  128. self.namespaces = {}
  129. for name, inst in namespaces:
  130. self.namespaces[name] = inst
  131. self.namespaces['system'] = self
  132. def _listMethods(self):
  133. methods = {}
  134. for ns_name in self.namespaces:
  135. namespace = self.namespaces[ns_name]
  136. for method_name in namespace.__class__.__dict__:
  137. # introspect; any methods that don't start with underscore
  138. # are published
  139. func = getattr(namespace, method_name)
  140. meth = getattr(func, func_attribute, None)
  141. if meth is not None:
  142. if not method_name.startswith('_'):
  143. sig = '%s.%s' % (ns_name, method_name)
  144. methods[sig] = str(func.__doc__)
  145. return methods
  146. def listMethods(self):
  147. """ Return an array listing the available method names
  148. @return array result An array of method names available (strings).
  149. """
  150. methods = self._listMethods()
  151. keys = list(methods.keys())
  152. keys.sort()
  153. return keys
  154. def methodHelp(self, name):
  155. """ Return a string showing the method's documentation
  156. @param string name The name of the method.
  157. @return string result The documentation for the method name.
  158. """
  159. methods = self._listMethods()
  160. for methodname in methods.keys():
  161. if methodname == name:
  162. return methods[methodname]
  163. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  164. def methodSignature(self, name):
  165. """ Return an array describing the method signature in the
  166. form [rtype, ptype, ptype...] where rtype is the return data type
  167. of the method, and ptypes are the parameter data types that the
  168. method accepts in method argument order.
  169. @param string name The name of the method.
  170. @return array result The result.
  171. """
  172. methods = self._listMethods()
  173. for method in methods:
  174. if method == name:
  175. rtype = None
  176. ptypes = []
  177. parsed = gettags(methods[method])
  178. for thing in parsed:
  179. if thing[1] == 'return': # tag name
  180. rtype = thing[2] # datatype
  181. elif thing[1] == 'param': # tag name
  182. ptypes.append(thing[2]) # datatype
  183. if rtype is None:
  184. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  185. return [rtype] + ptypes
  186. raise RPCError(Faults.SIGNATURE_UNSUPPORTED)
  187. def multicall(self, calls):
  188. """Process an array of calls, and return an array of
  189. results. Calls should be structs of the form {'methodName':
  190. string, 'params': array}. Each result will either be a
  191. single-item array containing the result value, or a struct of
  192. the form {'faultCode': int, 'faultString': string}. This is
  193. useful when you need to make lots of small calls without lots
  194. of round trips.
  195. @param array calls An array of call requests
  196. @return array result An array of results
  197. """
  198. producers = []
  199. for call in calls:
  200. try:
  201. name = call['methodName']
  202. params = call.get('params', [])
  203. if name == 'system.multicall':
  204. # Recursive system.multicall forbidden
  205. raise RPCError(Faults.INCORRECT_PARAMETERS)
  206. root = AttrDict(self.namespaces)
  207. value = traverse(root, name, params)
  208. except RPCError as inst:
  209. value = {'faultCode': inst.code,
  210. 'faultString': inst.text}
  211. except:
  212. errmsg = "%s:%s" % (sys.exc_info()[0], sys.exc_info()[1])
  213. value = {'faultCode': 1, 'faultString': errmsg}
  214. producers.append(value)
  215. results = []
  216. def multiproduce():
  217. """ Run through all the producers in order """
  218. if not producers:
  219. return []
  220. callback = producers.pop(0)
  221. if isinstance(callback, types.FunctionType):
  222. try:
  223. value = callback()
  224. except RPCError as inst:
  225. value = {'faultCode':inst.code, 'faultString':inst.text}
  226. if value is NOT_DONE_YET:
  227. # push it back in the front of the queue because we
  228. # need to finish the calls in requested order
  229. producers.insert(0, callback)
  230. return NOT_DONE_YET
  231. else:
  232. value = callback
  233. results.append(value)
  234. if producers:
  235. # only finish when all producers are finished
  236. return NOT_DONE_YET
  237. return results
  238. multiproduce.delay = .05
  239. return multiproduce
  240. class AttrDict(dict):
  241. # hack to make a dict's getattr equivalent to its getitem
  242. def __getattr__(self, name):
  243. return self[name]
  244. class RootRPCInterface:
  245. def __init__(self, subinterfaces):
  246. for name, rpcinterface in subinterfaces:
  247. setattr(self, name, rpcinterface)
  248. def make_datetime(text):
  249. return datetime.datetime(
  250. *time.strptime(text, "%Y%m%dT%H:%M:%S")[:6]
  251. )
  252. class supervisor_xmlrpc_handler(xmlrpc_handler):
  253. path = '/RPC2'
  254. IDENT = 'Supervisor XML-RPC Handler'
  255. unmarshallers = {
  256. "int": lambda x: int(x.text),
  257. "i4": lambda x: int(x.text),
  258. "boolean": lambda x: x.text == "1",
  259. "string": lambda x: x.text or "",
  260. "double": lambda x: float(x.text),
  261. "dateTime.iso8601": lambda x: make_datetime(x.text),
  262. "array": lambda x: x[0].text,
  263. "data": lambda x: [v.text for v in x],
  264. "struct": lambda x: dict([(k.text or "", v.text) for k, v in x]),
  265. "base64": lambda x: as_string(decodestring(as_bytes(x.text or ""))),
  266. "param": lambda x: x[0].text,
  267. }
  268. def __init__(self, supervisord, subinterfaces):
  269. self.rpcinterface = RootRPCInterface(subinterfaces)
  270. self.supervisord = supervisord
  271. def loads(self, data):
  272. params = method = None
  273. for action, elem in iterparse(StringIO(data)):
  274. unmarshall = self.unmarshallers.get(elem.tag)
  275. if unmarshall:
  276. data = unmarshall(elem)
  277. elem.clear()
  278. elem.text = data
  279. elif elem.tag == "value":
  280. try:
  281. data = elem[0].text
  282. except IndexError:
  283. data = elem.text or ""
  284. elem.clear()
  285. elem.text = data
  286. elif elem.tag == "methodName":
  287. method = elem.text
  288. elif elem.tag == "params":
  289. params = tuple([v.text for v in elem])
  290. return params, method
  291. def match(self, request):
  292. return request.uri.startswith(self.path)
  293. def continue_request (self, data, request):
  294. logger = self.supervisord.options.logger
  295. try:
  296. params, method = self.loads(data)
  297. # no <methodName> in the request or name is an empty string
  298. if not method:
  299. logger.trace('XML-RPC request received with no method name')
  300. request.error(400)
  301. return
  302. # we allow xml-rpc clients that do not send empty <params>
  303. # when there are no parameters for the method call
  304. if params is None:
  305. params = ()
  306. try:
  307. logger.trace('XML-RPC method called: %s()' % method)
  308. value = self.call(method, params)
  309. # application-specific: instead of we never want to
  310. # marshal None (even though we could by saying allow_none=True
  311. # in dumps within xmlrpc_marshall), this is meant as
  312. # a debugging fixture, see issue 223.
  313. assert value is not None, (
  314. 'return value from method %r with params %r is None' %
  315. (method, params)
  316. )
  317. logger.trace('XML-RPC method %s() returned successfully' %
  318. method)
  319. except RPCError as err:
  320. # turn RPCError reported by method into a Fault instance
  321. value = xmlrpclib.Fault(err.code, err.text)
  322. logger.trace('XML-RPC method %s() returned fault: [%d] %s' % (
  323. method,
  324. err.code, err.text))
  325. if isinstance(value, types.FunctionType):
  326. # returning a function from an RPC method implies that
  327. # this needs to be a deferred response (it needs to block).
  328. pushproducer = request.channel.push_with_producer
  329. pushproducer(DeferredXMLRPCResponse(request, value))
  330. else:
  331. # if we get anything but a function, it implies that this
  332. # response doesn't need to be deferred, we can service it
  333. # right away.
  334. body = xmlrpc_marshal(value)
  335. request['Content-Type'] = 'text/xml'
  336. request['Content-Length'] = len(body)
  337. request.push(body)
  338. request.done()
  339. except:
  340. io = StringIO()
  341. traceback.print_exc(file=io)
  342. val = io.getvalue()
  343. logger.critical(val)
  344. # internal error, report as HTTP server error
  345. request.error(500)
  346. def call(self, method, params):
  347. return traverse(self.rpcinterface, method, params)
  348. def traverse(ob, method, params):
  349. path = method.split('.')
  350. for name in path:
  351. if name.startswith('_'):
  352. # security (don't allow things that start with an underscore to
  353. # be called remotely)
  354. raise RPCError(Faults.UNKNOWN_METHOD)
  355. ob = getattr(ob, name, None)
  356. if ob is None:
  357. raise RPCError(Faults.UNKNOWN_METHOD)
  358. try:
  359. return ob(*params)
  360. except TypeError:
  361. raise RPCError(Faults.INCORRECT_PARAMETERS)
  362. class SupervisorTransport(xmlrpclib.Transport):
  363. """
  364. Provides a Transport for xmlrpclib that uses
  365. httplib.HTTPConnection in order to support persistent
  366. connections. Also support basic auth and UNIX domain socket
  367. servers.
  368. """
  369. connection = None
  370. def __init__(self, username=None, password=None, serverurl=None):
  371. xmlrpclib.Transport.__init__(self)
  372. self.username = username
  373. self.password = password
  374. self.verbose = False
  375. self.serverurl = serverurl
  376. if serverurl.startswith('http://'):
  377. type, uri = urllib.splittype(serverurl)
  378. host, path = urllib.splithost(uri)
  379. host, port = urllib.splitport(host)
  380. if port is None:
  381. port = 80
  382. else:
  383. port = int(port)
  384. def get_connection(host=host, port=port):
  385. return httplib.HTTPConnection(host, port)
  386. self._get_connection = get_connection
  387. elif serverurl.startswith('unix://'):
  388. def get_connection(serverurl=serverurl):
  389. # we use 'localhost' here because domain names must be
  390. # < 64 chars (or we'd use the serverurl filename)
  391. conn = UnixStreamHTTPConnection('localhost')
  392. conn.socketfile = serverurl[7:]
  393. return conn
  394. self._get_connection = get_connection
  395. else:
  396. raise ValueError('Unknown protocol for serverurl %s' % serverurl)
  397. def request(self, host, handler, request_body, verbose=0):
  398. if not self.connection:
  399. self.connection = self._get_connection()
  400. self.headers = {
  401. "User-Agent" : self.user_agent,
  402. "Content-Type" : "text/xml",
  403. "Accept": "text/xml"
  404. }
  405. # basic auth
  406. if self.username is not None and self.password is not None:
  407. unencoded = "%s:%s" % (self.username, self.password)
  408. encoded = as_string(encodestring(as_bytes(unencoded)))
  409. encoded = encoded.replace('\n', '')
  410. encoded = encoded.replace('\012', '')
  411. self.headers["Authorization"] = "Basic %s" % encoded
  412. self.headers["Content-Length"] = str(len(request_body))
  413. self.connection.request('POST', handler, request_body, self.headers)
  414. r = self.connection.getresponse()
  415. if r.status != 200:
  416. self.connection.close()
  417. self.connection = None
  418. raise xmlrpclib.ProtocolError(host + handler,
  419. r.status,
  420. r.reason,
  421. '' )
  422. data = r.read()
  423. p, u = self.getparser()
  424. p.feed(data)
  425. p.close()
  426. return u.close()
  427. class UnixStreamHTTPConnection(httplib.HTTPConnection):
  428. def connect(self): # pragma: no cover
  429. self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  430. # we abuse the host parameter as the socketname
  431. self.sock.connect(self.socketfile)
  432. def gettags(comment):
  433. """ Parse documentation strings into JavaDoc-like tokens """
  434. tags = []
  435. tag = None
  436. datatype = None
  437. name = None
  438. tag_lineno = lineno = 0
  439. tag_text = []
  440. for line in comment.split('\n'):
  441. line = line.strip()
  442. if line.startswith("@"):
  443. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  444. parts = line.split(None, 3)
  445. if len(parts) == 1:
  446. datatype = ''
  447. name = ''
  448. tag_text = []
  449. elif len(parts) == 2:
  450. datatype = parts[1]
  451. name = ''
  452. tag_text = []
  453. elif len(parts) == 3:
  454. datatype = parts[1]
  455. name = parts[2]
  456. tag_text = []
  457. elif len(parts) == 4:
  458. datatype = parts[1]
  459. name = parts[2]
  460. tag_text = [parts[3].lstrip()]
  461. tag = parts[0][1:]
  462. tag_lineno = lineno
  463. else:
  464. if line:
  465. tag_text.append(line)
  466. lineno += 1
  467. tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
  468. return tags