123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402 |
- import sys
- import unittest
- import errno
- import select
- from supervisor.tests.base import Mock
- from supervisor.poller import SelectPoller, PollPoller, KQueuePoller
- from supervisor.poller import implements_poll, implements_kqueue
- from supervisor.tests.base import DummyOptions
- # this base class is used instead of unittest.TestCase to hide
- # a TestCase subclass from test runner when the implementation is
- # not available
- SkipTestCase = object
- class BasePollerTests(unittest.TestCase):
- def _makeOne(self, options):
- from supervisor.poller import BasePoller
- return BasePoller(options)
- def test_register_readable(self):
- inst = self._makeOne(None)
- self.assertRaises(NotImplementedError, inst.register_readable, None)
- def test_register_writable(self):
- inst = self._makeOne(None)
- self.assertRaises(NotImplementedError, inst.register_writable, None)
- def test_unregister(self):
- inst = self._makeOne(None)
- self.assertRaises(NotImplementedError, inst.unregister, None)
- def test_poll(self):
- inst = self._makeOne(None)
- self.assertRaises(NotImplementedError, inst.poll, None)
- def test_before_daemonize(self):
- inst = self._makeOne(None)
- self.assertEqual(inst.before_daemonize(), None)
- def test_after_daemonize(self):
- inst = self._makeOne(None)
- self.assertEqual(inst.after_daemonize(), None)
- class SelectPollerTests(unittest.TestCase):
- def _makeOne(self, options):
- return SelectPoller(options)
- def test_register_readable(self):
- poller = self._makeOne(DummyOptions())
- poller.register_readable(6)
- poller.register_readable(7)
- self.assertEqual(sorted(poller.readables), [6,7])
- def test_register_writable(self):
- poller = self._makeOne(DummyOptions())
- poller.register_writable(6)
- poller.register_writable(7)
- self.assertEqual(sorted(poller.writables), [6,7])
- def test_unregister(self):
- poller = self._makeOne(DummyOptions())
- poller.register_readable(6)
- poller.register_readable(7)
- poller.register_writable(8)
- poller.register_writable(9)
- poller.unregister(6)
- poller.unregister(9)
- poller.unregister(100) # not registered, ignore error
- self.assertEqual(list(poller.readables), [7])
- self.assertEqual(list(poller.writables), [8])
- def test_poll_returns_readables_and_writables(self):
- _select = DummySelect(result={'readables': [6],
- 'writables': [8]})
- poller = self._makeOne(DummyOptions())
- poller._select = _select
- poller.register_readable(6)
- poller.register_readable(7)
- poller.register_writable(8)
- readables, writables = poller.poll(1)
- self.assertEqual(readables, [6])
- self.assertEqual(writables, [8])
- def test_poll_ignores_eintr(self):
- _select = DummySelect(error=errno.EINTR)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._select = _select
- poller.register_readable(6)
- poller.poll(1)
- self.assertEqual(options.logger.data[0], 'EINTR encountered in poll')
- def test_poll_ignores_ebadf(self):
- _select = DummySelect(error=errno.EBADF)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._select = _select
- poller.register_readable(6)
- poller.poll(1)
- self.assertEqual(options.logger.data[0], 'EBADF encountered in poll')
- self.assertEqual(list(poller.readables), [])
- self.assertEqual(list(poller.writables), [])
- def test_poll_uncaught_exception(self):
- _select = DummySelect(error=errno.EPERM)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._select = _select
- poller.register_readable(6)
- self.assertRaises(select.error, poller.poll, 1)
- if implements_kqueue():
- KQueuePollerTestsBase = unittest.TestCase
- else:
- KQueuePollerTestsBase = SkipTestCase
- class KQueuePollerTests(KQueuePollerTestsBase):
- def _makeOne(self, options):
- return KQueuePoller(options)
- def test_register_readable(self):
- kqueue = DummyKQueue()
- poller = self._makeOne(DummyOptions())
- poller._kqueue = kqueue
- poller.register_readable(6)
- self.assertEqual(list(poller.readables), [6])
- self.assertEqual(len(kqueue.registered_kevents), 1)
- self.assertReadEventAdded(kqueue.registered_kevents[0], 6)
- def test_register_writable(self):
- kqueue = DummyKQueue()
- poller = self._makeOne(DummyOptions())
- poller._kqueue = kqueue
- poller.register_writable(7)
- self.assertEqual(list(poller.writables), [7])
- self.assertEqual(len(kqueue.registered_kevents), 1)
- self.assertWriteEventAdded(kqueue.registered_kevents[0], 7)
- def test_unregister(self):
- kqueue = DummyKQueue()
- poller = self._makeOne(DummyOptions())
- poller._kqueue = kqueue
- poller.register_writable(7)
- poller.register_readable(8)
- poller.unregister(7)
- poller.unregister(100) # not registered, ignore error
- self.assertEqual(list(poller.writables), [])
- self.assertEqual(list(poller.readables), [8])
- self.assertWriteEventAdded(kqueue.registered_kevents[0], 7)
- self.assertReadEventAdded(kqueue.registered_kevents[1], 8)
- self.assertDeletedEvent(kqueue.registered_kevents[2], 7)
- def test_poll_returns_readables_and_writables(self):
- kqueue = DummyKQueue(result=[(6, select.KQ_FILTER_READ),
- (7, select.KQ_FILTER_READ),
- (8, select.KQ_FILTER_WRITE)])
- poller = self._makeOne(DummyOptions())
- poller._kqueue = kqueue
- poller.register_readable(6)
- poller.register_readable(7)
- poller.register_writable(8)
- readables, writables = poller.poll(1000)
- self.assertEqual(readables, [6,7])
- self.assertEqual(writables, [8])
- def test_poll_ignores_eintr(self):
- kqueue = DummyKQueue(raise_errno_poll=errno.EINTR)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._kqueue = kqueue
- poller.register_readable(6)
- poller.poll(1000)
- self.assertEqual(options.logger.data[0], 'EINTR encountered in poll')
- def test_register_readable_and_writable_ignores_ebadf(self):
- _kqueue = DummyKQueue(raise_errno_register=errno.EBADF)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._kqueue = _kqueue
- poller.register_readable(6)
- poller.register_writable(7)
- self.assertEqual(options.logger.data[0],
- 'EBADF encountered in kqueue. Invalid file descriptor 6')
- self.assertEqual(options.logger.data[1],
- 'EBADF encountered in kqueue. Invalid file descriptor 7')
- def test_register_uncaught_exception(self):
- _kqueue = DummyKQueue(raise_errno_register=errno.ENOMEM)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._kqueue = _kqueue
- self.assertRaises(OSError, poller.register_readable, 5)
- def test_poll_uncaught_exception(self):
- kqueue = DummyKQueue(raise_errno_poll=errno.EINVAL)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._kqueue = kqueue
- poller.register_readable(6)
- self.assertRaises(OSError, poller.poll, 1000)
- def test_before_daemonize_closes_kqueue(self):
- mock_kqueue = Mock()
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._kqueue = mock_kqueue
- poller.before_daemonize()
- mock_kqueue.close.assert_called_once_with()
- self.assertEqual(poller._kqueue, None)
- def test_after_daemonize_restores_kqueue(self):
- options = DummyOptions()
- poller = self._makeOne(options)
- poller.readables = [1]
- poller.writables = [3]
- poller.register_readable = Mock()
- poller.register_writable = Mock()
- poller.after_daemonize()
- self.assertTrue(isinstance(poller._kqueue, select.kqueue))
- poller.register_readable.assert_called_with(1)
- poller.register_writable.assert_called_with(3)
- def assertReadEventAdded(self, kevent, fd):
- self.assertKevent(kevent, fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
- def assertWriteEventAdded(self, kevent, fd):
- self.assertKevent(kevent, fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
- def assertDeletedEvent(self, kevent, fd):
- self.assertKevent(kevent, fd, select.KQ_FILTER_READ | select.KQ_FILTER_WRITE,
- select.KQ_EV_DELETE)
- def assertKevent(self, kevent, ident, filter, flags):
- self.assertEqual(kevent.ident, ident)
- self.assertEqual(kevent.filter, filter)
- self.assertEqual(kevent.flags, flags)
- if implements_poll():
- PollerPollTestsBase = unittest.TestCase
- else:
- PollerPollTestsBase = SkipTestCase
- class PollerPollTests(PollerPollTestsBase):
- def _makeOne(self, options):
- return PollPoller(options)
- def test_register_readable(self):
- select_poll = DummySelectPoll()
- poller = self._makeOne(DummyOptions())
- poller._poller = select_poll
- poller.register_readable(6)
- poller.register_readable(7)
- self.assertEqual(select_poll.registered_as_readable, [6,7])
- def test_register_writable(self):
- select_poll = DummySelectPoll()
- poller = self._makeOne(DummyOptions())
- poller._poller = select_poll
- poller.register_writable(6)
- self.assertEqual(select_poll.registered_as_writable, [6])
- def test_poll_returns_readables_and_writables(self):
- select_poll = DummySelectPoll(result=[(6, select.POLLIN),
- (7, select.POLLPRI),
- (8, select.POLLOUT),
- (9, select.POLLHUP)])
- poller = self._makeOne(DummyOptions())
- poller._poller = select_poll
- poller.register_readable(6)
- poller.register_readable(7)
- poller.register_writable(8)
- poller.register_readable(9)
- readables, writables = poller.poll(1000)
- self.assertEqual(readables, [6,7,9])
- self.assertEqual(writables, [8])
- def test_poll_ignores_eintr(self):
- select_poll = DummySelectPoll(error=errno.EINTR)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._poller = select_poll
- poller.register_readable(9)
- poller.poll(1000)
- self.assertEqual(options.logger.data[0], 'EINTR encountered in poll')
- def test_poll_uncaught_exception(self):
- select_poll = DummySelectPoll(error=errno.EBADF)
- options = DummyOptions()
- poller = self._makeOne(options)
- poller._poller = select_poll
- poller.register_readable(9)
- self.assertRaises(select.error, poller.poll, 1000)
- def test_poll_ignores_and_unregisters_closed_fd(self):
- select_poll = DummySelectPoll(result=[(6, select.POLLNVAL),
- (7, select.POLLPRI)])
- poller = self._makeOne(DummyOptions())
- poller._poller = select_poll
- poller.register_readable(6)
- poller.register_readable(7)
- readables, writables = poller.poll(1000)
- self.assertEqual(readables, [7])
- self.assertEqual(select_poll.unregistered, [6])
- class DummySelect(object):
- '''
- Fake implementation of select.select()
- '''
- def __init__(self, result=None, error=None):
- result = result or {}
- self.readables = result.get('readables', [])
- self.writables = result.get('writables', [])
- self.error = error
- def select(self, r, w, x, timeout):
- if self.error:
- raise select.error(self.error)
- return self.readables, self.writables, []
- class DummySelectPoll(object):
- '''
- Fake implementation of select.poll()
- '''
- def __init__(self, result=None, error=None):
- self.result = result or []
- self.error = error
- self.registered_as_readable = []
- self.registered_as_writable = []
- self.unregistered = []
- def register(self, fd, eventmask):
- if eventmask == select.POLLIN | select.POLLPRI | select.POLLHUP:
- self.registered_as_readable.append(fd)
- elif eventmask == select.POLLOUT:
- self.registered_as_writable.append(fd)
- else:
- raise ValueError("Registered a fd on unknown eventmask: '{0}'".format(eventmask))
- def unregister(self, fd):
- self.unregistered.append(fd)
- def poll(self, timeout):
- if self.error:
- raise select.error(self.error)
- return self.result
- class DummyKQueue(object):
- '''
- Fake implementation of select.kqueue()
- '''
- def __init__(self, result=None, raise_errno_poll=None, raise_errno_register=None):
- self.result = result or []
- self.errno_poll = raise_errno_poll
- self.errno_register = raise_errno_register
- self.registered_kevents = []
- self.registered_flags = []
- def control(self, kevents, max_events, timeout=None):
- if kevents is None: # being called on poll()
- self.assert_max_events_on_poll(max_events)
- self.raise_error(self.errno_poll)
- return self.build_result()
- self.assert_max_events_on_register(max_events)
- self.raise_error(self.errno_register)
- self.registered_kevents.extend(kevents)
- def raise_error(self, err):
- if not err: return
- ex = OSError()
- ex.errno = err
- raise ex
- def build_result(self):
- return [FakeKEvent(ident, filter) for ident,filter in self.result]
- def assert_max_events_on_poll(self, max_events):
- assert max_events == KQueuePoller.max_events, (
- "`max_events` parameter of `kqueue.control() should be %d"
- % KQueuePoller.max_events)
- def assert_max_events_on_register(self, max_events):
- assert max_events == 0, (
- "`max_events` parameter of `kqueue.control()` should be 0 on register")
- class FakeKEvent(object):
- def __init__(self, ident, filter):
- self.ident = ident
- self.filter = filter
- def test_suite():
- return unittest.findTestCases(sys.modules[__name__])
- if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
|