瀏覽代碼

Added GSoC work -- tab completions in supervisorctl shell, and foreground mode for Supervisor

Siddhant Goel 17 年之前
父節點
當前提交
8af7d817c7
共有 5 個文件被更改,包括 274 次插入20 次删除
  1. 6 0
      CHANGES.txt
  2. 0 4
      TODO.txt
  3. 204 2
      src/supervisor/supervisorctl.py
  4. 14 14
      src/supervisor/tests/base.py
  5. 50 0
      src/supervisor/tests/test_supervisorctl.py

+ 6 - 0
CHANGES.txt

@@ -1,5 +1,11 @@
 Next Release
 
+  - Tab completions in the supervisorctl shell, and a foreground mode
+    for Supervisor, implemented as a part of GSoC.
+    The supervisorctl program now has a 'fg' command, which makes it
+    possible to supply inputs to a process, and see its output/error
+    stream in real time.
+
   - Process config reloading implemented by Anders Quist.  The
     supervisorctl program now has the commands "add" and "drop".
     "add <programname>" adds the process group implied by <programname>

+ 0 - 4
TODO.txt

@@ -18,10 +18,6 @@
   bytes requested exceeds the number logged.  This is not certain and needs
   investigation.  Reported by Chris McDonough.
 
-- Supervisorctl tab completion and history.
-
-- Supervisorctl "debug" command (communicate with a process over its stdin).
-
 - Allow effective user to switch to a particular group instead of
   defaulting to the user's primary group:
   http://www.plope.com/software/collector/233.

+ 204 - 2
src/supervisor/supervisorctl.py

@@ -42,11 +42,68 @@ import socket
 import asyncore
 import errno
 import urlparse
+import threading
 
 from supervisor.options import ClientOptions
 from supervisor.options import split_namespec
 from supervisor import xmlrpc
 
+class fgthread(threading.Thread):
+    
+    # A subclass of threading.Thread, with a kill() method.
+    # To be used for foreground output/error streaming.
+    # http://mail.python.org/pipermail/python-list/2004-May/260937.html
+  
+    def __init__(self, program, ctl):
+        threading.Thread.__init__(self)
+        import http_client
+        self.killed = False
+        self.program=program
+        self.ctl=ctl
+        self.listener=http_client.Listener()
+        self.output_handler=http_client.HTTPHandler(self.listener,
+                                                    self.ctl.options.username,
+                                                    self.ctl.options.password)
+        self.error_handler=http_client.HTTPHandler(self.listener,
+                                                   self.ctl.options.username,
+                                                   self.ctl.options.password)
+
+    def start(self):
+        # Start the thread
+        self.__run_backup = self.run
+        self.run = self.__run
+        threading.Thread.start(self)
+
+    def run(self):
+        self.output_handler.get(self.ctl.options.serverurl,
+                                '/logtail/%s/stdout'%self.program)
+        self.error_handler.get(self.ctl.options.serverurl,
+                               '/logtail/%s/stderr'%self.program)
+        asyncore.loop()
+
+    def __run(self):
+        # Hacked run function, which installs the trace
+        sys.settrace(self.globaltrace)
+        self.__run_backup()
+        self.run = self.__run_backup
+
+    def globaltrace(self, frame, why, arg):
+        if why == 'call':
+            return self.localtrace
+        else:
+            return None
+
+    def localtrace(self, frame, why, arg):
+        if self.killed:
+            if why == 'line':
+                raise SystemExit()
+        return self.localtrace
+
+    def kill(self):
+        self.output_handler.close()
+        self.error_handler.close()
+        self.killed = True
+
 class Controller(cmd.Cmd):
 
     def __init__(self, options, completekey='tab', stdin=None,
@@ -54,6 +111,11 @@ class Controller(cmd.Cmd):
         self.options = options
         self.prompt = self.options.prompt + '> '
         self.options.plugins = []
+        self.vocab = ['add','exit','maintail','pid','reload',
+                      'restart','start','stop','version','clear',
+                      'fg','open','quit','remove','shutdown','status',
+                      'tail','help']
+        self.info=self.get_supervisor().getAllProcessInfo()
         cmd.Cmd.__init__(self, completekey, stdin, stdout)
         for name, factory, kwargs in self.options.plugin_factories:
             plugin = factory(self, **kwargs)
@@ -160,6 +222,75 @@ class Controller(cmd.Cmd):
             raise
         return True
 
+    def completionmatches(self,text,line,flag=0):
+        groups=[]
+        programs=[]
+        groupwiseprograms={}
+        for i in self.info:
+            programs.append(i['name'])
+            if i['group'] not in groups:
+                groups.append(i['group'])
+                groupwiseprograms[i['group']]=[]
+            groupwiseprograms[i['group']].append(i['name'])
+        total=[]
+        for i in groups:
+            if i in programs:
+                total.append(i+' ')
+            else:
+                for n in groupwiseprograms[i]:
+                    total.append(i+':'+n+' ')
+        if flag:
+            # add/remove require only the group name
+            return [i+' ' for i in groups if i.startswith(text)]
+        if len(line.split()) == 1:
+            return total
+        else:
+            current=line.split()[-1]
+            if line.endswith(' ') and len(line.split()) > 1:
+                results=[i for i in total if i.startswith(text)]
+                return results
+            if ':' in current:
+                g=current.split(':')[0]
+                results = [i+' ' for i in groupwiseprograms[g]
+                           if i.startswith(text)]
+                return results
+            results = [i for i in total if i.startswith(text)]
+            return results
+
+    def complete(self,text,state):
+        try:
+            import readline
+        except ImportError:
+            return None
+        line=readline.get_line_buffer()
+        if line == '':
+            results = [i+' ' for i in self.vocab if i.startswith(text)]+[None]
+            return results[state]
+        else:
+            exp=line.split()[0]
+            if exp in ['start','stop','restart','clear','status','tail','fg']:
+                if not line.endswith(' ') and len(line.split()) == 1:
+                    return [text+' ',None][state]
+                if exp == 'fg':
+                    if line.endswith(' ') and len(line.split()) > 1:
+                        return None
+                results=self.completionmatches(text,line)+[None]
+                return results[state]
+            elif exp in ['maintail','pid','reload','shutdown','exit','open',
+                         'quit','version','EOF']:
+                return None
+            elif exp == 'help':
+                if line.endswith(' ') and len(line.split()) > 1:
+                    return None
+                results=[i+' ' for i in self.vocab if i.startswith(text)]+[None]
+                return results[state]
+            elif exp in ['add','remove']:
+                results=self.completionmatches(text,line,flag=1)+[None]
+                return results[state]
+            else:
+                results=[i+' ' for i in self.vocab if i.startswith(text)]+[None]
+                return results[state]
+
     def do_help(self, arg):
         for plugin in self.options.plugins:
             plugin.do_help(arg)
@@ -646,13 +777,26 @@ class DefaultControllerPlugin(ControllerPluginBase):
     def help_reload(self):
         self.ctl.output("reload \t\tRestart the remote supervisord.")
 
+    def _formatChanges(self, (added, changed, dropped)):
+        changedict = {}
+        for n, t in [(added, 'available'),
+                     (changed, 'changed'),
+                     (dropped, 'disappeared')]:
+            changedict.update(dict(zip(n, [t] * len(n))))
+
+        if changedict:
+            for name in sorted(changedict):
+                self.ctl.output("%s: %s" % (name, changedict[name]))
+        else:
+            self.ctl.output("No config updates to proccesses")
+
     def do_add(self, arg):
         names = arg.strip().split()
 
         supervisor = self.ctl.get_supervisor()
         for name in names:
             try:
-                supervisor.addProcessGroup(name)
+                supervisor.addProcess(name)
             except xmlrpclib.Fault, e:
                 if e.faultCode == xmlrpc.Faults.SHUTDOWN_STATE:
                     self.ctl.output('ERROR: shutting down')
@@ -676,7 +820,7 @@ class DefaultControllerPlugin(ControllerPluginBase):
         supervisor = self.ctl.get_supervisor()
         for name in names:
             try:
-                result = supervisor.removeProcessGroup(name)
+                result = supervisor.removeProcess(name)
             except xmlrpclib.Fault, e:
                 if e.faultCode == xmlrpc.Faults.STILL_RUNNING:
                     self.ctl.output('ERROR: process/group still running: %s'
@@ -767,6 +911,64 @@ class DefaultControllerPlugin(ControllerPluginBase):
             "version\t\t\tShow the version of the remote supervisord "
             "process")
 
+    def do_fg(self,args=None):
+        if not self.ctl.upcheck():
+            return
+        if not args:
+            self.ctl.output('Error: no process name supplied')
+            self.help_fg()
+            return
+        args=args.split()
+        if len(args)>1:
+            self.ctl.output('Error: too many process names supplied')
+            return
+        program=args[0]
+        supervisor=self.ctl.get_supervisor()
+        try:
+            info=supervisor.getProcessInfo(program)
+        except xmlrpclib.Fault, msg:
+            if msg.faultCode == xmlrpc.Faults.BAD_NAME:
+                self.ctl.output('Error: bad process name supplied')
+                return
+            # for any other fault
+            self.ctl.output(str(msg))
+            return
+        if not info['statename'] == 'RUNNING':
+            self.ctl.output('Error: process not running')
+            return
+        # everything good; continue
+        try:
+            a=fgthread(program,self.ctl)
+            # this thread takes care of
+            # the output/error messages
+            a.start()
+            while True:
+                # this takes care of the user input
+                inp = raw_input() + '\n'
+                try:
+                    supervisor.sendProcessStdin(program,inp)
+                except xmlrpclib.Fault, msg:
+                    if msg.faultCode == 70:
+                        self.ctl.output('Process got killed')
+                        self.ctl.output('Exiting foreground')
+                        a.kill()
+                        return
+                info = supervisor.getProcessInfo(program)
+                if not info['statename'] == 'RUNNING':
+                    self.ctl.output('Process got killed')
+                    self.ctl.output('Exiting foreground')
+                    a.kill()
+                    return
+                continue
+        except KeyboardInterrupt:
+            a.kill()
+            self.ctl.output('Exiting foreground')
+        return
+
+    def help_fg(self,args=None):
+        self.ctl.output('fg <process>\tConnect to a process in foreground mode')
+        self.ctl.output('Press Ctrl+C to exit foreground')
+
 def main(args=None, options=None):
     if options is None:
         options = ClientOptions()

+ 14 - 14
src/supervisor/tests/base.py

@@ -660,19 +660,19 @@ class DummySupervisorRPCNamespace:
         return self.all_process_info
 
     def getProcessInfo(self, name):
+        from supervisor import xmlrpc
+        import xmlrpclib
         from supervisor.process import ProcessStates
-        return {
-            'name':'foo',
-            'group':'foo',
-            'pid':11,
-            'state':ProcessStates.RUNNING,
-            'statename':'RUNNING',
-            'start':_NOW - 100,
-            'stop':0,
-            'spawnerr':'',
-            'now':_NOW,
-            'description':'foo description',
-             }
+        for i in self.all_process_info:
+            if i['name']==name:
+                info=i
+                return info
+        if name == 'BAD_NAME':
+            raise xmlrpclib.Fault(xmlrpc.Faults.BAD_NAME, 'BAD_NAME')
+        if name == 'FAILED':
+            raise xmlrpclib.Fault(xmlrpc.Faults.FAILED, 'FAILED')
+        if name == 'NO_FILE':
+            raise xmlrpclib.Fault(xmlrpc.Faults.NO_FILE, 'NO_FILE')
 
     def startProcess(self, name):
         from supervisor import xmlrpc
@@ -769,7 +769,7 @@ class DummySupervisorRPCNamespace:
     def reloadConfig(self):
         return [[['added'], ['changed'], ['dropped']]]
 
-    def addProcessGroup(self, name):
+    def addProcess(self, name):
         from xmlrpclib import Fault
         from supervisor import xmlrpc
         if name == 'ALREADY_ADDED':
@@ -781,7 +781,7 @@ class DummySupervisorRPCNamespace:
         else:
             self.processes = [name]
 
-    def removeProcessGroup(self, name):
+    def removeProcess(self, name):
         from xmlrpclib import Fault
         from supervisor import xmlrpc
         if name == 'STILL_RUNNING':

+ 50 - 0
src/supervisor/tests/test_supervisorctl.py

@@ -74,6 +74,18 @@ class ControllerTests(unittest.TestCase):
         self.assertEqual(controller.cmdqueue, [' help'])
         self.assertEqual(plugin.helped, True)
 
+    def test_completionmatches(self):
+        options=DummyClientOptions()
+        controller=self._makeOne(options)
+        controller.stdout=StringIO()
+        plugin=DummyPlugin()
+        controller.options.plugin=(plugin,)
+        for i in ['add','remove']:
+            result=controller.completionmatches('',i+' ',1)
+            self.assertEqual(result,['foo ','bar ','baz '])
+        result=controller.completionmatches('','fg baz:')
+        self.assertEqual(result,['baz_01 '])
+
     def test_nohelp(self):
         options = DummyClientOptions()
         controller = self._makeOne(options)
@@ -460,6 +472,12 @@ class TestDefaultControllerPlugin(unittest.TestCase):
         self.assertEqual(result, None)
         self.assertEqual(options._server.supervisor._shutdown, True)
 
+    def test__formatChanges(self):
+        plugin = self._makeOne()
+        # Don't explode, plz
+        plugin._formatChanges([['added'], ['changed'], ['dropped']])
+        plugin._formatChanges([[], [], []])
+
     def test_add(self):
         plugin = self._makeOne()
         result = plugin.do_add('foo')
@@ -578,6 +596,38 @@ class TestDefaultControllerPlugin(unittest.TestCase):
         self.assertEqual(plugin.ctl.stdout.getvalue(),
                          'supervisord: ERROR (unknown error reading log)\n')
 
+    def test_fg_toofewargs(self):
+        plugin=self._makeOne()
+        result=plugin.do_fg('')
+        lines=plugin.ctl.stdout.getvalue().split('\n')
+        self.assertEqual(result,None)
+        self.assertEqual(lines[0],'Error: no process name supplied')
+
+    def test_fg_toomanyargs(self):
+        plugin=self._makeOne()
+        result=plugin.do_fg('foo bar')
+        line=plugin.ctl.stdout.getvalue()
+        self.assertEqual(result,None)
+        self.assertEqual(line,'Error: too many process names supplied\n')
+
+    def test_fg_badprocname(self):
+        plugin=self._makeOne()
+        result=plugin.do_fg('BAD_NAME')
+        line=plugin.ctl.stdout.getvalue()
+        self.assertEqual(result,None)
+        self.assertEqual(line,'Error: bad process name supplied\n')
+
+    def test_fg_procnotrunning(self):
+        plugin=self._makeOne()
+        result=plugin.do_fg('bar')
+        line=plugin.ctl.stdout.getvalue()
+        self.assertEqual(result,None)
+        self.assertEqual(line,'Error: process not running\n')
+        result=plugin.do_fg('baz_01')
+        lines=plugin.ctl.stdout.getvalue().split('\n')
+        self.assertEqual(result,None)
+        self.assertEqual(lines[-2],'Error: process not running')
+
 class DummyListener:
     def __init__(self):
         self.errors = []