# socket_manager.py (3.0 KB)

import socket
  2. class Proxy:
  3. """ Class for wrapping a shared resource object and getting
  4. notified when it's deleted
  5. """
  6. def __init__(self, object, **kwargs):
  7. self.object = object
  8. self.on_delete = kwargs.get('on_delete', None)
  9. def __del__(self):
  10. if self.on_delete:
  11. self.on_delete()
  12. def __getattr__(self, name):
  13. return getattr(self.object, name)
  14. def _get(self):
  15. return self.object
  16. class ReferenceCounter:
  17. """ Class for tracking references to a shared resource
  18. """
  19. def __init__(self, **kwargs):
  20. self.on_non_zero = kwargs['on_non_zero']
  21. self.on_zero = kwargs['on_zero']
  22. self.ref_count = 0
  23. def get_count(self):
  24. return self.ref_count
  25. def increment(self):
  26. if self.ref_count == 0:
  27. self.on_non_zero()
  28. self.ref_count = self.ref_count + 1
  29. def decrement(self):
  30. if self.ref_count <= 0:
  31. raise Exception('Illegal operation: cannot decrement below zero')
  32. self.ref_count -= 1
  33. if self.ref_count == 0:
  34. self.on_zero()
  35. class SocketManager:
  36. """ Class for managing sockets in servers that create/bind/listen
  37. before forking multiple child processes to accept()
  38. Sockets are managed at the process group level and referenced counted
  39. at the process level b/c that's really the only place to hook in
  40. """
  41. def __init__(self, socket_config, **kwargs):
  42. self.logger = kwargs.get('logger', None)
  43. self.socket = None
  44. self.prepared = False
  45. self.socket_config = socket_config
  46. self.ref_ctr = ReferenceCounter(
  47. on_zero=self._close, on_non_zero=self._prepare_socket
  48. )
  49. def __repr__(self):
  50. return '<%s at %s for %s>' % (self.__class__,
  51. id(self),
  52. self.socket_config.url)
  53. def config(self):
  54. return self.socket_config
  55. def is_prepared(self):
  56. return self.prepared
  57. def get_socket(self):
  58. self.ref_ctr.increment()
  59. self._require_prepared()
  60. return Proxy(self.socket, on_delete=self.ref_ctr.decrement)
  61. def get_socket_ref_count(self):
  62. self._require_prepared()
  63. return self.ref_ctr.get_count()
  64. def _require_prepared(self):
  65. if not self.prepared:
  66. raise Exception('Socket has not been prepared')
  67. def _prepare_socket(self):
  68. if not self.prepared:
  69. if self.logger:
  70. self.logger.info('Creating socket %s' % self.socket_config)
  71. self.socket = self.socket_config.create_and_bind()
  72. self.socket.listen(socket.SOMAXCONN)
  73. self.prepared = True
  74. def _close(self):
  75. self._require_prepared()
  76. if self.logger:
  77. self.logger.info('Closing socket %s' % self.socket_config)
  78. self.socket.close()
  79. self.prepared = False