# socketserver.py (16 KB)


import asyncore
import asynchat
import socket
import srp
import os
import signal
import sys
import time

# Response status codes.  The status is the second field of the reply header
# written by CommandServerChannel.sendreply().
ST_OK = 200
ST_FAILED = 400
ST_AUTHREQUIRED = 401
# NOTE(review): ST_SERVERERROR is not referenced by the code visible in this
# file -- presumably reserved for internal errors; confirm before removing.
ST_SERVERERROR = 500

# Protocol version; the first field of every reply header.
PROTVERSION = "1.0"

from tailhelper import TailHelper
  16. class CommandServerChannel(asynchat.async_chat):
  17. authenticating = False # flag to determine whether a connection is auth'ing
  18. authsession = None # SRP auth session data
  19. clientproof = None # SRP client proof data
  20. def __init__ (self, server, sock, addr):
  21. asynchat.async_chat.__init__ (self, sock)
  22. self.server = server
  23. self.addr = addr
  24. self.set_terminator ('\n')
  25. self.data = ''
  26. self.supervisord = server.supervisord
  27. self.options = server.supervisord.options
  28. def collect_incoming_data (self, data):
  29. self.data = self.data + data
  30. def found_terminator (self):
  31. line = self.data
  32. self.data = ''
  33. if not line:
  34. pass
  35. self.handle_command (line)
  36. def handle_command(self, command):
  37. try:
  38. protversion, command = command.split(' ', 1)
  39. except (ValueError, IndexError):
  40. self.sendreply(ST_FAILED, "Malformed request")
  41. return
  42. args = command.split()
  43. if not args:
  44. self.sendreply(ST_FAILED, "Empty command")
  45. return
  46. command = args[0]
  47. if not self.checkauth() and not command in ('auth', 'proof'):
  48. self.sendreply(ST_AUTHREQUIRED, "Authentication required\n")
  49. return
  50. methodname = "cmd_" + command
  51. method = getattr(self, methodname, None)
  52. if method:
  53. status, msg = method(args)
  54. self.sendreply(status, msg)
  55. else:
  56. self.sendreply(
  57. ST_FAILED,
  58. "Unknown command %r; 'capabilities' for a list\n" % command)
  59. def sendreply(self, status, msg):
  60. if not msg.endswith("\n"):
  61. msg = msg + "\n"
  62. msglen = len(msg)
  63. msg = "%s %s %s\n%s" % (PROTVERSION, status, msglen, msg)
  64. self.push(msg)
  65. def checkauth(self):
  66. if self.options.noauth:
  67. return True
  68. if self.authsession and not self.proving:
  69. return True
  70. return False
  71. def cmd_auth(self, args):
  72. if len(args) < 3:
  73. self.authsession = None
  74. return ST_FAILED, "AUTH command needs user and pubkey"
  75. user = args[1]
  76. pubkey = ' '.join(args[2:])
  77. try:
  78. A = srp.decode_long(''.join(pubkey.strip()))
  79. except:
  80. self.authsession = None
  81. return ST_FAILED, "malformed public key"
  82. try:
  83. if (not self.options.passwdfile or not
  84. os.path.exists(self.options.passwdfile)):
  85. raise srp.NoSuchUser
  86. self.authsession = tuple(
  87. srp.lookup(user, A, self.options.passwdfile)) + (A,)
  88. except srp.NoSuchUser:
  89. self.authsession = None
  90. return ST_FAILED, 'No such user "%s" \n' % user
  91. s, B, u, K, m, A = self.authsession
  92. key = '\t'.join([srp.encode_string(s),
  93. srp.encode_long(B),
  94. srp.encode_long(u)])
  95. self.proving = True
  96. return ST_OK, key
  97. def cmd_proof(self, args):
  98. if not self.proving:
  99. return ST_FAILED, "PROOF must be isssued directly afer AUTH"
  100. if len(args) < 2:
  101. self.authsession = None
  102. return ST_FAILED, "No client proof"
  103. self.clientproof = srp.decode_string(args[1])
  104. self.proving = False
  105. s, B, u, K, m, A = self.authsession
  106. if m == self.clientproof:
  107. hostproof = srp.encode_string(
  108. srp.host_authenticator(K, A, m))
  109. return ST_OK, hostproof
  110. else:
  111. self.authsession = None
  112. return ST_FAILED, 'Client proof failed. \n'
  113. def cmd_start(self, args):
  114. self.mood = 1
  115. names = args[1:]
  116. if not names:
  117. return ST_FAILED, "No process named"
  118. try:
  119. procs = self.supervisord.proclist.getmany(names)
  120. except KeyError, name:
  121. return ST_FAILED, "Unknown process named %s" % name
  122. resp = []
  123. for proc in procs:
  124. proc.backoff = 0
  125. proc.delay = 0
  126. proc.killing = 0
  127. proc.administrative_stop = 0
  128. if not proc.pid:
  129. proc.spawn()
  130. resp.append("%s started" % proc.name)
  131. else:
  132. resp.append("%s already started" % proc.name)
  133. return ST_OK, '\n'.join(resp)
  134. def cmd_stop(self, args):
  135. self.mood = 1
  136. names = args[1:]
  137. if not names:
  138. return ST_FAILED, "No process named"
  139. try:
  140. procs = self.supervisord.proclist.getmany(names)
  141. except KeyError, name:
  142. return ST_FAILED, "Unknown process named %s" % name
  143. resp = []
  144. for proc in procs:
  145. proc.backoff = 0
  146. proc.delay = 0
  147. proc.killing = 0
  148. if proc.pid:
  149. status = proc.kill(signal.SIGTERM)
  150. if status:
  151. resp.append(status)
  152. else:
  153. resp.append("%s: sent SIGTERM" % proc.name)
  154. proc.killing = 1
  155. proc.administrative_stop = 1
  156. proc.delay = time.time() + self.options.backofflimit
  157. else:
  158. proc.administrative_stop = 1
  159. resp.append("%s: already stopped" % proc.name)
  160. return ST_OK, '\n'.join(resp)
  161. def cmd_restart(self, args):
  162. self.mood = 1 # Up
  163. names = args[1:]
  164. if not names:
  165. return ST_FAILED, "No process named"
  166. try:
  167. procs = self.supervisord.proclist.getmany(names)
  168. except KeyError, name:
  169. return ST_FAILED, "Unknown process named %s" % name
  170. resp = []
  171. for proc in procs:
  172. proc.administrative_stop = 0
  173. proc.backoff = 0
  174. proc.delay = 0
  175. proc.killing = 0
  176. if proc.pid:
  177. status = proc.kill(signal.SIGTERM)
  178. resp.append("Sent SIGTERM to %s; will restart later"
  179. % proc.name)
  180. proc.killing = 1
  181. proc.delay = time.time() + self.options.backofflimit
  182. else:
  183. proc.spawn()
  184. resp.append("%s started" % proc.name)
  185. return ST_OK, '\n'.join(resp)
  186. def cmd_kill(self, args):
  187. try:
  188. which = args[1]
  189. except IndexError:
  190. return ST_FAILED, "No process named"
  191. if args[2:]:
  192. try:
  193. sig = int(args[2])
  194. except:
  195. return ST_FAILED, "Bad signal %r" % args[2]
  196. else:
  197. sig = signal.SIGTERM
  198. procs = self.supervisord.proclist.get(which)
  199. procs.reverse() # kill in reverse priority order
  200. resp = []
  201. if not procs:
  202. return ST_FAILED, "Unknown process %s" % which
  203. for proc in procs:
  204. if not proc.pid:
  205. resp.append("%s not running" % proc.name)
  206. else:
  207. msg = proc.kill(sig)
  208. if msg:
  209. resp.append("Kill of %s with signal %d failed: %s" %
  210. (proc.name, sig, msg))
  211. else:
  212. resp.append("Signal %d sent to %s" % (sig, proc.name))
  213. return ST_OK, '\n'.join(resp)
  214. def cmd_status(self, args):
  215. names = args[1:]
  216. if not names:
  217. up, down = self.supervisord.proclist.getupdown()
  218. up = ','.join([proc.name for proc in up])
  219. down = ','.join([proc.name for proc in down])
  220. return (ST_OK,
  221. "socket=%s\n" % `self.options.sockname` +
  222. "supervisord_pid=%s\n" % os.getpid() +
  223. "up=%s\n" % up +
  224. "down=%s\n" % down)
  225. try:
  226. procs = self.supervisord.proclist.getmany(names)
  227. except KeyError, name:
  228. return ST_FAILED, "Unknown process named %s" % name
  229. msg = ''
  230. for proc in procs:
  231. filename = proc.get_execv_args()[0]
  232. msg = msg + ("name=%s\n" % proc.name +
  233. "command=%s\n" % filename +
  234. "status=%s\n" % (proc.pid and "up" or "down") +
  235. "pid=%s\n" % proc.pid)
  236. return ST_OK, msg
  237. def cmd_list(self, args):
  238. try:
  239. which = args[1]
  240. except IndexError:
  241. which = 'all'
  242. if which not in ['all', 'up', 'down']:
  243. return ST_FAILED, 'args to list must be one of "all", "up", "down"'
  244. procs = self.supervisord.proclist.get(which)
  245. names = [ proc.name for proc in procs ]
  246. msg = '\n'.join(names)
  247. return ST_OK, msg
  248. def cmd_reload(self, args):
  249. self.mood = 0
  250. self.options.logger.critical('Reloading config and restarting all '
  251. 'processes')
  252. self.supervisord.proclist.stop_all()
  253. return ST_OK, 'Reloading configuration after all processes have quit'
  254. self.close()
  255. def cmd_shutdown(self, args):
  256. self.supervisord.mood = -1 # exiting
  257. self.options.logger.critical("supervisord stopping via shutdown")
  258. self.supervisord.proclist.stop_all()
  259. return ST_OK, "Will shut down after all processes have quit"
  260. def cmd_logtail(self, args):
  261. try:
  262. numlines = args[1]
  263. except:
  264. numlines = 15
  265. try:
  266. numlines = int(numlines)
  267. except:
  268. return ST_FAILED, ('Number of lines must be integer, was: %s' %
  269. numlines)
  270. helper = TailHelper(self.options.logfile)
  271. sz, lines = helper.tail(numlines)
  272. return ST_OK, ''.join(lines)
  273. def cmd_capabilities(self, args):
  274. caps = [
  275. "capabilities -- return server capabilities",
  276. "status [name] -- report application/process status",
  277. "kill name [signame] -- send a signal to the application",
  278. "start name [name..] -- start an application",
  279. "stop name [name..] -- stop an application if running",
  280. "restart name [name..] -- stop followed by start",
  281. "list [all|up|down] -- list controlled service names",
  282. "shutdown -- Shut the supervisord process down",
  283. ]
  284. if not self.noauth:
  285. caps.append(
  286. "auth -- Initiate SRP authentication"
  287. )
  288. return ST_OK, '\n'.join(caps)
  289. def cmd_add(self, args):
  290. from supervisord import Subprocess
  291. from rpc import ProcessConfig
  292. try:
  293. processName = args[1]
  294. except IndexError:
  295. return ST_FAILED, "No process named"
  296. if not args[2:]:
  297. return ST_FAILED, "Bad no command named"
  298. command = ' '.join(args[2:])
  299. options = self.options
  300. logname = processName + str(time.time()) + '.log'
  301. logfile = os.path.join(options.childlogdir, logname)
  302. config = ProcessConfig(name = processName,
  303. command=command, priority=999, auto_start = True,
  304. auto_restart = False, user = options.uid,
  305. logfile=logfile
  306. )
  307. process = Subprocess(options, config)
  308. process.metadata = ''
  309. self.supervisord.proclist.processes[processName] = process
  310. self.supervisord.proclist.start_necessary()
  311. return ST_OK, '%s might start' % processName
  312. class CommandLineServer(asyncore.dispatcher):
  313. channel_class = CommandServerChannel
  314. def __init__(self, supervisord):
  315. asyncore.dispatcher.__init__(self, None, None)
  316. self.supervisord = supervisord
  317. self.options = supervisord.options
  318. def opensocket(self):
  319. if self.options.sockfamily == socket.AF_UNIX:
  320. self.open_domainsocket()
  321. return
  322. if self.options.sockfamily == socket.AF_INET:
  323. self.open_inetsocket()
  324. return
  325. raise RuntimeError, ('Unknown socket family %s' %
  326. self.supervisord.options.sockfamily)
  327. def open_inetsocket(self):
  328. sockname = self.options.sockname
  329. self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
  330. self.set_reuse_addr()
  331. try:
  332. self.bind(sockname)
  333. except socket.error:
  334. sys.stderr.write(
  335. 'Another process is already listening on port %s; could '
  336. 'not start supervisord!\n' % sockname[1])
  337. sys.exit(1)
  338. self.listen(5)
  339. def open_domainsocket(self):
  340. options = self.options
  341. sockname = options.sockname
  342. tempname = "%s.%d" % (sockname, os.getpid())
  343. self.unlink_quietly(tempname)
  344. while 1:
  345. self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
  346. try:
  347. self.bind(tempname)
  348. os.chmod(tempname, options.sockchmod)
  349. try:
  350. os.link(tempname, sockname)
  351. if options.sockchown:
  352. try:
  353. os.chown(sockname, options.sockuid, options.sockgid)
  354. except os.error:
  355. raise
  356. options.logger.critical(
  357. 'Cant set uid/gid on socket!')
  358. options.usage(
  359. 'Invalid socket-chown uid/gid %s'
  360. % `self.options.sockchown`)
  361. break
  362. except os.error:
  363. # Lock contention, or stale socket.
  364. self.checkopen()
  365. # Stale socket -- delete, sleep, and try again.
  366. msg = "Unlinking stale socket %s" % sockname
  367. sys.stderr.write(msg + "\n")
  368. self.options.logger.warn(msg)
  369. self.unlink_quietly(sockname)
  370. self.close()
  371. time.sleep(1)
  372. continue
  373. finally:
  374. self.unlink_quietly(tempname)
  375. self.listen(5)
  376. def unlink_quietly(self, filename):
  377. try:
  378. os.unlink(filename)
  379. except os.error:
  380. pass
  381. def checkopen(self):
  382. options = self.options
  383. s = socket.socket(options.sockfamily, socket.SOCK_STREAM)
  384. try:
  385. s.connect(options.sockname)
  386. s.send("1.0 STATUS\n")
  387. data = s.recv(1000)
  388. s.close()
  389. except socket.error:
  390. pass
  391. else:
  392. while data.endswith("\n"):
  393. data = data[:-1]
  394. msg = ("Another supervisord is already up using socket %r:\n%s" %
  395. (options.sockname, data))
  396. sys.stderr.write(msg + "\n")
  397. options.logger.critical(msg)
  398. sys.exit(1)
  399. def handle_accept (self):
  400. conn, addr = self.accept()
  401. channel = self.channel_class(self, conn, addr)
  402. if not self.options.noauth:
  403. channel.authenticating = True
  404. channel.authsession = None
  405. channel.clientproof=None
  406. channel.authbuffer = ""
  407. #self.channels[channel] = 1
  408. def writable(self):
  409. return 0
  410. def readable(self):
  411. return 1
  412. def makeCommandLineServer(supervisord):
  413. options = supervisord.options
  414. if options.sockname is None:
  415. return
  416. if options.noauth:
  417. options.logger.critical(
  418. 'Running without any supervisorctl socket authentication '
  419. 'checking')
  420. server = CommandLineServer(supervisord)
  421. options.logger.info('Running socket server on %r' % options.sockname)
  422. server.opensocket()
  423. return server