Browse Source

Tests for memmon and slight childutils changes.

Chris McDonough 17 years ago
parent
commit
40f1e2199e

+ 23 - 25
src/supervisor/childutils.py

@@ -30,14 +30,6 @@ def getRPCInterface(env):
     # 'serverurl' to figure out what to attach to
     return xmlrpclib.ServerProxy('http://127.0.0.1', getRPCTransport(env))
 
-def write_stderr(msg):
-    sys.stderr.write(msg)
-    sys.stderr.flush()
-
-def write_stdout(msg):
-    sys.stdout.write(msg)
-    sys.stdout.flush()
-
 def get_headers(line):
     return dict([ x.split(':') for x in line.split() ])
 
@@ -54,37 +46,43 @@ def get_asctime():
     return asctime
 
 class ProcessCommunicationsProtocol:
-    def send(self, msg, write=write_stdout):
-        write(ProcessCommunicationEvent.BEGIN_TOKEN)
-        write(msg)
-        write(ProcessCommunicationEvent.END_TOKEN)
+    def send(self, msg, fp=sys.stdout):
+        fp.write(ProcessCommunicationEvent.BEGIN_TOKEN)
+        fp.write(msg)
+        fp.write(ProcessCommunicationEvent.END_TOKEN)
 
     def stdout(self, msg):
-        return self.send(msg, write_stdout)
+        return self.send(msg, sys.stdout)
 
     def stderr(self, msg):
-        return self.send(msg, write_stderr)
+        return self.send(msg, sys.stderr)
 
 pcomm = ProcessCommunicationsProtocol()
 
 class EventListenerProtocol:
-    def wait(self):
+    def wait(self, stdin=sys.stdin):
         self.ready()
-        line = sys.stdin.readline()
+        line = stdin.readline()
         headers = get_headers(line)
-        payload = sys.stdin.read(int(headers['len']))
+        payload = stdin.read(int(headers['len']))
         return headers, payload
-    def ready(self):
-        write_stdout(PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN)
-    def ok(self, *ignored):
-        self.send('OK')
-    def fail(self, *ignored):
-        self.send('FAIL')
-    def send(self, data):
+
+    def ready(self, stdout=sys.stdout):
+        stdout.write(PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN)
+        stdout.flush()
+
+    def ok(self, stdout=sys.stdout):
+        self.send('OK', stdout)
+
+    def fail(self, stdout=sys.stdout):
+        self.send('FAIL', stdout)
+
+    def send(self, data, stdout=sys.stdout):
         resultlen = len(data)
         result = '%s%s\n%s' % (PEventListenerDispatcher.RESULT_TOKEN_START,
                                str(resultlen),
                                data)
-        write_stdout(result)
+        stdout.write(result)
+        stdout.flush()
 
 listener = EventListenerProtocol()

+ 113 - 80
src/supervisor/memmon.py

@@ -26,8 +26,9 @@
 # command=python memmon.py [options]
 # events=TICK_60
 
-doc = """memmon.py [-p processname=byte_size] | [-g groupname=byte_size] |
-              [-a byte_size] [-s sendmail_program] [-m email_address]
+doc = """\
+memmon.py [-p processname=byte_size]  [-g groupname=byte_size] 
+          [-a byte_size] [-s sendmail] [-m email_address]
 
 Options:
 
@@ -42,9 +43,10 @@ Options:
 -a -- specify a global byte_size.  Restart any child of the supervisord
       under which this runs if it uses more than byte_size RSS.
 
--s -- the sendmail program to use to send email
-      (e.g. /usr/sbin/sendmail).  Must be a full path.  Default is
-      /usr/sbin/sendmail.
+-s -- the sendmail command to use to send email
+      (e.g. "/usr/sbin/sendmail -t -i").  Must be a command which accepts
+      header and message data on stdin and sends mail.
+      Default is "/usr/sbin/sendmail -t -i".
 
 -m -- specify an email address.  The script will send mail to this
       address when any process is restarted.  If no email address is
@@ -59,7 +61,7 @@ and 'GB'.
 
 A sample invocation:
 
-memmon.py -p program1=200MB -p theprog:thegroup=100MB -g thegroup=100MB -a 1GB -s /usr/sbin/sendmail -m chrism@plope.com
+memmon.py -p program1=200MB -p theprog:thegroup=100MB -g thegroup=100MB -a 1GB -s "/usr/sbin/sendmail -t -i" -m chrism@plope.com
 """
 
 import os
@@ -76,86 +78,115 @@ def usage():
 def shell(cmd):
     return os.popen(cmd).read()
 
-def wait(programs, groups, any, sendmail, email):
-    rpc = childutils.getRPCInterface(os.environ)
-
-    while 1:
-        headers, payload = childutils.listener.wait()
-
-        if not headers['eventname'].startswith('TICK'):
-            # do nothing with non-TICK events
-            childutils.listener.ok()
-            continue
 
-        sys.stderr.write(
-            'Checking programs %r, groups %r, any %r' %
-            (programs, groups, any)
-            )
-        
-        infos = rpc.supervisor.getAllProcessInfo()
-
-        for info in infos:
-            pid = info['pid']
-            name = info['name']
-            group = info['group']
-            pname = '%s:%s' % (group, name)
-
-            data = shell('ps -orss -p %s' % pid)
-            dlines = data.split('\n')
-            if len(dlines) < 2:
-                # no data
+class Memmon:
+    def __init__(self, programs, groups, any, sendmail, email, rpc):
+        self.programs = programs
+        self.groups = groups
+        self.any = any
+        self.sendmail = sendmail
+        self.email = email
+        self.rpc = rpc
+        self.stdin = sys.stdin
+        self.stdout = sys.stdout
+        self.stderr = sys.stderr
+        self.pscommand = 'ps -orss= -p %s'
+        self.mailed = False # for unit tests
+
+    def runforever(self, test=False):
+        while 1:
+            # we explicitly use self.stdin, self.stdout, and self.stderr
+            # instead of sys.* so we can unit test this code
+            headers, payload = childutils.listener.wait(self.stdin)
+
+            if not headers['eventname'].startswith('TICK'):
+                # do nothing with non-TICK events
+                childutils.listener.ok(self.stdout)
+                if test:
+                    break
                 continue
 
-            line = dlines[1]
-            try:
-                rss = line.lstrip().rstrip()
-                rss = int(rss) * 1024 # rss is in KB
-            except ValueError:
-                # line doesn't contain any data, or rss cant be intified
-                continue
+            status = []
+            if self.programs:
+                status.append(
+                    'Checking programs %s' % ', '.join(
+                    [ '%s=%s' % x for x in self.programs.items() ] )
+                    )
+
+            if self.groups:
+                status.append(
+                    'Checking groups %s' % ', '.join(
+                    [ '%s=%s' % x for x in self.groups.items() ] )
+                    )
+            if self.any is not None:
+                status.append('Checking any=%s' % self.any)
+
+            self.stderr.write('\n'.join(status) + '\n')
+
+            infos = self.rpc.supervisor.getAllProcessInfo()
+
+            for info in infos:
+                pid = info['pid']
+                name = info['name']
+                group = info['group']
+                pname = '%s:%s' % (group, name)
+
+                data = shell(self.pscommand % pid)
+                if not data:
+                    # no such pid (deal with race conditions)
+                    continue
 
-            sys.stderr.write('RSS of %s is %s\n' % (pname, rss))
+                try:
+                    rss = data.lstrip().rstrip()
+                    rss = int(rss) * 1024 # rss is in KB
+                except ValueError:
+                    # line doesn't contain any data, or rss cant be intified
+                    continue
 
-            for n in name, pname:
-                if n in programs:
-                    if  rss > programs[name]:
-                        restart(rpc, pname, sendmail, email, rss)
+                for n in name, pname:
+                    if n in self.programs:
+                        self.stderr.write('RSS of %s is %s\n' % (pname, rss))
+                        if  rss > self.programs[name]:
+                            self.restart(pname, rss)
+                            continue
+
+                if group in self.groups:
+                    self.stderr.write('RSS of %s is %s\n' % (pname, rss))
+                    if rss > self.groups[group]:
+                        self.restart(pname, rss)
                         continue
 
-            if group in groups:
-                if rss > groups[group]:
-                    restart(rpc, pname, sendmail, email, rss)
-                    continue
+                if self.any is not None:
+                    self.stderr.write('RSS of %s is %s\n' % (pname, rss))
+                    if rss > self.any:
+                        self.restart(pname, rss)
+                        continue
 
-            if any:
-                if rss > any:
-                    restart(rpc, pname, sendmail, email, rss)
-                    continue
-            
-        sys.stderr.flush()
-        childutils.listener.ok()
-
-def restart(rpc, name, sendmail, email, rss):
-    sys.stderr.write('Restarting %s\n' % name)
-    rpc.supervisor.stopProcess(name)
-    rpc.supervisor.startProcess(name)
-
-    if email:
-        msg = (
-            'memmon.py restarted the process named %s at %s because '
-            'it was consuming too much memory (%s bytes RSS)\n' % (
-            name, time.asctime(), rss)
-            )
-        subject = 'memmon: process %s restarted' % name
-        mail(sendmail, subject, email, msg)
-
-def mail(sendmail, subject, to, message):
-    m = os.popen('%s -t -i' % sendmail, 'w')
-    m.write('To: %s\n' % to)
-    m.write('Subject: %s\n' % subject)
-    m.write('\n')
-    m.write(message)
-    m.close()
+            self.stderr.flush()
+            childutils.listener.ok(self.stdout)
+            if test:
+                break
+
+    def restart(self, name, rss):
+        self.stderr.write('Restarting %s\n' % name)
+        self.rpc.supervisor.stopProcess(name)
+        self.rpc.supervisor.startProcess(name)
+
+        if self.email:
+            now = time.asctime()
+            msg = (
+                'memmon.py restarted the process named %s at %s because '
+                'it was consuming too much memory (%s bytes RSS)\n' % (
+                name, now, rss)
+                )
+            subject = 'memmon: process %s restarted' % name
+            m = os.popen(self.sendmail, 'w')
+            m.write('To: %s\n' % self.email)
+            m.write('Subject: %s\n' % subject)
+            m.write('\n')
+            m.write(msg)
+            m.close()
+            self.mailed = True
         
 def parse_namesize(option, value):
     try:
@@ -198,7 +229,7 @@ def main():
     programs = {}
     groups = {}
     any = None
-    sendmail = '/usr/sbin/sendmail'
+    sendmail = '/usr/sbin/sendmail -t -i'
     email = None
 
     for option, value in opts:
@@ -224,7 +255,9 @@ def main():
         if option in ('-m', '--email'):
             email = value
 
-    wait(programs, groups, any, sendmail, email)
+    rpc = childutils.getRPCInterface(os.environ)
+    memmon = Memmon(programs, groups, any, sendmail, email, rpc)
+    memmon.runforever()
 
 if __name__ == '__main__':
     main()

+ 21 - 70
src/supervisor/tests/test_childutils.py

@@ -25,26 +25,6 @@ class ChildUtilsTests(unittest.TestCase):
         self.assertEqual(t.password, 'abc123')
         self.assertEqual(t.serverurl, 'http://localhost:9001')
 
-    def test_write_stderr(self):
-        from supervisor.childutils import write_stderr
-        old = sys.stderr
-        try:
-            sys.stderr = StringIO()
-            write_stderr('hello')
-            self.assertEqual(sys.stderr.getvalue(),'hello')
-        finally:
-            sys.stderr = old
-
-    def test_write_stdout(self):
-        from supervisor.childutils import write_stdout
-        old = sys.stdout
-        try:
-            sys.stdout = StringIO()
-            write_stdout('hello')
-            self.assertEqual(sys.stdout.getvalue(),'hello')
-        finally:
-            sys.stdout = old
-
     def test_get_headers(self):
         from supervisor.childutils import get_headers
         line = 'a:1 b:2'
@@ -61,13 +41,12 @@ class ChildUtilsTests(unittest.TestCase):
 class TestProcessCommunicationsProtocol(unittest.TestCase):
     def test_send(self):
         from supervisor.childutils import pcomm
-        io = StringIO()
-        write = io.write
-        pcomm.send('hello', write)
+        stdout = StringIO()
+        pcomm.send('hello', stdout)
         from supervisor.events import ProcessCommunicationEvent
         begin = ProcessCommunicationEvent.BEGIN_TOKEN
         end = ProcessCommunicationEvent.END_TOKEN
-        self.assertEqual(io.getvalue(), '%s%s%s' % (begin, 'hello', end))
+        self.assertEqual(stdout.getvalue(), '%s%s%s' % (begin, 'hello', end))
 
     def test_stdout(self):
         from supervisor.childutils import pcomm
@@ -105,72 +84,44 @@ class TestEventListenerProtocol(unittest.TestCase):
                 return 'len:5'
             def read(self, *ignored):
                 return 'hello'
-        old_stdin = sys.stdin
-        old_stdout = sys.stdout
-        try:
-            sys.stdin = Dummy()
-            sys.stdout = StringIO()
-            headers, payload = listener.wait()
-            self.assertEqual(headers, {'len':'5'})
-            self.assertEqual(payload, 'hello')
-            self.assertEqual(sys.stdout.getvalue(), token)
-        finally:
-            sys.stdin = old_stdin
-            sys.stdout = old_stdout
+        stdin = Dummy()
+        headers, payload = listener.wait(stdin)
+        self.assertEqual(headers, {'len':'5'})
+        self.assertEqual(payload, 'hello')
 
     def test_token(self):
         from supervisor.childutils import listener
         from supervisor.dispatchers import PEventListenerDispatcher
         token = PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN
-        
-        old = sys.stdout
-        try:
-            sys.stdout = StringIO()
-            listener.ready()
-            self.assertEqual(sys.stdout.getvalue(), token)
-        finally:
-            sys.stdout = old
+        stdout = StringIO()
+        listener.ready(stdout)
+        self.assertEqual(stdout.getvalue(), token)
 
     def test_ok(self):
         from supervisor.childutils import listener
         from supervisor.dispatchers import PEventListenerDispatcher
         begin = PEventListenerDispatcher.RESULT_TOKEN_START
-
-        old = sys.stdout
-        try:
-            sys.stdout = StringIO()
-            listener.ok()
-            self.assertEqual(sys.stdout.getvalue(), begin + '2\nOK')
-        finally:
-            sys.stdout = old
+        stdout = StringIO()
+        listener.ok(stdout)
+        self.assertEqual(stdout.getvalue(), begin + '2\nOK')
 
     def test_fail(self):
         from supervisor.childutils import listener
         from supervisor.dispatchers import PEventListenerDispatcher
         begin = PEventListenerDispatcher.RESULT_TOKEN_START
-
-        old = sys.stdout
-        try:
-            sys.stdout = StringIO()
-            listener.fail()
-            self.assertEqual(sys.stdout.getvalue(), begin + '4\nFAIL')
-        finally:
-            sys.stdout = old
+        stdout = StringIO()
+        listener.fail(stdout)
+        self.assertEqual(stdout.getvalue(), begin + '4\nFAIL')
 
     def test_send(self):
         from supervisor.childutils import listener
         from supervisor.dispatchers import PEventListenerDispatcher
         begin = PEventListenerDispatcher.RESULT_TOKEN_START
-
-        old = sys.stdout
-        try:
-            sys.stdout = StringIO()
-            msg = 'the body data ya fool\n'
-            listener.send(msg)
-            expected = '%s%s\n%s' % (begin, len(msg), msg)
-            self.assertEqual(sys.stdout.getvalue(), expected)
-        finally:
-            sys.stdout = old
+        stdout = StringIO()
+        msg = 'the body data ya fool\n'
+        listener.send(msg, stdout)
+        expected = '%s%s\n%s' % (begin, len(msg), msg)
+        self.assertEqual(stdout.getvalue(), expected)
         
 
 def test_suite():

+ 129 - 0
src/supervisor/tests/test_memmon.py

@@ -0,0 +1,129 @@
+import sys
+import unittest
+from StringIO import StringIO
+
+class MemmonTests(unittest.TestCase):
+    def _getTargetClass(self):
+        from supervisor.memmon import Memmon
+        return Memmon
+
+    def _makeOne(self, *opts):
+        return self._getTargetClass()(*opts)
+
+    def _makeOnePopulated(self, programs, groups, any):
+        from supervisor.tests.base import DummyRPCServer
+        rpc = DummyRPCServer()
+        sendmail = 'echo'
+        email = 'chrism@plope.com'
+        memmon = self._makeOne(programs, groups, any, sendmail, email, rpc)
+        memmon.stdin = StringIO()
+        memmon.stdout = StringIO()
+        memmon.stderr = StringIO()
+        memmon.pscommand = 'echo 22%s'
+        return memmon
+        
+    def test_runforever_notatick(self):
+        programs = {'foo':0, 'bar':0, 'baz_01':0 }
+        groups = {}
+        any = None
+        memmon = self._makeOnePopulated(programs, groups, any)
+        memmon.stdin.write('eventname:NOTATICK len:0\n')
+        memmon.stdin.seek(0)
+        memmon.runforever(test=True)
+        self.assertEqual(memmon.stderr.getvalue(), '')
+
+    def test_runforever_tick_programs(self):
+        programs = {'foo':0, 'bar':0, 'baz_01':0 }
+        groups = {}
+        any = None
+        memmon = self._makeOnePopulated(programs, groups, any)
+        memmon.stdin.write('eventname:TICK len:0\n')
+        memmon.stdin.seek(0)
+        memmon.runforever(test=True)
+        lines = memmon.stderr.getvalue().split('\n')
+        self.assertEqual(len(lines), 8)
+        self.assertEqual(lines[0], 'Checking programs foo=0, bar=0, baz_01=0')
+        self.assertEqual(lines[1], 'RSS of foo:foo is 2264064')
+        self.assertEqual(lines[2], 'Restarting foo:foo')
+        self.assertEqual(lines[3], 'RSS of bar:bar is 2265088')
+        self.assertEqual(lines[4], 'Restarting bar:bar')
+        self.assertEqual(lines[5], 'RSS of baz:baz_01 is 2265088')
+        self.assertEqual(lines[6], 'Restarting baz:baz_01')
+        self.assertEqual(lines[7], '')
+        self.assertEqual(memmon.mailed, True)
+
+    def test_runforever_tick_groups(self):
+        programs = {}
+        groups = {'foo':0}
+        any = None
+        memmon = self._makeOnePopulated(programs, groups, any)
+        memmon.stdin.write('eventname:TICK len:0\n')
+        memmon.stdin.seek(0)
+        memmon.runforever(test=True)
+        lines = memmon.stderr.getvalue().split('\n')
+        self.assertEqual(len(lines), 4)
+        self.assertEqual(lines[0], 'Checking groups foo=0')
+        self.assertEqual(lines[1], 'RSS of foo:foo is 2264064')
+        self.assertEqual(lines[2], 'Restarting foo:foo')
+        self.assertEqual(lines[3], '')
+        self.assertEqual(memmon.mailed, True)
+
+    def test_runforever_tick_any(self):
+        programs = {}
+        groups = {}
+        any = 0
+        memmon = self._makeOnePopulated(programs, groups, any)
+        memmon.stdin.write('eventname:TICK len:0\n')
+        memmon.stdin.seek(0)
+        memmon.runforever(test=True)
+        lines = memmon.stderr.getvalue().split('\n')
+        self.assertEqual(len(lines), 8)
+        self.assertEqual(lines[0], 'Checking any=0')
+        self.assertEqual(lines[1], 'RSS of foo:foo is 2264064')
+        self.assertEqual(lines[2], 'Restarting foo:foo')
+        self.assertEqual(lines[3], 'RSS of bar:bar is 2265088')
+        self.assertEqual(lines[4], 'Restarting bar:bar')
+        self.assertEqual(lines[5], 'RSS of baz:baz_01 is 2265088')
+        self.assertEqual(lines[6], 'Restarting baz:baz_01')
+        self.assertEqual(lines[7], '')
+        self.assertEqual(memmon.mailed, True)
+
+    def test_runforever_tick_programs_and_groups(self):
+        programs = {'baz_01':0}
+        groups = {'foo':0}
+        any = None
+        memmon = self._makeOnePopulated(programs, groups, any)
+        memmon.stdin.write('eventname:TICK len:0\n')
+        memmon.stdin.seek(0)
+        memmon.runforever(test=True)
+        lines = memmon.stderr.getvalue().split('\n')
+        self.assertEqual(len(lines), 7)
+        self.assertEqual(lines[0], 'Checking programs baz_01=0')
+        self.assertEqual(lines[1], 'Checking groups foo=0')
+        self.assertEqual(lines[2], 'RSS of foo:foo is 2264064')
+        self.assertEqual(lines[3], 'Restarting foo:foo')
+        self.assertEqual(lines[4], 'RSS of baz:baz_01 is 2265088')
+        self.assertEqual(lines[5], 'Restarting baz:baz_01')
+        self.assertEqual(lines[6], '')
+        self.assertEqual(memmon.mailed, True)
+
+    def test_runforever_tick_programs_norestart(self):
+        programs = {'foo': sys.maxint}
+        groups = {}
+        any = None
+        memmon = self._makeOnePopulated(programs, groups, any)
+        memmon.stdin.write('eventname:TICK len:0\n')
+        memmon.stdin.seek(0)
+        memmon.runforever(test=True)
+        lines = memmon.stderr.getvalue().split('\n')
+        self.assertEqual(len(lines), 3)
+        self.assertEqual(lines[0], 'Checking programs foo=%s' % sys.maxint)
+        self.assertEqual(lines[1], 'RSS of foo:foo is 2264064')
+        self.assertEqual(lines[2], '')
+        self.assertEqual(memmon.mailed, False)
+
+def test_suite():
+    return unittest.findTestCases(sys.modules[__name__])
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')