poller.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. import select
  2. import errno
  3. class BasePoller:
  4. def __init__(self, options):
  5. self.options = options
  6. self.initialize()
  7. def initialize(self):
  8. pass
  9. def register_readable(self, fd):
  10. raise NotImplementedError
  11. def register_writable(self, fd):
  12. raise NotImplementedError
  13. def unregister(self, fd):
  14. raise NotImplementedError
  15. def poll(self, timeout):
  16. raise NotImplementedError
  17. def before_daemonize(self):
  18. pass
  19. def after_daemonize(self):
  20. pass
  21. class SelectPoller(BasePoller):
  22. def initialize(self):
  23. self._select = select
  24. self._init_fdsets()
  25. def register_readable(self, fd):
  26. self.readables.add(fd)
  27. def register_writable(self, fd):
  28. self.writables.add(fd)
  29. def unregister(self, fd):
  30. if fd in self.readables:
  31. self.readables.remove(fd)
  32. if fd in self.writables:
  33. self.writables.remove(fd)
  34. def unregister_all(self):
  35. self._init_fdsets()
  36. def poll(self, timeout):
  37. try:
  38. r, w, x = self._select.select(
  39. self.readables,
  40. self.writables,
  41. [], timeout
  42. )
  43. except select.error as err:
  44. if err.args[0] == errno.EINTR:
  45. self.options.logger.blather('EINTR encountered in poll')
  46. return [], []
  47. if err.args[0] == errno.EBADF:
  48. self.options.logger.blather('EBADF encountered in poll')
  49. self.unregister_all()
  50. return [], []
  51. raise
  52. return r, w
  53. def _init_fdsets(self):
  54. self.readables = set()
  55. self.writables = set()
  56. class PollPoller(BasePoller):
  57. def initialize(self):
  58. self._poller = select.poll()
  59. self.READ = select.POLLIN | select.POLLPRI | select.POLLHUP
  60. self.WRITE = select.POLLOUT
  61. def register_readable(self, fd):
  62. self._poller.register(fd, self.READ)
  63. def register_writable(self, fd):
  64. self._poller.register(fd, self.WRITE)
  65. def unregister(self, fd):
  66. self._poller.unregister(fd)
  67. def poll(self, timeout):
  68. fds = self._poll_fds(timeout)
  69. readables, writables = [], []
  70. for fd, eventmask in fds:
  71. if self._ignore_invalid(fd, eventmask):
  72. continue
  73. if eventmask & self.READ:
  74. readables.append(fd)
  75. if eventmask & self.WRITE:
  76. writables.append(fd)
  77. return readables, writables
  78. def _poll_fds(self, timeout):
  79. try:
  80. return self._poller.poll(timeout * 1000)
  81. except select.error as err:
  82. if err.args[0] == errno.EINTR:
  83. self.options.logger.blather('EINTR encountered in poll')
  84. return []
  85. raise
  86. def _ignore_invalid(self, fd, eventmask):
  87. if eventmask & select.POLLNVAL:
  88. # POLLNVAL means `fd` value is invalid, not open.
  89. # When a process quits it's `fd`s are closed so there
  90. # is no more reason to keep this `fd` registered
  91. # If the process restarts it's `fd`s are registered again
  92. self.unregister(fd)
  93. return True
  94. return False
  95. class KQueuePoller(BasePoller):
  96. '''
  97. Wrapper for select.kqueue()/kevent()
  98. '''
  99. max_events = 1000
  100. def initialize(self):
  101. self._kqueue = select.kqueue()
  102. self.readables = set()
  103. self.writables = set()
  104. def register_readable(self, fd):
  105. self.readables.add(fd)
  106. kevent = select.kevent(fd, filter=select.KQ_FILTER_READ,
  107. flags=select.KQ_EV_ADD)
  108. self._kqueue_control(fd, kevent)
  109. def register_writable(self, fd):
  110. self.writables.add(fd)
  111. kevent = select.kevent(fd, filter=select.KQ_FILTER_WRITE,
  112. flags=select.KQ_EV_ADD)
  113. self._kqueue_control(fd, kevent)
  114. def unregister(self, fd):
  115. kevent = select.kevent(
  116. fd,
  117. filter=(select.KQ_FILTER_READ | select.KQ_FILTER_WRITE),
  118. flags=select.KQ_EV_DELETE
  119. )
  120. self._forget_fd(fd)
  121. self._kqueue_control(fd, kevent)
  122. def _kqueue_control(self, fd, kevent):
  123. try:
  124. self._kqueue.control([kevent], 0)
  125. except OSError as error:
  126. if error.errno == errno.EBADF:
  127. self.options.logger.blather('EBADF encountered in kqueue. '
  128. 'Invalid file descriptor %s' % fd)
  129. else:
  130. raise
  131. def _forget_fd(self, fd):
  132. for collection in (self.readables, self.writables):
  133. try:
  134. collection.remove(fd)
  135. except KeyError:
  136. pass
  137. def poll(self, timeout):
  138. readables, writables = [], []
  139. try:
  140. kevents = self._kqueue.control(None, self.max_events, timeout)
  141. except OSError as error:
  142. if error.errno == errno.EINTR:
  143. self.options.logger.blather('EINTR encountered in poll')
  144. return readables, writables
  145. raise
  146. for kevent in kevents:
  147. if kevent.filter == select.KQ_FILTER_READ:
  148. readables.append(kevent.ident)
  149. if kevent.filter == select.KQ_FILTER_WRITE:
  150. writables.append(kevent.ident)
  151. return readables, writables
  152. def before_daemonize(self):
  153. self._kqueue.close()
  154. self._kqueue = None
  155. def after_daemonize(self):
  156. self._kqueue = select.kqueue()
  157. for fd in self.readables:
  158. self.register_readable(fd)
  159. for fd in self.writables:
  160. self.register_writable(fd)
  161. def implements_poll():
  162. return hasattr(select, 'poll')
  163. def implements_kqueue():
  164. return hasattr(select, 'kqueue')
  165. if implements_kqueue():
  166. Poller = KQueuePoller
  167. elif implements_poll():
  168. Poller = PollPoller
  169. else:
  170. Poller = SelectPoller