123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- import base64
- import os
- import socket
- import stat
- import sys
- import tempfile
- import unittest
- try:
- from hashlib import sha1
- except ImportError:
- from sha import new as sha1
- from supervisor.tests.base import DummySupervisor
- from supervisor.tests.base import PopulatedDummySupervisor
- from supervisor.tests.base import DummyRPCInterfaceFactory
- from supervisor.tests.base import DummyPConfig
- from supervisor.tests.base import DummyOptions
- from supervisor.tests.base import DummyRequest
- from supervisor.http import NOT_DONE_YET
- class HandlerTests:
- def _makeOne(self, supervisord):
- return self._getTargetClass()(supervisord)
- def test_match(self):
- class FakeRequest:
- def __init__(self, uri):
- self.uri = uri
- supervisor = DummySupervisor()
- handler = self._makeOne(supervisor)
- self.assertEqual(handler.match(FakeRequest(handler.path)), True)
- class LogtailHandlerTests(HandlerTests, unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import logtail_handler
- return logtail_handler
- def test_handle_request_stdout_logfile_none(self):
- options = DummyOptions()
- pconfig = DummyPConfig(options, 'process1', '/bin/process1', priority=1,
- stdout_logfile='/tmp/process1.log')
- supervisord = PopulatedDummySupervisor(options, 'process1', pconfig)
- handler = self._makeOne(supervisord)
- request = DummyRequest('/logtail/process1', None, None, None)
- handler.handle_request(request)
- self.assertEqual(request._error, 410)
- def test_handle_request_stdout_logfile_missing(self):
- options = DummyOptions()
- pconfig = DummyPConfig(options, 'foo', 'foo', 'it/is/missing')
- supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
- handler = self._makeOne(supervisord)
- request = DummyRequest('/logtail/foo', None, None, None)
- handler.handle_request(request)
- self.assertEqual(request._error, 410)
- def test_handle_request(self):
- f = tempfile.NamedTemporaryFile()
- t = f.name
- options = DummyOptions()
- pconfig = DummyPConfig(options, 'foo', 'foo', stdout_logfile=t)
- supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
- handler = self._makeOne(supervisord)
- request = DummyRequest('/logtail/foo', None, None, None)
- handler.handle_request(request)
- self.assertEqual(request._error, None)
- from supervisor.medusa import http_date
- self.assertEqual(request.headers['Last-Modified'],
- http_date.build_http_date(os.stat(t)[stat.ST_MTIME]))
- self.assertEqual(request.headers['Content-Type'], 'text/plain')
- self.assertEqual(len(request.producers), 1)
- self.assertEqual(request._done, True)
- class MainLogTailHandlerTests(HandlerTests, unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import mainlogtail_handler
- return mainlogtail_handler
- def test_handle_request_stdout_logfile_none(self):
- supervisor = DummySupervisor()
- handler = self._makeOne(supervisor)
- request = DummyRequest('/mainlogtail', None, None, None)
- handler.handle_request(request)
- self.assertEqual(request._error, 410)
- def test_handle_request_stdout_logfile_missing(self):
- supervisor = DummySupervisor()
- supervisor.options.logfile = '/not/there'
- request = DummyRequest('/mainlogtail', None, None, None)
- handler = self._makeOne(supervisor)
- handler.handle_request(request)
- self.assertEqual(request._error, 410)
- def test_handle_request(self):
- supervisor = DummySupervisor()
- f = tempfile.NamedTemporaryFile()
- t = f.name
- supervisor.options.logfile = t
- handler = self._makeOne(supervisor)
- request = DummyRequest('/mainlogtail', None, None, None)
- handler.handle_request(request)
- self.assertEqual(request._error, None)
- from supervisor.medusa import http_date
- self.assertEqual(request.headers['Last-Modified'],
- http_date.build_http_date(os.stat(t)[stat.ST_MTIME]))
- self.assertEqual(request.headers['Content-Type'], 'text/plain')
- self.assertEqual(len(request.producers), 1)
- self.assertEqual(request._done, True)
- class TailFProducerTests(unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import tail_f_producer
- return tail_f_producer
- def _makeOne(self, request, filename, head):
- return self._getTargetClass()(request, filename, head)
- def test_handle_more(self):
- request = DummyRequest('/logtail/foo', None, None, None)
- from supervisor import http
- f = tempfile.NamedTemporaryFile()
- f.write('a' * 80)
- f.flush()
- producer = self._makeOne(request, f.name, 80)
- result = producer.more()
- self.assertEqual(result, 'a' * 80)
- f.write('w' * 100)
- f.flush()
- result = producer.more()
- self.assertEqual(result, 'w' * 100)
- result = producer.more()
- self.assertEqual(result, http.NOT_DONE_YET)
- f.truncate(0)
- f.flush()
- result = producer.more()
- self.assertEqual(result, '==> File truncated <==\n')
- def test_handle_more_follow(self):
- request = DummyRequest('/logtail/foo', None, None, None)
- f = tempfile.NamedTemporaryFile()
- f.write('a' * 80)
- f.flush()
- producer = self._makeOne(request, f.name, 80)
- result = producer.more()
- self.assertEqual(result, 'a' * 80)
- f.close()
- f2 = open(f.name, 'w')
- try:
- f2.write('b' * 80)
- f2.close()
- result = producer.more()
- finally:
- os.unlink(f2.name)
- self.assertEqual(result, 'b' * 80)
- class DeferringChunkedProducerTests(unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import deferring_chunked_producer
- return deferring_chunked_producer
- def _makeOne(self, producer, footers=None):
- return self._getTargetClass()(producer, footers)
- def test_more_not_done_yet(self):
- wrapped = DummyProducer(NOT_DONE_YET)
- producer = self._makeOne(wrapped)
- self.assertEqual(producer.more(), NOT_DONE_YET)
- def test_more_string(self):
- wrapped = DummyProducer('hello')
- producer = self._makeOne(wrapped)
- self.assertEqual(producer.more(), '5\r\nhello\r\n')
- def test_more_nodata(self):
- wrapped = DummyProducer()
- producer = self._makeOne(wrapped, footers=['a', 'b'])
- self.assertEqual(producer.more(), '0\r\na\r\nb\r\n\r\n')
- class DeferringCompositeProducerTests(unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import deferring_composite_producer
- return deferring_composite_producer
- def _makeOne(self, producers):
- return self._getTargetClass()(producers)
- def test_more_not_done_yet(self):
- wrapped = DummyProducer(NOT_DONE_YET)
- producer = self._makeOne([wrapped])
- self.assertEqual(producer.more(), NOT_DONE_YET)
- def test_more_string(self):
- wrapped1 = DummyProducer('hello')
- wrapped2 = DummyProducer('goodbye')
- producer = self._makeOne([wrapped1, wrapped2])
- self.assertEqual(producer.more(), 'hello')
- self.assertEqual(producer.more(), 'goodbye')
- self.assertEqual(producer.more(), '')
- def test_more_nodata(self):
- wrapped = DummyProducer()
- producer = self._makeOne([wrapped])
- self.assertEqual(producer.more(), '')
- class DeferringGlobbingProducerTests(unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import deferring_globbing_producer
- return deferring_globbing_producer
- def _makeOne(self, producer, buffer_size=1<<16):
- return self._getTargetClass()(producer, buffer_size)
- def test_more_not_done_yet(self):
- wrapped = DummyProducer(NOT_DONE_YET)
- producer = self._makeOne(wrapped)
- self.assertEqual(producer.more(), NOT_DONE_YET)
- def test_more_string(self):
- wrapped = DummyProducer('hello', 'there', 'guy')
- producer = self._makeOne(wrapped, buffer_size=1)
- self.assertEqual(producer.more(), 'hello')
- wrapped = DummyProducer('hello', 'there', 'guy')
- producer = self._makeOne(wrapped, buffer_size=50)
- self.assertEqual(producer.more(), 'hellothereguy')
- def test_more_nodata(self):
- wrapped = DummyProducer()
- producer = self._makeOne(wrapped)
- self.assertEqual(producer.more(), '')
- class DeferringHookedProducerTests(unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import deferring_hooked_producer
- return deferring_hooked_producer
- def _makeOne(self, producer, function):
- return self._getTargetClass()(producer, function)
- def test_more_not_done_yet(self):
- wrapped = DummyProducer(NOT_DONE_YET)
- producer = self._makeOne(wrapped, None)
- self.assertEqual(producer.more(), NOT_DONE_YET)
- def test_more_string(self):
- wrapped = DummyProducer('hello')
- L = []
- def callback(bytes):
- L.append(bytes)
- producer = self._makeOne(wrapped, callback)
- self.assertEqual(producer.more(), 'hello')
- self.assertEqual(L, [])
- producer.more()
- self.assertEqual(L, [5])
- def test_more_nodata(self):
- wrapped = DummyProducer()
- L = []
- def callback(bytes):
- L.append(bytes)
- producer = self._makeOne(wrapped, callback)
- self.assertEqual(producer.more(), '')
- self.assertEqual(L, [0])
- class EncryptedDictionaryAuthorizedTests(unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import encrypted_dictionary_authorizer
- return encrypted_dictionary_authorizer
- def _makeOne(self, dict):
- return self._getTargetClass()(dict)
- def test_authorize_baduser(self):
- authorizer = self._makeOne({})
- self.assertFalse(authorizer.authorize(('foo', 'bar')))
- def test_authorize_gooduser_badpassword(self):
- authorizer = self._makeOne({'foo':'password'})
- self.assertFalse(authorizer.authorize(('foo', 'bar')))
- def test_authorize_gooduser_goodpassword(self):
- authorizer = self._makeOne({'foo':'password'})
- self.assertTrue(authorizer.authorize(('foo', 'password')))
- def test_authorize_gooduser_goodpassword_with_colon(self):
- authorizer = self._makeOne({'foo':'pass:word'})
- self.assertTrue(authorizer.authorize(('foo', 'pass:word')))
- def test_authorize_gooduser_badpassword_sha(self):
- password = '{SHA}' + sha1('password').hexdigest()
- authorizer = self._makeOne({'foo':password})
- self.assertFalse(authorizer.authorize(('foo', 'bar')))
- def test_authorize_gooduser_goodpassword_sha(self):
- password = '{SHA}' + sha1('password').hexdigest()
- authorizer = self._makeOne({'foo':password})
- self.assertTrue(authorizer.authorize(('foo', 'password')))
- class SupervisorAuthHandlerTests(unittest.TestCase):
- def _getTargetClass(self):
- from supervisor.http import supervisor_auth_handler
- return supervisor_auth_handler
- def _makeOne(self, dict, handler):
- return self._getTargetClass()(dict, handler)
- def test_ctor(self):
- handler = self._makeOne({'a':1}, None)
- from supervisor.http import encrypted_dictionary_authorizer
- self.assertEqual(handler.authorizer.__class__,
- encrypted_dictionary_authorizer)
- def test_handle_request_authorizes_good_credentials(self):
- request = DummyRequest('/logtail/process1', None, None, None)
- encoded = base64.b64encode("user:password")
- request.header = ["Authorization: Basic %s" % encoded]
- handler = DummyHandler()
- auth_handler = self._makeOne({'user':'password'}, handler)
- auth_handler.handle_request(request)
- self.assertTrue(handler.handled_request)
- def test_handle_request_authorizes_good_password_with_colon(self):
- request = DummyRequest('/logtail/process1', None, None, None)
- encoded = base64.b64encode("user:pass:word") # password contains colon
- request.header = ["Authorization: Basic %s" % encoded]
- handler = DummyHandler()
- auth_handler = self._makeOne({'user':'pass:word'}, handler)
- auth_handler.handle_request(request)
- self.assertTrue(handler.handled_request)
- def test_handle_request_does_not_authorize_bad_credentials(self):
- request = DummyRequest('/logtail/process1', None, None, None)
- encoded = base64.b64encode("wrong:wrong")
- request.header = ["Authorization: Basic %s" % encoded]
- handler = DummyHandler()
- auth_handler = self._makeOne({'user':'password'}, handler)
- auth_handler.handle_request(request)
- self.assertFalse(handler.handled_request)
- class TopLevelFunctionTests(unittest.TestCase):
- def _make_http_servers(self, sconfigs):
- options = DummyOptions()
- options.server_configs = sconfigs
- options.rpcinterface_factories = [('dummy',DummyRPCInterfaceFactory,{})]
- supervisord = DummySupervisor()
- from supervisor.http import make_http_servers
- servers = make_http_servers(options, supervisord)
- try:
- for config, s in servers:
- s.close()
- socketfile = config.get('file')
- if socketfile is not None:
- os.unlink(socketfile)
- finally:
- from asyncore import socket_map
- socket_map.clear()
- return servers
- def test_make_http_servers_noauth(self):
- socketfile = tempfile.mktemp()
- inet = {'family':socket.AF_INET, 'host':'localhost', 'port':17735,
- 'username':None, 'password':None, 'section':'inet_http_server'}
- unix = {'family':socket.AF_UNIX, 'file':socketfile, 'chmod':0700,
- 'chown':(-1, -1), 'username':None, 'password':None,
- 'section':'unix_http_server'}
- servers = self._make_http_servers([inet, unix])
- self.assertEqual(len(servers), 2)
- inetdata = servers[0]
- self.assertEqual(inetdata[0], inet)
- server = inetdata[1]
- idents = [
- 'Supervisor XML-RPC Handler',
- 'Logtail HTTP Request Handler',
- 'Main Logtail HTTP Request Handler',
- 'Supervisor Web UI HTTP Request Handler',
- 'Default HTTP Request Handler'
- ]
- self.assertEqual([x.IDENT for x in server.handlers], idents)
- unixdata = servers[1]
- self.assertEqual(unixdata[0], unix)
- server = unixdata[1]
- self.assertEqual([x.IDENT for x in server.handlers], idents)
- def test_make_http_servers_withauth(self):
- socketfile = tempfile.mktemp()
- inet = {'family':socket.AF_INET, 'host':'localhost', 'port':17736,
- 'username':'username', 'password':'password',
- 'section':'inet_http_server'}
- unix = {'family':socket.AF_UNIX, 'file':socketfile, 'chmod':0700,
- 'chown':(-1, -1), 'username':'username', 'password':'password',
- 'section':'unix_http_server'}
- servers = self._make_http_servers([inet, unix])
- self.assertEqual(len(servers), 2)
- from supervisor.http import supervisor_auth_handler
- for config, server in servers:
- for handler in server.handlers:
- self.assertTrue(isinstance(handler, supervisor_auth_handler),
- handler)
- class DummyHandler:
- def __init__(self):
- self.handled_request = False
- def handle_request(self, request):
- self.handled_request = True
- class DummyProducer:
- def __init__(self, *data):
- self.data = list(data)
- def more(self):
- if self.data:
- return self.data.pop(0)
- else:
- return ''
- def test_suite():
- return unittest.findTestCases(sys.modules[__name__])
- if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
|