123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555 |
- # -*- coding: utf-8 -*-
- import errno
- import sys
- import unittest
- import tempfile
- import shutil
- import os
- import syslog
- from supervisor.compat import mock
- from supervisor.compat import PY3
- from supervisor.compat import as_string
- from supervisor.tests.base import DummyStream
- from supervisor import read_file
class LevelTests(unittest.TestCase):
    """Checks on the module-level log level lookup table."""

    def test_LOG_LEVELS_BY_NUM_doesnt_include_builtins(self):
        # No name stored in LOG_LEVELS_BY_NUM may begin with an underscore.
        from supervisor.loggers import LOG_LEVELS_BY_NUM
        for name in LOG_LEVELS_BY_NUM.values():
            is_underscored = name.startswith('_')
            self.assertFalse(is_underscored)
class HandlerTests:
    """Mixin providing fixtures shared by the handler test cases.

    Concrete test classes must define ``_getTargetClass()`` returning the
    class under test.
    """

    def setUp(self):
        # Every test gets its own scratch directory plus a log path in it.
        scratch = tempfile.mkdtemp()
        self.basedir = scratch
        self.filename = os.path.join(scratch, 'thelog')

    def tearDown(self):
        # Best-effort cleanup: an OSError while removing the scratch
        # directory is deliberately ignored.
        try:
            shutil.rmtree(self.basedir)
        except OSError:
            pass

    def _makeOne(self, *arg, **kw):
        """Instantiate the class under test with the given arguments."""
        return self._getTargetClass()(*arg, **kw)

    def _makeLogRecord(self, msg):
        """Build an INFO-level ``LogRecord`` for ``msg`` with no exc_info."""
        from supervisor.loggers import LevelsByName
        from supervisor.loggers import LogRecord
        return LogRecord(level=LevelsByName.INFO, msg=msg, exc_info=None)
class BareHandlerTests(HandlerTests, unittest.TestCase):
    """Tests for the base ``Handler`` class, driven through dummy streams."""

    def _getTargetClass(self):
        from supervisor.loggers import Handler
        return Handler

    def test_flush_stream_flush_raises_IOError_EPIPE(self):
        # An EPIPE IOError from the stream must be swallowed by flush().
        stream = DummyStream(error=IOError(errno.EPIPE))
        inst = self._makeOne(stream=stream)
        self.assertEqual(inst.flush(), None) # does not raise

    def test_flush_stream_flush_raises_IOError_not_EPIPE(self):
        # Any other IOError must propagate out of flush().
        stream = DummyStream(error=IOError(errno.EALREADY))
        inst = self._makeOne(stream=stream)
        self.assertRaises(IOError, inst.flush) # non-EPIPE IOError raises

    def test_close_already_closed(self):
        # close() on a handler already flagged closed returns None.
        stream = DummyStream()
        inst = self._makeOne(stream=stream)
        inst.closed = True
        self.assertEqual(inst.close(), None)

    def test_close_stream_fileno_above_3(self):
        # A stream on an ordinary descriptor is closed along with the handler.
        stream = DummyStream(fileno=50)
        inst = self._makeOne(stream=stream)
        self.assertEqual(inst.close(), None)
        self.assertTrue(inst.closed)
        self.assertTrue(inst.stream.closed)

    def test_close_stream_fileno_below_3(self):
        # With fileno 0 neither the handler nor the stream is marked closed.
        stream = DummyStream(fileno=0)
        inst = self._makeOne(stream=stream)
        self.assertEqual(inst.close(), None)
        self.assertFalse(inst.closed)
        self.assertFalse(inst.stream.closed)

    def test_emit_gardenpath(self):
        # Happy path: the message is written and the stream is flushed.
        stream = DummyStream()
        inst = self._makeOne(stream=stream)
        record = self._makeLogRecord('foo')
        inst.emit(record)
        self.assertEqual(stream.flushed, True)
        self.assertEqual(stream.written, 'foo')

    def test_emit_unicode_error(self):
        # Even with a stream configured to raise UnicodeError, emit() must
        # end up writing the message and flushing.
        stream = DummyStream(error=UnicodeError)
        inst = self._makeOne(stream=stream)
        record = self._makeLogRecord('foo')
        inst.emit(record)
        self.assertEqual(stream.flushed, True)
        self.assertEqual(stream.written, 'foo')

    def test_emit_other_error(self):
        # A non-Unicode error must not escape emit(); nothing is written or
        # flushed, and the failure is routed to handleError().
        stream = DummyStream(error=TypeError)
        inst = self._makeOne(stream=stream)
        handled = []
        inst.handleError = lambda: handled.append(True)
        record = self._makeLogRecord('foo')
        inst.emit(record)
        self.assertEqual(stream.flushed, False)
        self.assertEqual(stream.written, '')
        # Without this check the stub above is pointless: a silently dropped
        # error would pass the test just as well as a handled one.
        self.assertEqual(handled, [True])
-
class FileHandlerTests(HandlerTests, unittest.TestCase):
    """Tests for ``FileHandler`` against a log file in the scratch directory."""

    def _getTargetClass(self):
        from supervisor.loggers import FileHandler
        return FileHandler

    def test_ctor(self):
        # Construction creates the file and records mode, path and stream.
        inst = self._makeOne(self.filename)
        self.assertTrue(os.path.exists(self.filename), self.filename)
        self.assertEqual(inst.mode, 'a')
        self.assertEqual(inst.baseFilename, self.filename)
        self.assertEqual(inst.stream.name, self.filename)
        inst.close()

    def test_close(self):
        # Swap the real stream for a dummy so the close can be observed.
        inst = self._makeOne(self.filename)
        inst.stream.close()
        inst.stream = DummyStream()
        inst.close()
        self.assertEqual(inst.stream.closed, True)

    def test_close_raises(self):
        # An OSError from the stream's close() propagates and the stream
        # is not marked closed.
        inst = self._makeOne(self.filename)
        inst.stream.close()
        inst.stream = DummyStream(OSError)
        self.assertRaises(OSError, inst.close)
        self.assertEqual(inst.stream.closed, False)

    def test_reopen(self):
        # reopen() closes the old stream and opens the base filename again.
        inst = self._makeOne(self.filename)
        inst.stream.close()
        old_stream = DummyStream()
        inst.stream = old_stream
        inst.reopen()
        self.assertEqual(old_stream.closed, True)
        self.assertEqual(inst.stream.name, self.filename)
        inst.close()

    def test_reopen_raises(self):
        # Pointing baseFilename into a missing directory makes reopen()
        # raise IOError; the old stream is closed regardless.
        inst = self._makeOne(self.filename)
        inst.stream.close()
        old_stream = DummyStream()
        inst.stream = old_stream
        missing_path = os.path.join(self.basedir, 'notthere', 'a.log')
        inst.baseFilename = missing_path
        self.assertRaises(IOError, inst.reopen)
        self.assertEqual(old_stream.closed, True)

    def test_remove_exists(self):
        inst = self._makeOne(self.filename)
        self.assertTrue(os.path.exists(self.filename), self.filename)
        inst.remove()
        self.assertFalse(os.path.exists(self.filename), self.filename)

    def test_remove_doesntexist(self):
        # Removing an already-deleted log file is not an error.
        inst = self._makeOne(self.filename)
        os.remove(self.filename)
        self.assertFalse(os.path.exists(self.filename), self.filename)
        inst.remove() # should not raise
        self.assertFalse(os.path.exists(self.filename), self.filename)

    def test_remove_raises(self):
        # Replace the log file with a directory so that remove() fails.
        inst = self._makeOne(self.filename)
        os.remove(self.filename)
        os.mkdir(self.filename)
        self.assertTrue(os.path.exists(self.filename), self.filename)
        self.assertRaises(OSError, inst.remove)

    def test_emit_ascii_noerror(self):
        inst = self._makeOne(self.filename)
        inst.emit(self._makeLogRecord('hello!'))
        inst.close()
        self.assertEqual(read_file(self.filename,'r'), 'hello!')

    # NOTE: disabled test, kept verbatim for reference.
    # def test_emit_unicode_noerror(self):
    #     handler = self._makeOne(self.filename)
    #     record = self._makeLogRecord(u'fi\xed')
    #     handler.emit(record)
    #     content = open(self.filename, 'r').read()
    #     self.assertEqual(content, 'fi\xc3\xad')

    def test_emit_error(self):
        # With a stream set up to raise OSError, emit() reports the failure
        # on sys.stderr; stderr is captured for the duration of the call.
        inst = self._makeOne(self.filename)
        inst.stream.close()
        inst.stream = DummyStream(error=OSError)
        record = self._makeLogRecord('hello!')
        captured = DummyStream()
        saved_stderr = sys.stderr
        sys.stderr = captured
        try:
            inst.emit(record)
        finally:
            sys.stderr = saved_stderr
        self.assertTrue(captured.written.endswith('OSError\n'),
                        captured.written)
class RotatingFileHandlerTests(FileHandlerTests):
    """Tests for ``RotatingFileHandler``; also reruns the FileHandler tests."""

    def _getTargetClass(self):
        from supervisor.loggers import RotatingFileHandler
        return RotatingFileHandler

    def test_ctor(self):
        # Defaults: append mode, 512MB size limit, ten backups.
        inst = self._makeOne(self.filename)
        self.assertEqual(inst.mode, 'a')
        self.assertEqual(inst.maxBytes, 512*1024*1024)
        self.assertEqual(inst.backupCount, 10)

    def test_emit_does_rollover(self):
        inst = self._makeOne(self.filename, maxBytes=10, backupCount=2)
        record = self._makeLogRecord('a' * 4)
        backup_one = self.filename + '.1'
        backup_two = self.filename + '.2'

        # After each 4-byte emit: (is '.1' expected?, is '.2' expected?).
        expectations = [
            (False, False),  # 4 bytes
            (False, False),  # 8 bytes
            (True, False),   # 12 bytes, do rollover
            (True, False),   # 16 bytes
            (True, False),   # 20 bytes
            (True, True),    # 24 bytes, do rollover
            (True, True),    # 28 bytes
        ]
        for want_one, want_two in expectations:
            inst.emit(record)
            self.assertEqual(os.path.exists(backup_one), want_one)
            self.assertEqual(os.path.exists(backup_two), want_two)

        self.assertEqual(read_file(self.filename), 'a' * 4)
        self.assertEqual(read_file(backup_one), 'a'*12)
        self.assertEqual(read_file(backup_two), 'a'*12)

    def test_current_logfile_removed(self):
        inst = self._makeOne(self.filename, maxBytes=6, backupCount=1)
        record = self._makeLogRecord('a' * 4)
        backup = self.filename + '.1'

        inst.emit(record) # 4 bytes
        self.assertTrue(os.path.exists(self.filename))
        self.assertFalse(os.path.exists(backup))

        # Someone removes the active log file! :-(
        os.unlink(self.filename)
        self.assertFalse(os.path.exists(self.filename))

        inst.emit(record) # 8 bytes, do rollover
        self.assertTrue(os.path.exists(self.filename))
        self.assertFalse(os.path.exists(backup))

    def test_removeAndRename_destination_does_not_exist(self):
        # Nothing to remove: only the rename is performed.
        inst = self._makeOne(self.filename)
        renames = []
        removes = []

        def fake_remove(v):
            removes.append(v)

        def fake_exists(v):
            return False

        def fake_rename(s, t):
            renames.append((s, t))

        inst._remove = fake_remove
        inst._exists = fake_exists
        inst._rename = fake_rename
        inst.removeAndRename('foo', 'bar')
        self.assertEqual(renames, [('foo', 'bar')])
        self.assertEqual(removes, [])

    def test_removeAndRename_destination_exists(self):
        # An existing destination is removed before the rename.
        inst = self._makeOne(self.filename)
        renames = []
        removes = []

        def fake_remove(v):
            removes.append(v)

        def fake_exists(v):
            return True

        def fake_rename(s, t):
            renames.append((s, t))

        inst._remove = fake_remove
        inst._exists = fake_exists
        inst._rename = fake_rename
        inst.removeAndRename('foo', 'bar')
        self.assertEqual(renames, [('foo', 'bar')])
        self.assertEqual(removes, ['bar'])

    def test_removeAndRename_remove_raises_ENOENT(self):
        # ENOENT from the remove step is tolerated; the rename still happens.
        def remove(fn):
            raise OSError(errno.ENOENT)

        def fake_exists(v):
            return True

        renames = []

        def fake_rename(s, t):
            renames.append((s, t))

        inst = self._makeOne(self.filename)
        inst._remove = remove
        inst._exists = fake_exists
        inst._rename = fake_rename
        inst.removeAndRename('foo', 'bar')
        self.assertEqual(renames, [('foo', 'bar')])

    def test_removeAndRename_remove_raises_other_than_ENOENT(self):
        # Any other OSError from the remove step propagates.
        def remove(fn):
            raise OSError(errno.EAGAIN)

        def fake_exists(v):
            return True

        inst = self._makeOne(self.filename)
        inst._remove = remove
        inst._exists = fake_exists
        self.assertRaises(OSError, inst.removeAndRename, 'foo', 'bar')

    def test_removeAndRename_rename_raises_ENOENT(self):
        # ENOENT from the rename step is tolerated.
        def rename(s, d):
            raise OSError(errno.ENOENT)

        def fake_exists(v):
            return False

        inst = self._makeOne(self.filename)
        inst._rename = rename
        inst._exists = fake_exists
        self.assertEqual(inst.removeAndRename('foo', 'bar'), None)

    def test_removeAndRename_rename_raises_other_than_ENOENT(self):
        # Any other OSError from the rename step propagates.
        def rename(s, d):
            raise OSError(errno.EAGAIN)

        def fake_exists(v):
            return False

        inst = self._makeOne(self.filename)
        inst._rename = rename
        inst._exists = fake_exists
        self.assertRaises(OSError, inst.removeAndRename, 'foo', 'bar')

    def test_doRollover_maxbytes_lte_zero(self):
        # With maxBytes set to 0, doRollover() simply returns None.
        inst = self._makeOne(self.filename)
        inst.maxBytes = 0
        self.assertEqual(inst.doRollover(), None)
-
-
class BoundIOTests(unittest.TestCase):
    """Tests for the size-limited ``BoundIO`` buffer."""

    def _getTargetClass(self):
        from supervisor.loggers import BoundIO
        return BoundIO

    def _makeOne(self, maxbytes, buf=''):
        """Build a ``BoundIO`` holding ``buf`` with the given byte limit."""
        return self._getTargetClass()(maxbytes, buf)

    def test_write_overflow(self):
        # Writing past the 1-byte limit leaves only the newest data.
        bound = self._makeOne(1, 'a')
        bound.write('b')
        self.assertEqual(bound.buf, 'b')

    def test_getvalue(self):
        bound = self._makeOne(1, 'a')
        current = bound.getvalue()
        self.assertEqual(current, 'a')

    def test_clear(self):
        bound = self._makeOne(1, 'a')
        bound.clear()
        self.assertEqual(bound.buf, '')

    def test_close(self):
        # close() empties the buffer as well.
        bound = self._makeOne(1, 'a')
        bound.close()
        self.assertEqual(bound.buf, '')
class LoggerTests(unittest.TestCase):
    """Tests for ``Logger`` level filtering, close() and getvalue()."""

    def _getTargetClass(self):
        from supervisor.loggers import Logger
        return Logger

    def _makeOne(self, level=None, handlers=None):
        """Build a ``Logger`` with the given level and handlers."""
        return self._getTargetClass()(level, handlers)

    def _checkLevelMethod(self, method_name, level_name, next_level_name=None):
        """Exercise one logging method at its own level and the next one.

        A logger and a ``DummyHandler`` are both created at ``level_name``;
        calling ``method_name`` must deliver exactly one record.  When
        ``next_level_name`` is given, the logger is switched to that level
        and a second call must not deliver another record.
        """
        from supervisor.loggers import LevelsByName
        level = getattr(LevelsByName, level_name)
        handler = DummyHandler(level)
        logger = self._makeOne(level, (handler,))
        getattr(logger, method_name)('hello')
        self.assertEqual(len(handler.records), 1)
        if next_level_name is None:
            return
        logger.level = getattr(LevelsByName, next_level_name)
        getattr(logger, method_name)('hello')
        self.assertEqual(len(handler.records), 1)

    def test_blather(self):
        self._checkLevelMethod('blather', 'BLAT', 'TRAC')

    def test_trace(self):
        self._checkLevelMethod('trace', 'TRAC', 'DEBG')

    def test_debug(self):
        self._checkLevelMethod('debug', 'DEBG', 'INFO')

    def test_info(self):
        self._checkLevelMethod('info', 'INFO', 'WARN')

    def test_warn(self):
        self._checkLevelMethod('warn', 'WARN', 'ERRO')

    def test_error(self):
        self._checkLevelMethod('error', 'ERRO', 'CRIT')

    def test_critical(self):
        # Only the delivery at CRIT itself is checked for this method.
        self._checkLevelMethod('critical', 'CRIT')

    def test_close(self):
        # Closing the logger closes its handlers.
        from supervisor.loggers import LevelsByName
        crit = LevelsByName.CRIT
        handler = DummyHandler(crit)
        logger = self._makeOne(crit, (handler,))
        logger.close()
        self.assertEqual(handler.closed, True)

    def test_getvalue(self):
        from supervisor.loggers import LevelsByName
        crit = LevelsByName.CRIT
        handler = DummyHandler(crit)
        logger = self._makeOne(crit, (handler,))
        self.assertRaises(NotImplementedError, logger.getvalue)
-
class MockSysLog(mock.Mock):
    """Mock stand-in for ``syslog.syslog``.

    On Python 2 it additionally encodes unicode messages with the default
    encoding, so a message that cannot be encoded raises just as described
    in the comment below.  Calls are otherwise recorded by ``mock.Mock``.
    """

    def __call__(self, *args, **kwargs):
        # The message is taken to be the last positional argument.
        message = args[-1]
        # ``unicode`` only exists on Python 2; the version check
        # short-circuits before the name is evaluated on Python 3.
        if sys.version_info < (3, 0) and isinstance(message, unicode):  # noqa: F821
            # Python 2.x raises a UnicodeEncodeError when attempting to
            # transmit unicode characters that don't encode in the
            # default encoding.
            message.encode()
        # Return the mock's result so a configured return_value or
        # side_effect result is not silently discarded.
        return super(MockSysLog, self).__call__(*args, **kwargs)
class SyslogHandlerTests(HandlerTests, unittest.TestCase):
    """Tests for ``SyslogHandler`` with ``syslog.syslog`` replaced by a mock."""

    def setUp(self):
        # Overrides the mixin fixture: these tests create no scratch files.
        pass

    def tearDown(self):
        pass

    def _getTargetClass(self):
        from supervisor.loggers import SyslogHandler
        return SyslogHandler

    def _makeOne(self):
        """Build a ``SyslogHandler`` (it is constructed with no arguments)."""
        klass = self._getTargetClass()
        return klass()

    def test_emit_record_asdict_raises(self):
        # A record whose asdict() blows up must be routed to handleError().
        class Record(object):
            def asdict(self):
                raise TypeError

        inst = self._makeOne()
        handled = []
        inst.handleError = lambda: handled.append(True)
        inst.emit(Record())
        self.assertEqual(handled, [True])

    @mock.patch('syslog.syslog', MockSysLog())
    def test_emit_ascii_noerror(self):
        inst = self._makeOne()
        inst.emit(self._makeLogRecord('hello!'))
        syslog.syslog.assert_called_with('hello!')

    @mock.patch('syslog.syslog', MockSysLog())
    def test_close(self):
        inst = self._makeOne()
        inst.close() # no-op for syslog

    @mock.patch('syslog.syslog', MockSysLog())
    def test_reopen(self):
        inst = self._makeOne()
        inst.reopen() # no-op for syslog

    # The unicode tests differ by interpreter: on Python 3 the text reaches
    # syslog unchanged, on Python 2 the expected value is the UTF-8 encoding.
    if PY3:
        @mock.patch('syslog.syslog', MockSysLog())
        def test_emit_unicode_noerror(self):
            inst = self._makeOne()
            inst.emit(self._makeLogRecord('fií'))
            syslog.syslog.assert_called_with('fií')

        def test_emit_unicode_witherror(self):
            inst = self._makeOne()
            seen = []

            def fake_syslog(msg):
                # Record and fail on the first call only.
                if not seen:
                    seen.append(msg)
                    raise UnicodeError

            inst._syslog = fake_syslog
            inst.emit(self._makeLogRecord('fií'))
            self.assertEqual(seen, ['fií'])
    else:
        @mock.patch('syslog.syslog', MockSysLog())
        def test_emit_unicode_noerror(self):
            inst = self._makeOne()
            text = as_string('fií')
            inst.emit(self._makeLogRecord(text))
            syslog.syslog.assert_called_with('fi\xc3\xad')

        def test_emit_unicode_witherror(self):
            inst = self._makeOne()
            seen = []

            def fake_syslog(msg):
                # Record and fail on the first call only.
                if not seen:
                    seen.append(msg)
                    raise UnicodeError

            inst._syslog = fake_syslog
            inst.emit(self._makeLogRecord(as_string('fií')))
            self.assertEqual(seen, [as_string('fi\xc3\xad')])
class DummyHandler:
    """Minimal handler stand-in that records what a logger does to it."""

    # Default for the flag set by close().  This was previously spelled
    # ``close``, which the method of the same name immediately shadowed,
    # leaving ``closed`` undefined until close() had been called.
    closed = False

    def __init__(self, level):
        self.level = level
        # Records passed to emit(), in call order.
        self.records = []

    def emit(self, record):
        """Remember ``record`` so tests can count deliveries."""
        self.records.append(record)

    def close(self):
        """Flag this handler as closed."""
        self.closed = True
def test_suite():
    """Return a suite holding every ``TestCase`` defined in this module."""
    # unittest.findTestCases() was deprecated in Python 3.11 and removed in
    # 3.13; the default loader's loadTestsFromModule() is its equivalent.
    this_module = sys.modules[__name__]
    return unittest.defaultTestLoader.loadTestsFromModule(this_module)


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
|