socket_manager.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import socket
  2. class Proxy:
  3. """ Class for wrapping a shared resource object and getting
  4. notified when it's deleted
  5. """
  6. def __init__(self, object, **kwargs):
  7. self.object = object
  8. self.on_delete = kwargs.get('on_delete', None)
  9. def __del__(self):
  10. if self.on_delete:
  11. self.on_delete()
  12. def __getattr__(self, name):
  13. return getattr(self.object, name)
  14. def _get(self):
  15. return self.object
  16. class ReferenceCounter:
  17. """ Class for tracking references to a shared resource
  18. """
  19. def __init__(self, **kwargs):
  20. self.on_non_zero = kwargs['on_non_zero']
  21. self.on_zero = kwargs['on_zero']
  22. self.ref_count = 0
  23. def get_count(self):
  24. return self.ref_count
  25. def increment(self):
  26. if self.ref_count == 0:
  27. self.on_non_zero()
  28. self.ref_count = self.ref_count + 1
  29. def decrement(self):
  30. if self.ref_count <= 0:
  31. raise Exception('Illegal operation: cannot decrement below zero')
  32. self.ref_count -= 1
  33. if self.ref_count == 0:
  34. self.on_zero()
  35. class SocketManager:
  36. """ Class for managing sockets in servers that create/bind/listen
  37. before forking multiple child processes to accept()
  38. Sockets are managed at the process group level and referenced counted
  39. at the process level b/c that's really the only place to hook in
  40. """
  41. def __init__(self, socket_config, **kwargs):
  42. self.logger = kwargs.get('logger', None)
  43. self.socket = None
  44. self.prepared = False
  45. self.socket_config = socket_config
  46. self.ref_ctr = ReferenceCounter(on_zero=self._close, on_non_zero=self._prepare_socket)
  47. def __repr__(self):
  48. return '<%s at %s for %s>' % (self.__class__,
  49. id(self),
  50. self.socket_config.url)
  51. def config(self):
  52. return self.socket_config
  53. def is_prepared(self):
  54. return self.prepared
  55. def get_socket(self):
  56. self.ref_ctr.increment()
  57. self._require_prepared()
  58. return Proxy(self.socket, on_delete=self.ref_ctr.decrement)
  59. def get_socket_ref_count(self):
  60. self._require_prepared()
  61. return self.ref_ctr.get_count()
  62. def _require_prepared(self):
  63. if not self.prepared:
  64. raise Exception('Socket has not been prepared')
  65. def _prepare_socket(self):
  66. if not self.prepared:
  67. if self.logger:
  68. self.logger.info('Creating socket %s' % self.socket_config)
  69. self.socket = self.socket_config.create_and_bind()
  70. self.socket.listen(socket.SOMAXCONN)
  71. self.prepared = True
  72. def _close(self):
  73. self._require_prepared()
  74. if self.logger:
  75. self.logger.info('Closing socket %s' % self.socket_config)
  76. self.socket.close()
  77. self.prepared = False