浏览代码

Implement "ordered shutdown". We shut process groups down one at a time (the one with the lowest priority gets shut down first, the highest last). We wait for groups with lower priorities to shut down before trying to shut down higher priority groups.

Chris McDonough 18 年之前
父节点
当前提交
8c2c85462a
共有 5 个文件被更改,包括 58 次插入和 12 次删除
  1. 6 0
      src/supervisor/process.py
  2. 1 4
      src/supervisor/rpcinterface.py
  3. 11 0
      src/supervisor/states.py
  4. 36 5
      src/supervisor/supervisord.py
  5. 4 3
      src/supervisor/tests/base.py

+ 6 - 0
src/supervisor/process.py

@@ -24,6 +24,7 @@ import signal
 
 
 from supervisor.states import ProcessStates
 from supervisor.states import ProcessStates
 from supervisor.states import getProcessStateDescription
 from supervisor.states import getProcessStateDescription
+from supervisor.states import STOPPED_STATES
 
 
 from supervisor.options import decode_wait_status
 from supervisor.options import decode_wait_status
 from supervisor.options import signame
 from supervisor.options import signame
@@ -515,6 +516,11 @@ class ProcessGroupBase:
         """ Processes which are starting or stopping """
         """ Processes which are starting or stopping """
         return [ x for x in self.processes.values() if x.delay ]
         return [ x for x in self.processes.values() if x.delay ]
 
 
+    def get_unstopped_processes(self):
+        """ Processes which aren't in a state that is considered 'stopped' """
+        return [ x for x in self.processes.values() if x.get_state() not in
+                 STOPPED_STATES ]
+
     def get_undead(self):
     def get_undead(self):
         """ Processes which we've attempted to stop but which haven't responded
         """ Processes which we've attempted to stop but which haven't responded
         to a kill request within a given amount of time (stopwaitsecs) """
         to a kill request within a given amount of time (stopwaitsecs) """

+ 1 - 4
src/supervisor/rpcinterface.py

@@ -20,10 +20,7 @@ from supervisor.states import SupervisorStates
 from supervisor.states import getSupervisorStateDescription
 from supervisor.states import getSupervisorStateDescription
 from supervisor.states import ProcessStates
 from supervisor.states import ProcessStates
 from supervisor.states import getProcessStateDescription
 from supervisor.states import getProcessStateDescription
-
-RUNNING_STATES = (ProcessStates.RUNNING,
-                  ProcessStates.BACKOFF,
-                  ProcessStates.STARTING)
+from supervisor.states import RUNNING_STATES
 
 
 API_VERSION  = '3.0'
 API_VERSION  = '3.0'
 
 

+ 11 - 0
src/supervisor/states.py

@@ -8,6 +8,17 @@ class ProcessStates:
     FATAL = 200
     FATAL = 200
     UNKNOWN = 1000
     UNKNOWN = 1000
 
 
+STOPPED_STATES = (ProcessStates.STOPPED,
+                  ProcessStates.EXITED,
+                  ProcessStates.FATAL,
+                  ProcessStates.UNKNOWN)
+
+RUNNING_STATES = (ProcessStates.RUNNING,
+                  ProcessStates.BACKOFF,
+                  ProcessStates.STARTING)
+
+
+
 def getProcessStateDescription(code):
 def getProcessStateDescription(code):
     for statename in ProcessStates.__dict__:
     for statename in ProcessStates.__dict__:
         if getattr(ProcessStates, statename) == code:
         if getattr(ProcessStates, statename) == code:

+ 36 - 5
src/supervisor/supervisord.py

@@ -52,12 +52,14 @@ from supervisor.options import ServerOptions
 from supervisor.options import signame
 from supervisor.options import signame
 from supervisor import events
 from supervisor import events
 from supervisor.states import SupervisorStates
 from supervisor.states import SupervisorStates
+from supervisor.states import getProcessStateDescription
 
 
 class Supervisor:
 class Supervisor:
     mood = 1 # 1: up, 0: restarting, -1: suicidal
     mood = 1 # 1: up, 0: restarting, -1: suicidal
     stopping = False # set after we detect that we are handling a stop request
     stopping = False # set after we detect that we are handling a stop request
     lastdelayreport = 0 # throttle for delayed process error reports at stop
     lastdelayreport = 0 # throttle for delayed process error reports at stop
     process_groups = None # map of process group name to process group object
     process_groups = None # map of process group name to process group object
+    stop_groups = None # list used for priority ordered shutdown
 
 
     def __init__(self, options):
     def __init__(self, options):
         self.options = options
         self.options = options
@@ -90,6 +92,7 @@ class Supervisor:
 
 
     def run(self, test=False):
     def run(self, test=False):
         self.process_groups = {} # clear
         self.process_groups = {} # clear
+        self.stop_groups = None # clear
         events.clear()
         events.clear()
         try:
         try:
             for config in self.options.process_group_configs:
             for config in self.options.process_group_configs:
@@ -127,12 +130,33 @@ class Supervisor:
             if now > (self.lastdelayreport + 3): # every 3 secs
             if now > (self.lastdelayreport + 3): # every 3 secs
                 names = [ p.config.name for p in delayprocs]
                 names = [ p.config.name for p in delayprocs]
                 namestr = ', '.join(names)
                 namestr = ', '.join(names)
-                self.options.logger.info('waiting for %s to die' %
-                                         namestr)
+                self.options.logger.info('waiting for %s to die' % namestr)
                 self.lastdelayreport = now
                 self.lastdelayreport = now
+                for proc in delayprocs:
+                    state = getProcessStateDescription(proc.get_state())
+                    self.options.logger.log(self.options.TRACE,
+                               '%s state: %s' % (proc.config.name, state))
 
 
         return delayprocs
         return delayprocs
 
 
+    def ordered_stop_groups_phase_1(self):
+        if self.stop_groups:
+            # stop the last group (the one with the "highest" priority)
+            self.stop_groups[-1].stop_all()
+
+    def ordered_stop_groups_phase_2(self):
+        # after phase 1 we've transitioned and reaped, let's see if we
+        # can remove the group we stopped from the stop_groups queue.
+        if self.stop_groups:
+            # pop the last group (the one with the "highest" priority)
+            group = self.stop_groups.pop()
+            if group.get_unstopped_processes():
+                # if any processes in the group aren't yet in a
+                # stopped state, we're not yet done shutting this
+                # group down, so push it back on to the end of the
+                # stop group queue
+                self.stop_groups.append(group)
+
     def runforever(self, test=False):
     def runforever(self, test=False):
         events.notify(events.SupervisorRunningEvent())
         events.notify(events.SupervisorRunningEvent())
         timeout = 1
         timeout = 1
@@ -145,16 +169,20 @@ class Supervisor:
             combined_map.update(self.get_process_map())
             combined_map.update(self.get_process_map())
 
 
             pgroups = self.process_groups.values()
             pgroups = self.process_groups.values()
+            pgroups.sort()
 
 
             if self.mood > 0:
             if self.mood > 0:
                 [ group.start_necessary() for group in pgroups ]
                 [ group.start_necessary() for group in pgroups ]
 
 
             elif self.mood < 1:
             elif self.mood < 1:
                 if not self.stopping:
                 if not self.stopping:
-                    # first time, do a notification
-                    events.notify(events.SupervisorStoppingEvent())
+                    # first time, set the stopping flag, do a
+                    # notification and set stop_groups
                     self.stopping = True
                     self.stopping = True
-                [ group.stop_all() for group in pgroups ]
+                    self.stop_groups = pgroups[:]
+                    events.notify(events.SupervisorStoppingEvent())
+
+                self.ordered_stop_groups_phase_1()
 
 
                 if not self.get_delay_processes():
                 if not self.get_delay_processes():
                     # if there are no delayed processes (we're done killing
                     # if there are no delayed processes (we're done killing
@@ -202,6 +230,9 @@ class Supervisor:
             self.reap()
             self.reap()
             self.handle_signal()
             self.handle_signal()
 
 
+            if self.mood < 1:
+                self.ordered_stop_groups_phase_2()
+
             if test:
             if test:
                 break
                 break
 
 

+ 4 - 3
src/supervisor/tests/base.py

@@ -690,13 +690,11 @@ class DummyProcessGroup:
         self.all_stopped = False
         self.all_stopped = False
         self.delay_processes = []
         self.delay_processes = []
         self.dispatchers = {}
         self.dispatchers = {}
+        self.unstopped_processes = []
 
 
     def start_necessary(self):
     def start_necessary(self):
         self.necessary_started = True
         self.necessary_started = True
 
 
-    def select(self):
-        return self.select_result
-
     def transition(self):
     def transition(self):
         self.transitioned = True
         self.transitioned = True
 
 
@@ -706,6 +704,9 @@ class DummyProcessGroup:
     def get_delay_processes(self):
     def get_delay_processes(self):
         return self.delay_processes
         return self.delay_processes
 
 
+    def get_unstopped_processes(self):
+        return self.unstopped_processes
+
     def get_dispatchers(self):
     def get_dispatchers(self):
         return self.dispatchers
         return self.dispatchers