12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001 |
- import os
- import time
- import datetime
- import errno
- import types
- from supervisor.compat import as_string
- from supervisor.compat import as_bytes
- from supervisor.compat import unicode
- from supervisor.datatypes import signal_number
- from supervisor.options import readFile
- from supervisor.options import tailFile
- from supervisor.options import NotExecutable
- from supervisor.options import NotFound
- from supervisor.options import NoPermission
- from supervisor.options import make_namespec
- from supervisor.options import split_namespec
- from supervisor.options import VERSION
- from supervisor.events import notify
- from supervisor.events import RemoteCommunicationEvent
- from supervisor.http import NOT_DONE_YET
- from supervisor.xmlrpc import Faults
- from supervisor.xmlrpc import RPCError
- from supervisor.states import SupervisorStates
- from supervisor.states import getSupervisorStateDescription
- from supervisor.states import ProcessStates
- from supervisor.states import getProcessStateDescription
- from supervisor.states import (
- RUNNING_STATES,
- STOPPED_STATES,
- )
- API_VERSION = '3.0'
- class SupervisorNamespaceRPCInterface:
- def __init__(self, supervisord):
- self.supervisord = supervisord
- def _update(self, text):
- self.update_text = text # for unit tests, mainly
- if ( isinstance(self.supervisord.options.mood, int) and
- self.supervisord.options.mood < SupervisorStates.RUNNING ):
- raise RPCError(Faults.SHUTDOWN_STATE)
- # RPC API methods
def getAPIVersion(self):
    """ Return the version of the RPC API used by supervisord

    @return string version version id
    """
    # _update() records the call and rejects it in a shutdown state.
    self._update('getAPIVersion')
    api_version = API_VERSION
    return api_version
- getVersion = getAPIVersion # b/w compatibility with releases before 3.0
def getSupervisorVersion(self):
    """ Return the version of the supervisor package in use by supervisord

    @return string version version id
    """
    # _update() records the call and rejects it in a shutdown state.
    self._update('getSupervisorVersion')
    package_version = VERSION
    return package_version
- def getIdentification(self):
- """ Return identifying string of supervisord
- @return string identifier identifying string
- """
- self._update('getIdentification')
- return self.supervisord.options.identifier
def getState(self):
    """ Return current state of supervisord as a struct

    @return struct A struct with keys int statecode, string statename
    """
    self._update('getState')

    # The mood value doubles as the numeric state code.
    statecode = self.supervisord.options.mood
    description = getSupervisorStateDescription(statecode)
    return {'statecode': statecode, 'statename': description}
- def getPID(self):
- """ Return the PID of supervisord
- @return int PID
- """
- self._update('getPID')
- return self.supervisord.options.get_pid()
def readLog(self, offset, length):
    """ Read length bytes from the main log starting at offset

    @param int offset         offset to start reading from.
    @param int length         number of bytes to read from the log.
    @return string result     Bytes of log
    """
    self._update('readLog')

    logfile = self.supervisord.options.logfile
    if logfile is None or not os.path.exists(logfile):
        raise RPCError(Faults.NO_FILE, logfile)

    # Convert the numeric arguments outside the readFile() error handler.
    # That handler treats the ValueError message as the name of a Faults
    # attribute, so a ValueError coming from int() would otherwise turn
    # into an AttributeError instead of an RPC fault.
    try:
        offset = int(offset)
        length = int(length)
    except (TypeError, ValueError):
        raise RPCError(Faults.INCORRECT_PARAMETERS)

    try:
        return as_string(readFile(logfile, offset, length))
    except ValueError as inst:
        # the first exception argument is used as a Faults attribute name
        why = inst.args[0]
        raise RPCError(getattr(Faults, why))
- readMainLog = readLog # b/w compatibility with releases before 2.1
- def clearLog(self):
- """ Clear the main log.
- @return boolean result always returns True unless error
- """
- self._update('clearLog')
- logfile = self.supervisord.options.logfile
- if logfile is None or not self.supervisord.options.exists(logfile):
- raise RPCError(Faults.NO_FILE)
- # there is a race condition here, but ignore it.
- try:
- self.supervisord.options.remove(logfile)
- except (OSError, IOError):
- raise RPCError(Faults.FAILED)
- for handler in self.supervisord.options.logger.handlers:
- if hasattr(handler, 'reopen'):
- self.supervisord.options.logger.info('reopening log file')
- handler.reopen()
- return True
def shutdown(self):
    """ Shut down the supervisor process

    @return boolean result always returns True unless error
    """
    self._update('shutdown')
    # Only the mood is changed here; nothing else is done in this method.
    options = self.supervisord.options
    options.mood = SupervisorStates.SHUTDOWN
    return True
def restart(self):
    """ Restart the supervisor process

    @return boolean result  always return True unless error
    """
    self._update('restart')
    # Only the mood is changed here; nothing else is done in this method.
    options = self.supervisord.options
    options.mood = SupervisorStates.RESTARTING
    return True
- def reloadConfig(self):
- """
- Reload configuration
- @return boolean result always return True unless error
- """
- self._update('reloadConfig')
- try:
- self.supervisord.options.process_config(do_usage=False)
- except ValueError as msg:
- raise RPCError(Faults.CANT_REREAD, msg)
- added, changed, removed = self.supervisord.diff_to_active()
- added = [group.name for group in added]
- changed = [group.name for group in changed]
- removed = [group.name for group in removed]
- return [[added, changed, removed]] # cannot return len > 1, apparently
- def addProcessGroup(self, name):
- """ Update the config for a running process from config file.
- @param string name name of process group to add
- @return boolean result true if successful
- """
- self._update('addProcessGroup')
- for config in self.supervisord.options.process_group_configs:
- if config.name == name:
- result = self.supervisord.add_process_group(config)
- if not result:
- raise RPCError(Faults.ALREADY_ADDED, name)
- return True
- raise RPCError(Faults.BAD_NAME, name)
- def removeProcessGroup(self, name):
- """ Remove a stopped process from the active configuration.
- @param string name name of process group to remove
- @return boolean result Indicates whether the removal was successful
- """
- self._update('removeProcessGroup')
- if name not in self.supervisord.process_groups:
- raise RPCError(Faults.BAD_NAME, name)
- result = self.supervisord.remove_process_group(name)
- if not result:
- raise RPCError(Faults.STILL_RUNNING, name)
- return True
- def _getAllProcesses(self, lexical=False):
- # if lexical is true, return processes sorted in lexical order,
- # otherwise, sort in priority order
- all_processes = []
- if lexical:
- group_names = list(self.supervisord.process_groups.keys())
- group_names.sort()
- for group_name in group_names:
- group = self.supervisord.process_groups[group_name]
- process_names = list(group.processes.keys())
- process_names.sort()
- for process_name in process_names:
- process = group.processes[process_name]
- all_processes.append((group, process))
- else:
- groups = list(self.supervisord.process_groups.values())
- groups.sort() # asc by priority
- for group in groups:
- processes = list(group.processes.values())
- processes.sort() # asc by priority
- for process in processes:
- all_processes.append((group, process))
- return all_processes
def _getGroupAndProcess(self, name):
    # Resolve a namespec into (group, process).  The process slot is None
    # when split_namespec() yields no process name; an unknown group or
    # process raises BAD_NAME.
    group_name, process_name = split_namespec(name)

    group = self.supervisord.process_groups.get(group_name)
    if group is None:
        raise RPCError(Faults.BAD_NAME, name)

    process = None
    if process_name is not None:
        process = group.processes.get(process_name)
        if process is None:
            raise RPCError(Faults.BAD_NAME, name)

    return group, process
def startProcess(self, name, wait=True):
    """ Start a process

    @param string name Process name (or ``group:name``, or ``group:*``)
    @param boolean wait Wait for process to be fully started
    @return boolean result     Always true unless error

    """
    self._update('startProcess')
    group, process = self._getGroupAndProcess(name)
    if process is None:
        # the namespec names no single process: start the whole group
        group_name, _process_name = split_namespec(name)
        return self.startProcessGroup(group_name, wait)

    # test filespec, don't bother trying to spawn if we know it will
    # eventually fail
    try:
        _filename, _argv = process.get_execv_args()
    except NotFound as why:
        raise RPCError(Faults.NO_FILE, why.args[0])
    except (NotExecutable, NoPermission) as why:
        raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])

    if process.get_state() in RUNNING_STATES:
        raise RPCError(Faults.ALREADY_STARTED, name)

    process.spawn()

    # We call reap() in order to more quickly obtain the side effects of
    # process.finish(), which reap() eventually ends up calling.  This
    # might be the case if the spawn() was successful but then the process
    # died before its startsecs elapsed or it exited with an unexpected
    # exit code. In particular, finish() may set spawnerr, which we can
    # check and immediately raise an RPCError, avoiding the need to
    # defer by returning a callback.
    self.supervisord.reap()

    if process.spawnerr:
        raise RPCError(Faults.SPAWN_ERROR, name)

    # We call process.transition() in order to more quickly obtain its
    # side effects.  In particular, it might set the process' state from
    # STARTING->RUNNING if the process has a startsecs==0.
    process.transition()

    if not wait or process.get_state() == ProcessStates.RUNNING:
        return True

    # by default, this point will almost always be reached for processes
    # with default startsecs configurations, because the default number
    # of startsecs for a process is "1", and the process will not have
    # entered the RUNNING state yet even though we've called
    # transition() on it.  This is because a process is not considered
    # RUNNING until it has stayed up > startsecs.
    def onwait():
        if process.spawnerr:
            raise RPCError(Faults.SPAWN_ERROR, name)

        state = process.get_state()
        if state == ProcessStates.RUNNING:
            return True
        if state != ProcessStates.STARTING:
            raise RPCError(Faults.ABNORMAL_TERMINATION, name)
        return NOT_DONE_YET

    onwait.delay = 0.05
    onwait.rpcinterface = self
    return onwait # deferred
def startProcessGroup(self, name, wait=True):
    """ Start all processes in the group named 'name'

    @param string name     The group name
    @param boolean wait    Wait for each process to be fully started
    @return array result   An array of process status info structs
    """
    self._update('startProcessGroup')

    group = self.supervisord.process_groups.get(name)
    if group is None:
        raise RPCError(Faults.BAD_NAME, name)

    # Pair each process, in sorted order, with its group, as make_allfunc
    # expects.
    pairs = [(group, process)
             for process in sorted(group.processes.values())]

    startall = make_allfunc(pairs, isNotRunning, self.startProcess,
                            wait=wait)
    startall.delay = 0.05
    startall.rpcinterface = self
    return startall # deferred
def startAllProcesses(self, wait=True):
    """ Start all processes listed in the configuration file

    @param boolean wait    Wait for each process to be fully started
    @return array result   An array of process status info structs
    """
    self._update('startAllProcesses')

    # Only processes that are not already running get startProcess().
    startall = make_allfunc(self._getAllProcesses(), isNotRunning,
                            self.startProcess, wait=wait)
    startall.delay = 0.05
    startall.rpcinterface = self
    return startall # deferred
def stopProcess(self, name, wait=True):
    """ Stop a process named by name

    @param string name  The name of the process to stop (or 'group:name')
    @param boolean wait        Wait for the process to be fully stopped
    @return boolean result     Always return True unless error
    """
    self._update('stopProcess')

    group, process = self._getGroupAndProcess(name)
    if process is None:
        # the namespec names no single process: stop the whole group
        group_name, _process_name = split_namespec(name)
        return self.stopProcessGroup(group_name, wait)

    if process.get_state() not in RUNNING_STATES:
        raise RPCError(Faults.NOT_RUNNING, name)

    error = process.stop()
    if error is not None:
        raise RPCError(Faults.FAILED, error)

    # We'll try to reap any killed child. FWIW, reap calls waitpid, and
    # then, if waitpid returns a pid, calls finish() on the process with
    # that pid, which drains any I/O from the process' dispatchers and
    # changes the process' state.  I chose to call reap without once=True
    # because we don't really care if we reap more than one child.  Even if
    # we only reap one child. we may not even be reaping the child that we
    # just stopped (this is all async, and process.stop() may not work, and
    # we'll need to wait for SIGKILL during process.transition() as the
    # result of normal select looping).
    self.supervisord.reap()

    if not wait or process.get_state() in STOPPED_STATES:
        return True

    def onwait():
        # process will eventually enter a stopped state by
        # virtue of the supervisord.reap() method being called
        # during normal operations
        process.stop_report()
        if process.get_state() in STOPPED_STATES:
            return True
        return NOT_DONE_YET

    onwait.delay = 0
    onwait.rpcinterface = self
    return onwait # deferred
def stopProcessGroup(self, name, wait=True):
    """ Stop all processes in the process group named 'name'

    @param string name     The group name
    @param boolean wait    Wait for each process to be fully stopped
    @return array result   An array of process status info structs
    """
    self._update('stopProcessGroup')

    group = self.supervisord.process_groups.get(name)
    if group is None:
        raise RPCError(Faults.BAD_NAME, name)

    # Pair each process, in sorted order, with its group, as make_allfunc
    # expects.
    pairs = [(group, process)
             for process in sorted(group.processes.values())]

    killall = make_allfunc(pairs, isRunning, self.stopProcess,
                           wait=wait)
    killall.delay = 0.05
    killall.rpcinterface = self
    return killall # deferred
def stopAllProcesses(self, wait=True):
    """ Stop all processes in the process list

    @param  boolean wait   Wait for each process to be fully stopped
    @return array result   An array of process status info structs
    """
    self._update('stopAllProcesses')

    # Only processes that are currently running get stopProcess().
    killall = make_allfunc(self._getAllProcesses(), isRunning,
                           self.stopProcess, wait=wait)
    killall.delay = 0.05
    killall.rpcinterface = self
    return killall # deferred
def signalProcess(self, name, signal):
    """ Send an arbitrary UNIX signal to the process named by name

    @param string name    Name of the process to signal (or 'group:name')
    @param string signal  Signal to send, as name ('HUP') or number ('1')
    @return boolean
    """
    self._update('signalProcess')

    group, process = self._getGroupAndProcess(name)
    if process is None:
        # the namespec names no single process: signal the whole group
        group_name, _process_name = split_namespec(name)
        return self.signalProcessGroup(group_name, signal=signal)

    # Validate the signal before looking at the process state, so a bad
    # signal is reported even for a stopped process.
    try:
        signum = signal_number(signal)
    except ValueError:
        raise RPCError(Faults.BAD_SIGNAL, signal)

    if process.get_state() not in RUNNING_STATES:
        raise RPCError(Faults.NOT_RUNNING, name)

    error = process.signal(signum)
    if error is not None:
        raise RPCError(Faults.FAILED, error)

    return True
def signalProcessGroup(self, name, signal):
    """ Send a signal to all processes in the group named 'name'

    @param string name    The group name
    @param string signal  Signal to send, as name ('HUP') or number ('1')
    @return array
    """
    group = self.supervisord.process_groups.get(name)
    self._update('signalProcessGroup')
    if group is None:
        raise RPCError(Faults.BAD_NAME, name)

    pairs = [(group, process)
             for process in sorted(group.processes.values())]
    sendall = make_allfunc(pairs, isRunning, self.signalProcess,
                           signal=signal)

    # Unlike the start/stop variants, the closure is run right away
    # instead of being returned as a deferred.
    result = sendall()

    # signalProcess() calls _update() itself and so overwrites
    # update_text; record this method's name again before returning.
    self._update('signalProcessGroup')
    return result
def signalAllProcesses(self, signal):
    """ Send a signal to all processes in the process list

    @param string signal  Signal to send, as name ('HUP') or number ('1')
    @return array         An array of process status info structs
    """
    signalall = make_allfunc(self._getAllProcesses(), isRunning,
                             self.signalProcess, signal=signal)

    # The closure is run right away rather than returned as a deferred.
    result = signalall()

    # _update() is only called here, after signalProcess() has made its
    # own _update() calls, so update_text ends up naming this method.
    self._update('signalAllProcesses')
    return result
- def getAllConfigInfo(self):
- """ Get info about all available process configurations. Each struct
- represents a single process (i.e. groups get flattened).
- @return array result An array of process config info structs
- """
- self._update('getAllConfigInfo')
- configinfo = []
- for gconfig in self.supervisord.options.process_group_configs:
- inuse = gconfig.name in self.supervisord.process_groups
- for pconfig in gconfig.process_configs:
- configinfo.append(
- {
- 'autostart': pconfig.autostart,
- 'command': pconfig.command,
- 'exitcodes': pconfig.exitcodes,
- 'group': gconfig.name,
- 'group_prio': gconfig.priority,
- 'inuse': inuse,
- 'killasgroup': pconfig.killasgroup,
- 'name': pconfig.name,
- 'process_prio': pconfig.priority,
- 'redirect_stderr': pconfig.redirect_stderr,
- 'startretries': pconfig.startretries,
- 'startsecs': pconfig.startsecs,
- 'stdout_capture_maxbytes': pconfig.stdout_capture_maxbytes,
- 'stdout_events_enabled': pconfig.stdout_events_enabled,
- 'stdout_logfile': pconfig.stdout_logfile,
- 'stdout_logfile_backups': pconfig.stdout_logfile_backups,
- 'stdout_logfile_maxbytes': pconfig.stdout_logfile_maxbytes,
- 'stdout_syslog': pconfig.stdout_syslog,
- 'stopsignal': pconfig.stopsignal,
- 'stopwaitsecs': pconfig.stopwaitsecs,
- 'stderr_capture_maxbytes': pconfig.stderr_capture_maxbytes,
- 'stderr_events_enabled': pconfig.stderr_events_enabled,
- 'stderr_logfile': pconfig.stderr_logfile,
- 'stderr_logfile_backups': pconfig.stderr_logfile_backups,
- 'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
- 'stderr_syslog': pconfig.stderr_syslog,
- }
- )
- configinfo.sort(key=lambda r: r['name'])
- return configinfo
def _interpretProcessInfo(self, info):
    # Build the human-readable description for a process info struct.
    # Reads info['state'] plus, depending on the state, 'start', 'now',
    # 'pid', 'spawnerr', 'name' and 'stop'.  Returns a string:
    #   RUNNING          -> 'pid <pid>, uptime <timedelta>'
    #   FATAL / BACKOFF  -> the spawn error, or a hint to tail the log
    #   STOPPED / EXITED -> the formatted stop time, or 'Not started'
    #   anything else    -> ''
    state = info['state']

    if state == ProcessStates.RUNNING:
        start = info['start']
        now = info['now']
        start_dt = datetime.datetime(*time.gmtime(start)[:6])
        now_dt = datetime.datetime(*time.gmtime(now)[:6])
        uptime = now_dt - start_dt
        if _total_seconds(uptime) < 0: # system time set back
            uptime = datetime.timedelta(0)
        desc = 'pid %s, uptime %s' % (info['pid'], uptime)

    elif state in (ProcessStates.FATAL, ProcessStates.BACKOFF):
        desc = info['spawnerr']
        if not desc:
            desc = 'unknown error (try "tail %s")' % info['name']

    elif state in (ProcessStates.STOPPED, ProcessStates.EXITED):
        if info['start']:
            stop = info['stop']
            # Only the first six struct_time fields (year..second) map
            # onto datetime's positional arguments.  The seventh field
            # is tm_wday, which would be taken as the microsecond.
            stop_dt = datetime.datetime(*time.localtime(stop)[:6])
            desc = stop_dt.strftime('%b %d %I:%M %p')
        else:
            desc = 'Not started'

    else:
        desc = ''

    return desc
def getProcessInfo(self, name):
    """ Get info about a process named name

    @param string name The name of the process (or 'group:name')
    @return struct result     A structure containing data about the process
    """
    self._update('getProcessInfo')

    group, process = self._getGroupAndProcess(name)
    if process is None:
        raise RPCError(Faults.BAD_NAME, name)

    # Timestamps are reported as ints; falsy values are replaced with ''
    # or 0 before being put in the struct.
    start = int(process.laststart)
    stop = int(process.laststop)
    now = int(time.time())

    state = process.get_state()

    spawnerr = process.spawnerr or ''
    exitstatus = process.exitstatus or 0
    stdout_logfile = process.config.stdout_logfile or ''
    stderr_logfile = process.config.stderr_logfile or ''

    info = {
        'name': process.config.name,
        'group': group.config.name,
        'start': start,
        'stop': stop,
        'now': now,
        'state': state,
        'statename': getProcessStateDescription(state),
        'spawnerr': spawnerr,
        'exitstatus': exitstatus,
        'logfile': stdout_logfile, # b/c alias
        'stdout_logfile': stdout_logfile,
        'stderr_logfile': stderr_logfile,
        'pid': process.pid,
        }

    # the description is derived from the fields collected above
    info['description'] = self._interpretProcessInfo(info)
    return info
def getAllProcessInfo(self):
    """ Get info about all processes

    @return array result  An array of process status results
    """
    self._update('getAllProcessInfo')

    # processes are reported in lexical (group name, process name) order
    return [
        self.getProcessInfo(
            make_namespec(group.config.name, process.config.name))
        for group, process in self._getAllProcesses(lexical=True)
    ]
def _readProcessLog(self, name, offset, length, channel):
    # Read `length` bytes starting at `offset` from the process log
    # selected by `channel`, which is used to build the config attribute
    # name '<channel>_logfile'.  Raises BAD_NAME when the namespec names
    # no single process and NO_FILE when the log is unset or missing.
    group, process = self._getGroupAndProcess(name)
    if process is None:
        raise RPCError(Faults.BAD_NAME, name)

    logfile = getattr(process.config, '%s_logfile' % channel)
    if logfile is None or not os.path.exists(logfile):
        raise RPCError(Faults.NO_FILE, logfile)

    # Convert the numeric arguments outside the readFile() error handler.
    # That handler treats the ValueError message as the name of a Faults
    # attribute, so a ValueError coming from int() would otherwise turn
    # into an AttributeError instead of an RPC fault.
    try:
        offset = int(offset)
        length = int(length)
    except (TypeError, ValueError):
        raise RPCError(Faults.INCORRECT_PARAMETERS)

    try:
        return as_string(readFile(logfile, offset, length))
    except ValueError as inst:
        # the first exception argument is used as a Faults attribute name
        why = inst.args[0]
        raise RPCError(getattr(Faults, why))
- def readProcessStdoutLog(self, name, offset, length):
- """ Read length bytes from name's stdout log starting at offset
- @param string name the name of the process (or 'group:name')
- @param int offset offset to start reading from.
- @param int length number of bytes to read from the log.
- @return string result Bytes of log
- """
- self._update('readProcessStdoutLog')
- return self._readProcessLog(name, offset, length, 'stdout')
- readProcessLog = readProcessStdoutLog # b/c alias
- def readProcessStderrLog(self, name, offset, length):
- """ Read length bytes from name's stderr log starting at offset
- @param string name the name of the process (or 'group:name')
- @param int offset offset to start reading from.
- @param int length number of bytes to read from the log.
- @return string result Bytes of log
- """
- self._update('readProcessStderrLog')
- return self._readProcessLog(name, offset, length, 'stderr')
- def _tailProcessLog(self, name, offset, length, channel):
- group, process = self._getGroupAndProcess(name)
- if process is None:
- raise RPCError(Faults.BAD_NAME, name)
- logfile = getattr(process.config, '%s_logfile' % channel)
- if logfile is None or not os.path.exists(logfile):
- return ['', 0, False]
- return tailFile(logfile, int(offset), int(length))
- def tailProcessStdoutLog(self, name, offset, length):
- """
- Provides a more efficient way to tail the (stdout) log than
- readProcessStdoutLog(). Use readProcessStdoutLog() to read
- chunks and tailProcessStdoutLog() to tail.
- Requests (length) bytes from the (name)'s log, starting at
- (offset). If the total log size is greater than (offset +
- length), the overflow flag is set and the (offset) is
- automatically increased to position the buffer at the end of
- the log. If less than (length) bytes are available, the
- maximum number of available bytes will be returned. (offset)
- returned is always the last offset in the log +1.
- @param string name the name of the process (or 'group:name')
- @param int offset offset to start reading from
- @param int length maximum number of bytes to return
- @return array result [string bytes, int offset, bool overflow]
- """
- self._update('tailProcessStdoutLog')
- return self._tailProcessLog(name, offset, length, 'stdout')
- tailProcessLog = tailProcessStdoutLog # b/c alias
- def tailProcessStderrLog(self, name, offset, length):
- """
- Provides a more efficient way to tail the (stderr) log than
- readProcessStderrLog(). Use readProcessStderrLog() to read
- chunks and tailProcessStderrLog() to tail.
- Requests (length) bytes from the (name)'s log, starting at
- (offset). If the total log size is greater than (offset +
- length), the overflow flag is set and the (offset) is
- automatically increased to position the buffer at the end of
- the log. If less than (length) bytes are available, the
- maximum number of available bytes will be returned. (offset)
- returned is always the last offset in the log +1.
- @param string name the name of the process (or 'group:name')
- @param int offset offset to start reading from
- @param int length maximum number of bytes to return
- @return array result [string bytes, int offset, bool overflow]
- """
- self._update('tailProcessStderrLog')
- return self._tailProcessLog(name, offset, length, 'stderr')
- def clearProcessLogs(self, name):
- """ Clear the stdout and stderr logs for the named process and
- reopen them.
- @param string name The name of the process (or 'group:name')
- @return boolean result Always True unless error
- """
- self._update('clearProcessLogs')
- group, process = self._getGroupAndProcess(name)
- if process is None:
- raise RPCError(Faults.BAD_NAME, name)
- try:
- # implies a reopen
- process.removelogs()
- except (IOError, OSError):
- raise RPCError(Faults.FAILED, name)
- return True
- clearProcessLog = clearProcessLogs # b/c alias
- def clearAllProcessLogs(self):
- """ Clear all process log files
- @return array result An array of process status info structs
- """
- self._update('clearAllProcessLogs')
- results = []
- callbacks = []
- all_processes = self._getAllProcesses()
- for group, process in all_processes:
- callbacks.append((group, process, self.clearProcessLog))
- def clearall():
- if not callbacks:
- return results
- group, process, callback = callbacks.pop(0)
- name = make_namespec(group.config.name, process.config.name)
- try:
- callback(name)
- except RPCError as e:
- results.append(
- {'name':process.config.name,
- 'group':group.config.name,
- 'status':e.code,
- 'description':e.text})
- else:
- results.append(
- {'name':process.config.name,
- 'group':group.config.name,
- 'status':Faults.SUCCESS,
- 'description':'OK'}
- )
- if callbacks:
- return NOT_DONE_YET
- return results
- clearall.delay = 0.05
- clearall.rpcinterface = self
- return clearall # deferred
def sendProcessStdin(self, name, chars):
    """ Send a string of chars to the stdin of the process name.
    If non-7-bit data is sent (unicode), it is encoded to utf-8
    before being sent to the process' stdin.  If chars is not a
    string or is not unicode, raise INCORRECT_PARAMETERS.  If the
    process is not running, raise NOT_RUNNING.  If the process'
    stdin cannot accept input (e.g. it was closed by the child
    process), raise NO_FILE.

    @param string name        The process name to send to (or 'group:name')
    @param string chars       The character data to send to the process
    @return boolean result    Always return True unless error
    """
    self._update('sendProcessStdin')

    if not isinstance(chars, (str, bytes, unicode)):
        raise RPCError(Faults.INCORRECT_PARAMETERS, chars)

    payload = as_bytes(chars)

    _group, process = self._getGroupAndProcess(name)
    if process is None:
        raise RPCError(Faults.BAD_NAME, name)

    # a process with no pid, or one being killed, is treated as not running
    if not process.pid or process.killing:
        raise RPCError(Faults.NOT_RUNNING, name)

    try:
        process.write(payload)
    except OSError as why:
        # only a broken pipe becomes NO_FILE; anything else propagates
        if why.args[0] != errno.EPIPE:
            raise
        raise RPCError(Faults.NO_FILE, name)

    return True
def sendRemoteCommEvent(self, type, data):
    """ Send an event that will be received by event listener
    subprocesses subscribing to the RemoteCommunicationEvent.

    @param  string  type  String for the "type" key in the event header
    @param  string  data  Data for the event body
    @return boolean       Always return True unless error
    """
    # unicode arguments are encoded to utf-8 before the event is built
    if isinstance(type, unicode):
        type = type.encode('utf-8')
    if isinstance(data, unicode):
        data = data.encode('utf-8')

    event = RemoteCommunicationEvent(type, data)
    notify(event)

    return True
- def _total_seconds(timedelta):
- return ((timedelta.days * 86400 + timedelta.seconds) * 10**6 +
- timedelta.microseconds) / 10**6
def make_allfunc(processes, predicate, func, **extra_kwargs):
    """ Return a closure representing a function that calls a
    function for every process, and returns a result """

    callbacks = []
    results = []

    def allfunc(
        processes=processes,
        predicate=predicate,
        func=func,
        extra_kwargs=extra_kwargs,
        callbacks=callbacks, # used only to fool scoping, never passed by caller
        results=results, # used only to fool scoping, never passed by caller
        ):

        def record(group, process, status, description):
            # append one status struct for the given process
            results.append({'name': process.config.name,
                            'group': group.config.name,
                            'status': status,
                            'description': description})

        if not callbacks:
            # First phase: call func for every process accepted by the
            # predicate.  Function results are queued as deferred
            # callbacks; any other result counts as immediate success.
            for group, process in processes:
                name = make_namespec(group.config.name, process.config.name)
                if not predicate(process):
                    continue
                try:
                    callback = func(name, **extra_kwargs)
                except RPCError as e:
                    record(group, process, e.code, e.text)
                    continue
                if isinstance(callback, types.FunctionType):
                    callbacks.append((group, process, callback))
                else:
                    record(group, process, Faults.SUCCESS, 'OK')

            if not callbacks:
                return results

        # Second phase: poll the queued callbacks.  Iterate over a copy
        # because finished entries are removed from the live list.
        for struct in callbacks[:]:
            group, process, cb = struct
            try:
                value = cb()
            except RPCError as e:
                record(group, process, e.code, e.text)
                callbacks.remove(struct)
            else:
                if value is not NOT_DONE_YET:
                    record(group, process, Faults.SUCCESS, 'OK')
                    callbacks.remove(struct)

        if callbacks:
            return NOT_DONE_YET

        return results

    # XXX the above implementation has a weakness inasmuch as the
    # first call into each individual process callback will always
    # return NOT_DONE_YET, so they need to be called twice.  The
    # symptom of this is that calling this method causes the
    # client to block for much longer than it actually requires to
    # kill all of the running processes.  After the first call to
    # the killit callback, the process is actually dead, but the
    # above killall method processes the callbacks one at a time
    # during the select loop, which, because there is no output
    # from child processes after e.g. stopAllProcesses is called,
    # is not busy, so hits the timeout for each callback.  I
    # attempted to make this better, but the only way to make it
    # better assumes totally synchronous reaping of child
    # processes, which requires infrastructure changes to
    # supervisord that are scary at the moment as it could take a
    # while to pin down all of the platform differences and might
    # require a C extension to the Python signal module to allow
    # the setting of ignore flags to signals.
    return allfunc
def isRunning(process):
    # True when the process state is one of RUNNING_STATES, False
    # otherwise.  Always returns a real bool rather than falling through
    # to an implicit None.
    return process.get_state() in RUNNING_STATES
def isNotRunning(process):
    # logical negation of isRunning()
    running = isRunning(process)
    return not running
- # this is not used in code but referenced via an entry point in the conf file
def make_main_rpcinterface(supervisord):
    # factory: wrap the given supervisord in the main RPC namespace
    rpcinterface = SupervisorNamespaceRPCInterface(supervisord)
    return rpcinterface
|