Refactor to make it possible to unit test the supervisord class.

Write unit tests for supervisord class.
Chris McDonough, 19 years ago
Parent commit: 0cd1d2e448
4 changed files with 526 additions and 359 deletions
  1. src/supervisor/http.py (+1 -2)
  2. src/supervisor/options.py (+338 -5)
  3. src/supervisor/supervisord.py (+33 -341)
  4. src/supervisor/tests.py (+154 -11)
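
The point of the refactor shows up in the constructor change further down: Supervisor no longer builds its own ServerOptions, so a test can hand it a stub. Below is a minimal sketch of that pattern, not part of the commit; StubOptions is a hypothetical stand-in for the DummyOptions test double this commit adds to src/supervisor/tests.py, and the import assumes src/supervisor is on the path, as the project's own tests do.

# Minimal sketch (not from this commit) of the dependency-injection pattern it
# introduces.  StubOptions is hypothetical; the commit's real test double is
# DummyOptions in src/supervisor/tests.py.
import unittest

class StubOptions:
    # mirrors the mood flag this commit moves onto ServerOptions
    mood = 1          # 1: up, 0: restarting, -1: suicidal
    programs = []

class SupervisorWiringTests(unittest.TestCase):
    def test_get_state_reads_injected_options(self):
        from supervisord import Supervisor, SupervisorStates
        supervisord = Supervisor(StubOptions())  # options is passed in, not built in main()
        self.assertEqual(supervisord.get_state(), SupervisorStates.ACTIVE)
        supervisord.options.mood = -1            # signal handlers now flip mood on the options object
        self.assertEqual(supervisord.get_state(), SupervisorStates.SHUTDOWN)

if __name__ == '__main__':
    unittest.main()

Everything main() and run() used to do inline (daemonizing, writing the pidfile, installing signal handlers, reaping via waitpid, setting rlimits) now goes through methods on the options object, which is why the new SupervisordTests can assert on flags such as options.daemonized or options.pidfile_written instead of forking a real process.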

+ 1 - 2
src/supervisor/http.py

@@ -608,8 +608,7 @@ class logtail_handler:
 
         request.done()
 
-def make_http_server(supervisord):
-    options = supervisord.options
+def make_http_server(options, supervisord):
     if not options.http_port:
         return
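
One consequence of passing options explicitly (an illustrative check, not part of the commit) is that make_http_server can now be exercised with a bare stub: with no http_port configured it simply returns None. The snippet assumes it is run from src/supervisor so the plain module import resolves the same way it does inside options.py; NoHTTPOptions is an ad-hoc stub.

# Hypothetical check of the new make_http_server signature.
from http import make_http_server

class NoHTTPOptions:
    http_port = None          # falsy port: make_http_server returns early

assert make_http_server(NoHTTPOptions(), supervisord=None) is None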
 

+ 338 - 5
src/supervisor/options.py

@@ -1,17 +1,21 @@
 import ConfigParser
+import asyncore
+import socket
 import getopt
 import os
+import sys
 import datatypes
 import logging
-import sys
 import tempfile
-import socket
 import errno
 import signal
 import re
 import xmlrpclib
 import httplib
 import urllib
+import pwd
+import grp
+import resource
 
 class FileHandler(logging.StreamHandler):
     """File handler which supports reopening of logs.
@@ -427,6 +431,11 @@ class ServerOptions(Options):
     passwdfile = None
     nodaemon = None
     AUTOMATIC = []
+    TRACE = 5
+    mood = 1 # 1: up, 0: restarting, -1: suicidal
+    stopping = False # set after we detect that we are handling a stop request
+    mustreopen = False # set after we detect we handled a logreopen signal
+
     
     def __init__(self):
         Options.__init__(self)
@@ -499,11 +508,9 @@ class ServerOptions(Options):
 
     def realize(self, *arg, **kw):
         Options.realize(self, *arg, **kw)
-        import socket
 
         # Additional checking of user option; set uid and gid
         if self.user is not None:
-            import pwd
 	    uid = datatypes.name_to_uid(self.user)
             if uid is None:
                 self.usage("No such user %s" % self.user)
@@ -718,7 +725,136 @@ class ServerOptions(Options):
         programs.sort() # asc by priority
         return programs
 
-    def clear_childlogdir(self):
+    def daemonize(self):
+        # To daemonize, we need to become the leader of our own session
+        # (process) group.  If we do not, signals sent to our
+        # parent process will also be sent to us.   This might be bad because
+        # signals such as SIGINT can be sent to our parent process during
+        # normal (uninteresting) operations such as when we press Ctrl-C in the
+        # parent terminal window to escape from a logtail command.
+        # To disassociate ourselves from our parent's session group we use
+        # os.setsid.  It means "set session id", which has the effect of
+        # disassociating a process from its current session and process group
+        # and setting itself up as a new session leader.
+        #
+        # Unfortunately we cannot call setsid if we're already a session group
+        # leader, so we use "fork" to make a copy of ourselves that is
+        # guaranteed to not be a session group leader.
+        #
+        # We also change directories, set stderr and stdout to null, and
+        # change our umask.
+        #
+        # This explanation was (gratefully) garnered from
+        # http://www.hawklord.uklinux.net/system/daemons/d3.htm
+
+        pid = os.fork()
+        if pid != 0:
+            # Parent
+            self.logger.debug("supervisord forked; parent exiting")
+            os._exit(0)
+        # Child
+        self.logger.info("daemonizing the process")
+        if self.directory:
+            try:
+                os.chdir(self.directory)
+            except os.error, err:
+                self.logger.warn("can't chdir into %r: %s"
+                                 % (self.directory, err))
+            else:
+                self.logger.info("set current directory: %r"
+                                 % self.directory)
+        os.close(0)
+        sys.stdin = sys.__stdin__ = open("/dev/null")
+        os.close(1)
+        sys.stdout = sys.__stdout__ = open("/dev/null", "w")
+        os.close(2)
+        sys.stderr = sys.__stderr__ = open("/dev/null", "w")
+        os.setsid()
+        os.umask(self.umask)
+        # XXX Stevens, in his Advanced Unix book, section 13.3 (page
+        # 417) recommends calling umask(0) and closing unused
+        # file descriptors.  In his Network Programming book, he
+        # additionally recommends ignoring SIGHUP and forking again
+        # after the setsid() call, for obscure SVR4 reasons.
+
+    def write_pidfile(self):
+        pid = os.getpid()
+        try:
+            f = open(self.pidfile, 'w')
+            f.write('%s\n' % pid)
+            f.close()
+        except (IOError, os.error):
+            self.logger.critical('could not write pidfile %s' % self.pidfile)
+        else:
+            self.logger.info('supervisord started with pid %s' % pid)
+                
+    def cleanup(self):
+        try:
+            if self.http_port is not None:
+                if self.http_port.family == socket.AF_UNIX:
+                    os.unlink(self.http_port.address)
+        except os.error:
+            pass
+        try:
+            os.unlink(self.pidfile)
+        except os.error:
+            pass
+
+    def openhttpserver(self, supervisord):
+        from http import make_http_server
+        try:
+            self.httpserver = make_http_server(self, supervisord)
+        except socket.error, why:
+            if why[0] == errno.EADDRINUSE:
+                port = str(self.http_port.address)
+                self.usage('Another program is already listening on '
+                           'the port that our HTTP server is '
+                           'configured to use (%s).  Shut this program '
+                           'down first before starting supervisord. ' %
+                           port)
+        except ValueError, why:
+            self.usage(why[0])
+
+    def setsignals(self):
+        signal.signal(signal.SIGTERM, self.sigexit)
+        signal.signal(signal.SIGINT, self.sigexit)
+        signal.signal(signal.SIGQUIT, self.sigexit)
+        signal.signal(signal.SIGHUP, self.sighup)
+        signal.signal(signal.SIGCHLD, self.sigchild)
+        signal.signal(signal.SIGUSR2, self.sigreopenlog)
+
+    def sigexit(self, sig, frame):
+        self.mood = -1 # exiting
+        self.logger.critical('received %s indicating exit request' %
+                                     signame(sig))
+
+    def sighup(self, sig, frame):
+        self.mood = 0 # restarting
+        self.logger.critical('received %s indicating restart request' %
+                             signame(sig))
+
+    def sigreopenlog(self, sig, frame):
+        self.mustreopen = True
+        self.logger.info('received %s indicating log reopen request' %
+                         signame(sig))
+
+    def sigchild(self, sig, frame):
+        # do nothing here, we reap our children synchronously
+        self.logger.debug('received %s' % signame(sig))
+
+    def create_autochildlogs(self):
+        for program in self.programs:
+            if program.logfile is self.AUTOMATIC:
+                # temporary logfile which is erased at start time
+                prefix='%s---%s-' % (program.name, self.identifier)
+                fd, logfile = tempfile.mkstemp(
+                    suffix='.log',
+                    prefix=prefix,
+                    dir=self.childlogdir)
+                os.close(fd)
+                program.logfile = logfile
+
+    def clear_autochildlogdir(self):
         # must be called after realize()
         childlogdir = self.childlogdir
         fnre = re.compile(r'.+?---%s-\S+\.log\.{0,1}\d{0,4}' % self.identifier)
@@ -736,6 +872,142 @@ class ServerOptions(Options):
                 except (os.error, IOError):
                     self.logger.info('Failed to clean up %r' % pathname)
 
+    def get_socket_map(self):
+        return asyncore.socket_map
+
+    def cleanup_fds(self):
+        # try to close any unused file descriptors to prevent leakage.
+        # we start at the "highest" descriptor in the asyncore socket map
+        # because this might be called remotely and we don't want to close
+        # the internet channel during this call.
+        asyncore_fds = asyncore.socket_map.keys()
+        start = 5
+        if asyncore_fds:
+            start = max(asyncore_fds) + 1
+        for x in range(start, self.minfds):
+            try:
+                os.close(x)
+            except:
+                pass
+
+    def kill(self, pid, signal):
+        os.kill(pid, signal)
+
+    def set_uid(self):
+        if self.uid is None:
+            if os.getuid() == 0:
+                return 'Supervisor running as root (no user in config file)'
+            return None
+        msg = self.dropPrivileges(self.uid)
+        if msg is None:
+            return 'Set uid to user %s' % self.uid
+        return msg
+
+    def dropPrivileges(self, user):
+        # Drop root privileges if we have them
+        if user is None:
+            return "No used specified to setuid to!"
+        if os.getuid() != 0:
+            return "Can't drop privilege as nonroot user"
+        try:
+            uid = int(user)
+        except ValueError:
+            try:
+                pwrec = pwd.getpwnam(user)
+            except KeyError:
+                return "Can't find username %r" % user
+            uid = pwrec[2]
+        else:
+            try:
+                pwrec = pwd.getpwuid(uid)
+            except KeyError:
+                return "Can't find uid %r" % uid
+        if hasattr(os, 'setgroups'):
+            user = pwrec[0]
+            groups = [grprec[2] for grprec in grp.getgrall() if user in
+                      grprec[3]]
+            try:
+                os.setgroups(groups)
+            except OSError:
+                return 'Could not set groups of effective user'
+        gid = pwrec[3]
+        try:
+            os.setgid(gid)
+        except OSError:
+            return 'Could not set group id of effective user'
+        os.setuid(uid)
+
+    def waitpid(self):
+        try:
+            pid, sts = os.waitpid(-1, os.WNOHANG)
+        except os.error, why:
+            err = why[0]
+            if err not in (errno.ECHILD, errno.EINTR):
+                self.logger.info(
+                    'waitpid error; a process may not be cleaned up properly')
+            if err == errno.EINTR:
+                self.logger.debug('EINTR during reap')
+            pid, sts = None, None
+        return pid, sts
+
+    def set_rlimits(self):
+        limits = []
+        if hasattr(resource, 'RLIMIT_NOFILE'):
+            limits.append(
+                {
+                'msg':('The minimum number of file descriptors required '
+                       'to run this process is %(min)s as per the "minfds" '
+                       'command-line argument or config file setting. '
+                       'The current environment will only allow you '
+                       'to open %(hard)s file descriptors.  Either raise '
+                       'the number of usable file descriptors in your '
+                       'environment (see README.txt) or lower the '
+                       'minfds setting in the config file to allow '
+                       'the process to start.'),
+                'min':self.minfds,
+                'resource':resource.RLIMIT_NOFILE,
+                'name':'RLIMIT_NOFILE',
+                })
+        if hasattr(resource, 'RLIMIT_NPROC'):
+            limits.append(
+                {
+                'msg':('The minimum number of available processes required '
+                       'to run this program is %(min)s as per the "minprocs" '
+                       'command-line argument or config file setting. '
+                       'The current environment will only allow you '
+                       'to open %(hard)s processes.  Either raise '
+                       'the number of usable processes in your '
+                       'environment (see README.txt) or lower the '
+                       'minprocs setting in the config file to allow '
+                       'the program to start.'),
+                'min':self.minprocs,
+                'resource':resource.RLIMIT_NPROC,
+                'name':'RLIMIT_NPROC',
+                })
+
+        msgs = []
+            
+        for limit in limits:
+
+            min = limit['min']
+            res = limit['resource']
+            msg = limit['msg']
+            name = limit['name']
+
+            soft, hard = resource.getrlimit(res)
+            
+            if (soft < min) and (soft != -1): # -1 means unlimited 
+                if (hard < min) and (hard != -1):
+                    self.usage(msg % locals())
+
+                try:
+                    resource.setrlimit(res, (min, hard))
+                    msgs.append('Increased %(name)s limit to %(min)s' %
+                                locals())
+                except (resource.error, ValueError):
+                    self.usage(msg % locals())
+        return msgs
+
     def make_logger(self, critical_messages, info_messages):
         # must be called after realize() and after supervisor does setuid()
         format =  '%(asctime)s %(levelname)s %(message)s\n'
@@ -744,6 +1016,7 @@ class ServerOptions(Options):
         logging.addLevelName(logging.INFO, 'INFO')
         logging.addLevelName(logging.WARN, 'WARN')
         logging.addLevelName(logging.ERROR, 'ERRO')
+        logging.addLevelName(self.TRACE, 'TRAC')
         self.logger = self.getLogger(
             self.logfile,
             self.loglevel,
@@ -762,6 +1035,10 @@ class ServerOptions(Options):
         for msg in info_messages:
             self.logger.info(msg)
 
+    def make_process(self, config):
+        from supervisord import Subprocess
+        return Subprocess(self, config)
+
 class ClientOptions(Options):
     positional_args_allowed = 1
 
@@ -792,6 +1069,7 @@ class ClientOptions(Options):
         self.add("password", "supervisorctl.password", "p:", "password=")
 
     def realize(self, *arg, **kw):
+        os.environ['SUPERVISOR_ENABLED'] = '1'
         Options.realize(self, *arg, **kw)
         if not self.args:
             self.interactive = 1
@@ -1027,3 +1305,58 @@ def gettags(comment):
     tags.append((tag_lineno, tag, datatype, name, '\n'.join(tag_text)))
 
     return tags
+
+
+# Helpers for dealing with signals and exit status
+
+def decode_wait_status(sts):
+    """Decode the status returned by wait() or waitpid().
+
+    Return a tuple (exitstatus, message) where exitstatus is the exit
+    status, or -1 if the process was killed by a signal; and message
+    is a message telling what happened.  It is the caller's
+    responsibility to display the message.
+    """
+    if os.WIFEXITED(sts):
+        es = os.WEXITSTATUS(sts) & 0xffff
+        msg = "exit status %s" % es
+        return es, msg
+    elif os.WIFSIGNALED(sts):
+        sig = os.WTERMSIG(sts)
+        msg = "terminated by %s" % signame(sig)
+        if hasattr(os, "WCOREDUMP"):
+            iscore = os.WCOREDUMP(sts)
+        else:
+            iscore = sts & 0x80
+        if iscore:
+            msg += " (core dumped)"
+        return -1, msg
+    else:
+        msg = "unknown termination cause 0x%04x" % sts
+        return -1, msg
+
+
+_signames = None
+
+def signame(sig):
+    """Return a symbolic name for a signal.
+
+    Return "signal NNN" if there is no corresponding SIG name in the
+    signal module.
+    """
+
+    if _signames is None:
+        _init_signames()
+    return _signames.get(sig) or "signal %d" % sig
+
+def _init_signames():
+    global _signames
+    d = {}
+    for k, v in signal.__dict__.items():
+        k_startswith = getattr(k, "startswith", None)
+        if k_startswith is None:
+            continue
+        if k_startswith("SIG") and not k_startswith("SIG_"):
+            d[v] = k
+    _signames = d
+
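
decode_wait_status and signame move verbatim out of supervisord.py (see the removals below) and into options.py, next to the other process-management helpers. As a quick illustration of what they report, here is a hedged example, not part of the commit, assuming it runs from src/supervisor the same way supervisord.py's own "from options import decode_wait_status" does:

# Not part of the commit: a small demonstration of the relocated helpers.
import signal
from options import decode_wait_status, signame

# A wait status whose low byte is a signal number means the child was killed
# by that signal; waitpid() would return such a status for a SIGTERM'd child.
es, msg = decode_wait_status(signal.SIGTERM)
assert es == -1                            # -1 flags "killed by a signal"
assert msg == "terminated by SIGTERM"
assert signame(signal.SIGTERM) == "SIGTERM"

# A normal exit packs the exit code into the high byte of the status word.
es, msg = decode_wait_status(3 << 8)
assert (es, msg) == (3, "exit status 3")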

+ 33 - 341
src/supervisor/supervisord.py

@@ -48,21 +48,17 @@ import errno
 import socket
 import select
 import signal
-import pwd
-import grp
 import asyncore
 import traceback
 import StringIO
-import resource
 import stat
-import re
-import tempfile
 import shlex
 
 from fcntl import fcntl
 from fcntl import F_SETFL, F_GETFL
 
 from options import ServerOptions
+from options import decode_wait_status
 
 class ProcessStates:
     RUNNING = 0
@@ -338,7 +334,7 @@ class Subprocess:
             self.options.logger.debug('killing %s (pid %s)' % (self.config.name,
                                                                self.pid))
             self.killing = 1
-            os.kill(self.pid, sig)
+            self.options.kill(self.pid, sig)
         except:
             io = StringIO.StringIO()
             traceback.print_exc(file=io)
@@ -424,7 +420,7 @@ class Subprocess:
     def set_uid(self):
         if self.config.uid is None:
             return
-        msg = dropPrivileges(self.config.uid)
+        msg = self.options.dropPrivileges(self.config.uid)
         return msg
 
     def __cmp__(self, other):
@@ -436,15 +432,11 @@ class Subprocess:
             if self.childlog:
                 self.childlog.info(data)
 
-    def trace(self, data):
-        # 'trace' level logging to main log file
-        msg = '%s output:\n%s' % (self.config.name, data)
-        self.options.logger.log(5, msg)
-
     def log_stdout(self, data):
         if data:
             self.log(data)
-            self.trace(data)
+            msg = '%s output:\n%s' % (self.config.name, data)
+            self.options.logger.log(self.options.TRACE, msg)
 
     log_stderr = log_stdout
 
@@ -471,22 +463,21 @@ class Subprocess:
 
 class Supervisor:
 
-    mood = 1 # 1: up, 0: restarting, -1: suicidal
-    stopping = False # set after we detect that we are handling a stop request
-    mustreopen = False # set after we detect we handled a logreopen signal
+    stopping = False
+
+    def __init__(self, options):
+        self.options = options
 
     def main(self, args=None, test=False, first=False):
-        os.environ['SUPERVISOR_ENABLED'] = '1'
-        self.options = ServerOptions()
         self.options.realize(args)
-        self.cleanup_fds()
+        self.options.cleanup_fds()
         info_messages = []
         critical_messages = []
-        setuid_msg = self.set_uid()
+        setuid_msg = self.options.set_uid()
         if setuid_msg:
             critical_messages.append(setuid_msg)
         if first:
-            rlimit_messages = self.set_rlimits()
+            rlimit_messages = self.options.set_rlimits()
             info_messages.extend(rlimit_messages)
 
         # this sets the options.logger object
@@ -495,25 +486,16 @@ class Supervisor:
 
         if not self.options.nocleanup:
             # clean up old automatic logs
-            self.options.clear_childlogdir()
+            self.options.clear_autochildlogdir()
 
         # delay "automatic" child log creation until after setuid because
         # we want to use mkstemp, which needs to create the file eagerly
-        for program in self.options.programs:
-            if program.logfile is self.options.AUTOMATIC:
-                # temporary logfile which is erased at start time
-                prefix='%s---%s-' % (program.name, self.options.identifier)
-                fd, logfile = tempfile.mkstemp(
-                    suffix='.log',
-                    prefix=prefix,
-                    dir=self.options.childlogdir)
-                os.close(fd)
-                program.logfile = logfile
+        self.options.create_autochildlogs()
 
         self.run(test)
 
     def get_state(self):
-        if self.mood <= 0:
+        if self.options.mood <= 0:
             return SupervisorStates.SHUTDOWN
         return SupervisorStates.ACTIVE
 
@@ -572,16 +554,7 @@ class Supervisor:
         # Python doesn't offer it as it's not standard across UNIX versions.
         # there is still a race condition here; we can get a sigchild while
         # we're sitting in the waitpid call.
-        try:
-            pid, sts = os.waitpid(-1, os.WNOHANG)
-        except os.error, why:
-            err = why[0]
-            if err not in (errno.ECHILD, errno.EINTR):
-                self.options.logger.info(
-                    'waitpid error; a process may not be cleaned up properly')
-            if err == errno.EINTR:
-                self.options.logger.debug('EINTR during reap')
-            pid, sts = None, None
+        pid, sts = self.options.waitpid()
         if pid:
             name = '<unknown>'
             process = self.options.pidhistory.get(pid)
@@ -601,231 +574,34 @@ class Supervisor:
                                          % pid)
             return
         self.options.logger.debug('set wait status on %s' % proc.config.name)
+        
         proc.finaloutput = _readfd(proc.stdoutfd)
         proc.waitstatus = pid, sts
         proc.killing = 0
         proc.laststop = time.time()
 
-    def cleanup_fds(self):
-        # try to close any unused file descriptors to prevent leakage.
-        # we start at the "highest" descriptor in the asyncore socket map
-        # because this might be called remotely and we don't want to close
-        # the internet channel during this call.
-        asyncore_fds = asyncore.socket_map.keys()
-        start = 5
-        if asyncore_fds:
-            start = max(asyncore_fds) + 1
-        for x in range(start, self.options.minfds):
-            try:
-                os.close(x)
-            except:
-                pass
-
-    def set_uid(self):
-        if self.options.uid is None:
-            if os.getuid() == 0:
-                return 'Supervisor running as root (no user in config file)'
-            return None
-        msg = dropPrivileges(self.options.uid)
-        if msg is None:
-            return 'Set uid to user %s' % self.options.uid
-        return msg
-
-    def set_rlimits(self):
-        limits = []
-        if hasattr(resource, 'RLIMIT_NOFILE'):
-            limits.append(
-                {
-                'msg':('The minimum number of file descriptors required '
-                       'to run this process is %(min)s as per the "minfds" '
-                       'command-line argument or config file setting. '
-                       'The current environment will only allow you '
-                       'to open %(hard)s file descriptors.  Either raise '
-                       'the number of usable file descriptors in your '
-                       'environment (see README.txt) or lower the '
-                       'minfds setting in the config file to allow '
-                       'the process to start.'),
-                'min':self.options.minfds,
-                'resource':resource.RLIMIT_NOFILE,
-                'name':'RLIMIT_NOFILE',
-                })
-        if hasattr(resource, 'RLIMIT_NPROC'):
-            limits.append(
-                {
-                'msg':('The minimum number of available processes required '
-                       'to run this program is %(min)s as per the "minprocs" '
-                       'command-line argument or config file setting. '
-                       'The current environment will only allow you '
-                       'to open %(hard)s processes.  Either raise '
-                       'the number of usable processes in your '
-                       'environment (see README.txt) or lower the '
-                       'minprocs setting in the config file to allow '
-                       'the program to start.'),
-                'min':self.options.minprocs,
-                'resource':resource.RLIMIT_NPROC,
-                'name':'RLIMIT_NPROC',
-                })
-
-        msgs = []
-            
-        for limit in limits:
-
-            min = limit['min']
-            res = limit['resource']
-            msg = limit['msg']
-            name = limit['name']
-
-            soft, hard = resource.getrlimit(res)
-            
-            if (soft < min) and (soft != -1): # -1 means unlimited 
-                if (hard < min) and (hard != -1):
-                    self.options.usage(msg % locals())
-
-                try:
-                    resource.setrlimit(res, (min, hard))
-                    msgs.append('Increased %(name)s limit to %(min)s' %
-                                locals())
-                except (resource.error, ValueError):
-                    self.options.usage(msg % locals())
-        return msgs
-
     def run(self, test=False):
         self.processes = {}
         for program in self.options.programs:
             name = program.name
-            self.processes[name] = Subprocess(self.options, program)
+            self.processes[name] = self.options.make_process(program)
         try:
-            pid = os.getpid()
-            try:
-                f = open(self.options.pidfile, 'w')
-                f.write('%s\n' % pid)
-                f.close()
-            except (IOError, os.error):
-                self.options.logger.critical('could not write pidfile %s' %
-                                             self.options.pidfile)
-            else:
-                self.options.logger.info('supervisord started with pid %s' %
-                                         pid)
-                
-            self.openhttpserver()
-            self.setsignals()
+            self.options.write_pidfile()
+            self.options.openhttpserver(self)
+            self.options.setsignals()
             if not self.options.nodaemon:
-                self.daemonize()
+                self.options.daemonize()
             self.runforever(test)
         finally:
-            try:
-                if self.options.http_port is not None:
-                    if self.options.http_port.family == socket.AF_UNIX:
-                        os.unlink(self.options.http_port.address)
-            except os.error:
-                pass
-            try:
-                os.unlink(self.options.pidfile)
-            except os.error:
-                pass
-
-    def openhttpserver(self):
-        from http import make_http_server
-        try:
-            self.httpserver = make_http_server(self)
-        except socket.error, why:
-            if why[0] == errno.EADDRINUSE:
-                port = str(self.options.http_port.address)
-                self.options.usage('Another program is already listening on '
-                                   'the port that our HTTP server is '
-                                   'configured to use (%s).  Shut this program '
-                                   'down first before starting supervisord. ' %
-                                   port)
-        except ValueError, why:
-            self.options.usage(why[0])
-
-    def setsignals(self):
-        signal.signal(signal.SIGTERM, self.sigexit)
-        signal.signal(signal.SIGINT, self.sigexit)
-        signal.signal(signal.SIGQUIT, self.sigexit)
-        signal.signal(signal.SIGHUP, self.sighup)
-        signal.signal(signal.SIGCHLD, self.sigchild)
-        signal.signal(signal.SIGUSR2, self.sigreopenlog)
-
-    def sigexit(self, sig, frame):
-        self.mood = -1 # exiting
-        self.options.logger.critical('received %s indicating exit request' %
-                                     signame(sig))
-
-    def sighup(self, sig, frame):
-        self.mood = 0 # restarting
-        self.options.logger.critical('received %s indicating restart request' %
-                                     signame(sig))
-
-    def sigreopenlog(self, sig, frame):
-        self.mustreopen = True
-        self.options.logger.info('received %s indicating log reopen request' %
-                                 signame(sig))
-
-    def sigchild(self, sig, frame):
-        # do nothing here, we reap our children synchronously
-        self.options.logger.debug('received %s' % signame(sig))
-
-    def daemonize(self):
-
-        # To daemonize, we need to become the leader of our own session
-        # (process) group.  If we do not, signals sent to our
-        # parent process will also be sent to us.   This might be bad because
-        # signals such as SIGINT can be sent to our parent process during
-        # normal (uninteresting) operations such as when we press Ctrl-C in the
-        # parent terminal window to escape from a logtail command.
-        # To disassociate ourselves from our parent's session group we use
-        # os.setsid.  It means "set session id", which has the effect of
-        # disassociating a process from its current session and process group
-        # and setting itself up as a new session leader.
-        #
-        # Unfortunately we cannot call setsid if we're already a session group
-        # leader, so we use "fork" to make a copy of ourselves that is
-        # guaranteed to not be a session group leader.
-        #
-        # We also change directories, set stderr and stdout to null, and
-        # change our umask.
-        #
-        # This explanation was (gratefully) garnered from
-        # http://www.hawklord.uklinux.net/system/daemons/d3.htm
-
-        pid = os.fork()
-        if pid != 0:
-            # Parent
-            self.options.logger.debug("supervisord forked; parent exiting")
-            os._exit(0)
-        # Child
-        self.options.logger.info("daemonizing the process")
-        if self.options.directory:
-            try:
-                os.chdir(self.options.directory)
-            except os.error, err:
-                self.options.logger.warn("can't chdir into %r: %s"
-                                         % (self.options.directory, err))
-            else:
-                self.options.logger.info("set current directory: %r"
-                                         % self.options.directory)
-        os.close(0)
-        sys.stdin = sys.__stdin__ = open("/dev/null")
-        os.close(1)
-        sys.stdout = sys.__stdout__ = open("/dev/null", "w")
-        os.close(2)
-        sys.stderr = sys.__stderr__ = open("/dev/null", "w")
-        os.setsid()
-        os.umask(self.options.umask)
-        # XXX Stevens, in his Advanced Unix book, section 13.3 (page
-        # 417) recommends calling umask(0) and closing unused
-        # file descriptors.  In his Network Programming book, he
-        # additionally recommends ignoring SIGHUP and forking again
-        # after the setsid() call, for obscure SVR4 reasons.
+            self.options.cleanup()
 
     def runforever(self, test=False):
         timeout = .5
 
-        socket_map = asyncore.socket_map
+        socket_map = self.options.get_socket_map()
 
         while 1:
-            if self.mood > 0:
+            if self.options.mood > 0:
                 self.start_necessary()
 
             self.handle_procs_with_waitstatus()
@@ -847,7 +623,7 @@ class Supervisor:
                     proc.log_stdout(proc.finaloutput)
                     proc.finaloutput = ''
 
-            if self.mood < 1:
+            if self.options.mood < 1:
                 if not self.stopping:
                     self.stop_all()
                     self.stopping = True
@@ -866,8 +642,8 @@ class Supervisor:
                 r, w, x = select.select(r, w, x, timeout)
             except select.error, err:
                 if err[0] == errno.EINTR:
-                    #trace
-                    self.options.logger.log(5,'EINTR encountered in select')
+                    self.options.logger.log(self.options.TRACE,
+                                            'EINTR encountered in select')
                 else:
                     raise
                 r = w = x = []
@@ -906,9 +682,9 @@ class Supervisor:
             self.handle_procs_with_delay()
             self.reap()
 
-            if self.mustreopen:
+            if self.options.mustreopen:
                 self.logreopen()
-                self.mustreopen = False
+                self.options.mustreopen = False
 
             if test:
                 break
@@ -931,58 +707,6 @@ def _readfd(fd):
         data = ''
     return data
 
-# Helpers for dealing with signals and exit status
-
-def decode_wait_status(sts):
-    """Decode the status returned by wait() or waitpid().
-
-    Return a tuple (exitstatus, message) where exitstatus is the exit
-    status, or -1 if the process was killed by a signal; and message
-    is a message telling what happened.  It is the caller's
-    responsibility to display the message.
-    """
-    if os.WIFEXITED(sts):
-        es = os.WEXITSTATUS(sts) & 0xffff
-        msg = "exit status %s" % es
-        return es, msg
-    elif os.WIFSIGNALED(sts):
-        sig = os.WTERMSIG(sts)
-        msg = "terminated by %s" % signame(sig)
-        if hasattr(os, "WCOREDUMP"):
-            iscore = os.WCOREDUMP(sts)
-        else:
-            iscore = sts & 0x80
-        if iscore:
-            msg += " (core dumped)"
-        return -1, msg
-    else:
-        msg = "unknown termination cause 0x%04x" % sts
-        return -1, msg
-
-_signames = None
-
-def signame(sig):
-    """Return a symbolic name for a signal.
-
-    Return "signal NNN" if there is no corresponding SIG name in the
-    signal module.
-    """
-
-    if _signames is None:
-        _init_signames()
-    return _signames.get(sig) or "signal %d" % sig
-
-def _init_signames():
-    global _signames
-    d = {}
-    for k, v in signal.__dict__.items():
-        k_startswith = getattr(k, "startswith", None)
-        if k_startswith is None:
-            continue
-        if k_startswith("SIG") and not k_startswith("SIG_"):
-            d[v] = k
-    _signames = d
-
 def get_path():
     """Return a list corresponding to $PATH, or a default."""
     path = ["/bin", "/usr/bin", "/usr/local/bin"]
@@ -992,39 +716,6 @@ def get_path():
             path = p.split(os.pathsep)
     return path
 
-def dropPrivileges(user):
-    # Drop root privileges if we have them
-    if user is None:
-        return "No used specified to setuid to!"
-    if os.getuid() != 0:
-        return "Can't drop privilege as nonroot user"
-    try:
-        uid = int(user)
-    except ValueError:
-        try:
-            pwrec = pwd.getpwnam(user)
-        except KeyError:
-            return "Can't find username %r" % user
-        uid = pwrec[2]
-    else:
-        try:
-            pwrec = pwd.getpwuid(uid)
-        except KeyError:
-            return "Can't find uid %r" % uid
-    if hasattr(os, 'setgroups'):
-        user = pwrec[0]
-        groups = [grprec[2] for grprec in grp.getgrall() if user in grprec[3]]
-        try:
-            os.setgroups(groups)
-        except OSError:
-            return 'Could not set groups of effective user'
-    gid = pwrec[3]
-    try:
-        os.setgid(gid)
-    except OSError:
-        return 'Could not set group id of effective user'
-    os.setuid(uid)
-
 # Main program
 def main(test=False):
     assert os.name == "posix", "This code makes Unix-specific assumptions"
@@ -1032,12 +723,13 @@ def main(test=False):
     while 1:
         # if we hup, restart by making a new Supervisor()
         # the test argument just makes it possible to unit test this code
-        d = Supervisor()
+        options = ServerOptions()
+        d = Supervisor(options)
         d.main(None, test, first)
         first = False
         if test:
             return d
-        if d.mood < 0:
+        if d.options.mood < 0:
             sys.exit(0)
         for proc in d.processes.values():
             proc.removelogs()

+ 154 - 11
src/supervisor/tests.py

@@ -985,14 +985,6 @@ class SubprocessTests(unittest.TestCase):
         instance.log('foo')
         self.assertEqual(instance.childlog.data, ['foo'])
 
-    def test_trace(self):
-        # trace messages go to the main logger
-        options = DummyOptions()
-        config = DummyPConfig('notthere', '/notthere', logfile='/tmp/foo')
-        instance = self._makeOne(options, config)
-        instance.trace('foo')
-        self.assertEqual(options.logger.data, [5, 'notthere output:\nfoo'])
-
     def test_log_stdout(self):
         # stdout goes to the process log and the main log
         options = DummyOptions()
@@ -1249,6 +1241,95 @@ class LogtailHandlerTests(unittest.TestCase):
         self.assertEqual(len(request.producers), 1)
         self.assertEqual(request._done, True)
 
+class SupervisordTests(unittest.TestCase):
+    def _getTargetClass(self):
+        from supervisord import Supervisor
+        return Supervisor
+
+    def _makeOne(self, options):
+        return self._getTargetClass()(options)
+
+    def test_main(self):
+        options = DummyOptions()
+        supervisord = self._makeOne(options)
+        pconfig = DummyPConfig('foo', 'foo', '/bin/foo')
+        options.programs = [pconfig]
+        supervisord.main(args='abc', test=True, first=True)
+        self.assertEqual(options.realizeargs, 'abc')
+        self.assertEqual(options.fds_cleaned_up, True)
+        self.assertEqual(options.rlimits_set, True)
+        self.assertEqual(options.make_logger_messages,
+                         (['setuid_called'], ['rlimits_set']))
+        self.assertEqual(options.autochildlogdir_cleared, True)
+        self.assertEqual(options.autochildlogs_created, True)
+        self.assertEqual(len(supervisord.processes), 1)
+        self.assertEqual(supervisord.processes['foo'].options, options)
+        self.assertEqual(options.pidfile_written, True)
+        self.assertEqual(options.httpserver_opened, True)
+        self.assertEqual(options.signals_set, True)
+        self.assertEqual(options.daemonized, True)
+        self.assertEqual(options.cleaned_up, True)
+
+    def test_get_state(self):
+        from supervisord import SupervisorStates
+        options = DummyOptions()
+        supervisord = self._makeOne(options)
+        self.assertEqual(supervisord.get_state(), SupervisorStates.ACTIVE)
+        options.mood = -1
+        self.assertEqual(supervisord.get_state(), SupervisorStates.SHUTDOWN)
+
+    def test_start_necessary(self):
+        from supervisord import ProcessStates
+        options = DummyOptions()
+        pconfig1 = DummyPConfig('killed', 'killed', '/bin/killed')
+        process1 = DummyProcess(options, pconfig1, ProcessStates.KILLED)
+        pconfig2 = DummyPConfig('error', 'error', '/bin/error')
+        process2 = DummyProcess(options, pconfig2, ProcessStates.ERROR)
+        pconfig3 = DummyPConfig('notstarted', 'notstarted', '/bin/notstarted',
+                                autostart=True)
+        process3 = DummyProcess(options, pconfig3, ProcessStates.NOTSTARTED)
+        pconfig4 = DummyPConfig('wontstart', 'wonstart', '/bin/wontstart',
+                                autostart=False)
+        process4 = DummyProcess(options, pconfig4, ProcessStates.NOTSTARTED)
+
+        supervisord = self._makeOne(options)
+        supervisord.processes = {'killed': process1, 'error': process2,
+                                 'notstarted':process3, 'wontstart':process4}
+        supervisord.start_necessary()
+        self.assertEqual(process1.spawned, True)
+        self.assertEqual(process2.spawned, False)
+        self.assertEqual(process3.spawned, True)
+        self.assertEqual(process4.spawned, False)
+
+    def handle_procs_with_waitstatus(self):
+        options = DummyOptions()
+        pconfig1 = DummyPConfig('process1', 'process1', '/bin/process1')
+        process1 = DummyProcess(options, pconfig1)
+        pconfig2 = DummyPConfig('process2', 'process2', '/bin/process2')
+        process2 = DummyProcess(options, pconfig2)
+        process2.waitstatus = True
+        supervisord = self._makeOne(options)
+        supervisord.processes = {'killed': process1, 'error': process2}
+
+        supervisord.handle_procs_with_waitstatus()
+        self.assertEqual(process1.status_reported, False)
+        self.assertEqual(process2.status_reported, True)
+
+    def test_stop_all(self):
+        options = DummyOptions()
+        pconfig1 = DummyPConfig('process1', 'process1', '/bin/process1')
+        process1 = DummyProcess(options, pconfig1)
+        pconfig2 = DummyPConfig('process2', 'process2', '/bin/process2')
+        process2 = DummyProcess(options, pconfig2)
+        process2.pid = 1
+        supervisord = self._makeOne(options)
+        supervisord.processes = {'killed': process1, 'error': process2}
+
+        supervisord.stop_all()
+        self.assertEqual(process1.stop_called, False)
+        self.assertEqual(process2.stop_called, True)
+        
+        
 class ControllerTests(unittest.TestCase):
     def _getTargetClass(self):
         from supervisorctl import Controller
@@ -1628,6 +1709,7 @@ class TailFProducerTests(unittest.TestCase):
 class DummyProcess:
     # Initial state; overridden by instance variables
     pid = 0 # Subprocess pid; 0 when not running
+    beenstarted = False
     laststart = 0 # Last time the subprocess was started; 0 if never
     laststop = 0  # Last time the subprocess was stopped; 0 if never
     delay = 0 # If nonzero, delay starting or killing until this time
@@ -1656,7 +1738,7 @@ class DummyProcess:
         self.logsremoved = False
         self.stop_called = False
         self.backoff_done = False
-        self.spawned = True
+        self.spawned = False
         self.state = state
         self.error_at_clear = False
 
@@ -1679,12 +1761,15 @@ class DummyProcess:
     def spawn(self):
         self.spawned = True
 
+    def reportstatus(self):
+        self.status_reported = True
+
     def __cmp__(self, other):
         return cmp(self.config.priority, other.config.priority)
 
 class DummyPConfig:
     def __init__(self, name, command, priority=999, autostart=True,
-                 autorestart=False, uid=None, logfile=None, logfile_backups=0,
+                 autorestart=True, uid=None, logfile=None, logfile_backups=0,
                  logfile_maxbytes=0, stopsignal=signal.SIGTERM,
                  exitcodes=[0,2]):
         self.name = name
@@ -1718,6 +1803,11 @@ class DummyLogger:
         self.removed = True
 
 class DummyOptions:
+
+    TRACE = 5
+    directory = None
+    waitpid_return = None, None
+
     def __init__(self):
         self.identifier = 'supervisor'
         self.childlogdir = '/tmp'
@@ -1725,8 +1815,13 @@ class DummyOptions:
         self.logger = self.getLogger()
         self.backofflimit = 10
         self.logfile = '/tmp/logfile'
-        self.nocleanup = True
+        self.nocleanup = False
         self.pidhistory = {}
+        self.programs = []
+        self.nodaemon = False
+        self.socket_map = {}
+        self.mood = 1
+        self.mustreopen = False
 
     def getLogger(self, *args):
         logger = DummyLogger()
@@ -1734,6 +1829,53 @@ class DummyOptions:
         logger.args = args
         return logger
 
+    def realize(self, args):
+        self.realizeargs = args
+
+    def cleanup_fds(self):
+        self.fds_cleaned_up = True
+
+    def set_rlimits(self):
+        self.rlimits_set = True
+        return ['rlimits_set']
+
+    def set_uid(self):
+        self.setuid_called = True
+        return 'setuid_called'
+
+    def openhttpserver(self, supervisord):
+        self.httpserver_opened = True
+
+    def setsignals(self):
+        self.signals_set = True
+
+    def daemonize(self):
+        self.daemonized = True
+
+    def get_socket_map(self):
+        return self.socket_map
+
+    def make_logger(self, critical_msgs, info_msgs):
+        self.make_logger_messages = critical_msgs, info_msgs
+
+    def create_autochildlogs(self):
+        self.autochildlogs_created = True
+
+    def clear_autochildlogdir(self):
+        self.autochildlogdir_cleared = True
+
+    def cleanup(self):
+        self.cleaned_up = True
+
+    def write_pidfile(self):
+        self.pidfile_written = True
+
+    def waitpid(self):
+        return self.waitpid_return
+
+    def make_process(self, config):
+        return DummyProcess(self, config)
+
 class DummyClientOptions:
     def __init__(self):
         self.prompt = 'supervisor'
@@ -1915,6 +2057,7 @@ class DummyRequest:
         
 def test_suite():
     suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(SupervisordTests))
     suite.addTest(unittest.makeSuite(ControllerTests))
     suite.addTest(unittest.makeSuite(INIOptionTests))
     suite.addTest(unittest.makeSuite(SupervisorNamespaceXMLRPCInterfaceTests))