|
@@ -1,6 +1,7 @@
|
|
|
-import sys
|
|
|
+import base64
|
|
|
import os
|
|
|
import socket
|
|
|
+import sys
|
|
|
import tempfile
|
|
|
import unittest
|
|
|
|
|
@@ -265,25 +266,29 @@ class EncryptedDictionaryAuthorizedTests(unittest.TestCase):
|
|
|
|
|
|
def test_authorize_baduser(self):
|
|
|
authorizer = self._makeOne({})
|
|
|
- self.assertEqual(authorizer.authorize(('foo', 'bar')), False)
|
|
|
+ self.assertFalse(authorizer.authorize(('foo', 'bar')))
|
|
|
|
|
|
def test_authorize_gooduser_badpassword(self):
|
|
|
authorizer = self._makeOne({'foo':'password'})
|
|
|
- self.assertEqual(authorizer.authorize(('foo', 'bar')), False)
|
|
|
+ self.assertFalse(authorizer.authorize(('foo', 'bar')))
|
|
|
|
|
|
def test_authorize_gooduser_goodpassword(self):
|
|
|
authorizer = self._makeOne({'foo':'password'})
|
|
|
- self.assertEqual(authorizer.authorize(('foo', 'password')), True)
|
|
|
+ self.assertTrue(authorizer.authorize(('foo', 'password')))
|
|
|
+
|
|
|
+ def test_authorize_gooduser_goodpassword_with_colon(self):
|
|
|
+ authorizer = self._makeOne({'foo':'pass:word'})
|
|
|
+ self.assertTrue(authorizer.authorize(('foo', 'pass:word')))
|
|
|
|
|
|
def test_authorize_gooduser_badpassword_sha(self):
|
|
|
password = '{SHA}' + sha1('password').hexdigest()
|
|
|
authorizer = self._makeOne({'foo':password})
|
|
|
- self.assertEqual(authorizer.authorize(('foo', 'bar')), False)
|
|
|
+ self.assertFalse(authorizer.authorize(('foo', 'bar')))
|
|
|
|
|
|
def test_authorize_gooduser_goodpassword_sha(self):
|
|
|
password = '{SHA}' + sha1('password').hexdigest()
|
|
|
authorizer = self._makeOne({'foo':password})
|
|
|
- self.assertEqual(authorizer.authorize(('foo', 'password')), True)
|
|
|
+ self.assertTrue(authorizer.authorize(('foo', 'password')))
|
|
|
|
|
|
class SupervisorAuthHandlerTests(unittest.TestCase):
|
|
|
def _getTargetClass(self):
|
|
@@ -299,6 +304,33 @@ class SupervisorAuthHandlerTests(unittest.TestCase):
|
|
|
self.assertEqual(handler.authorizer.__class__,
|
|
|
encrypted_dictionary_authorizer)
|
|
|
|
|
|
+ def test_handle_request_authorizes_good_credentials(self):
|
|
|
+ request = DummyRequest('/logtail/process1', None, None, None)
|
|
|
+ encoded = base64.b64encode("user:password")
|
|
|
+ request.header = ["Authorization: Basic %s" % encoded]
|
|
|
+ handler = DummyHandler()
|
|
|
+ auth_handler = self._makeOne({'user':'password'}, handler)
|
|
|
+ auth_handler.handle_request(request)
|
|
|
+ self.assertTrue(handler.handled_request)
|
|
|
+
|
|
|
+ def test_handle_request_authorizes_good_password_with_colon(self):
|
|
|
+ request = DummyRequest('/logtail/process1', None, None, None)
|
|
|
+ encoded = base64.b64encode("user:pass:word") # password contains colon
|
|
|
+ request.header = ["Authorization: Basic %s" % encoded]
|
|
|
+ handler = DummyHandler()
|
|
|
+ auth_handler = self._makeOne({'user':'pass:word'}, handler)
|
|
|
+ auth_handler.handle_request(request)
|
|
|
+ self.assertTrue(handler.handled_request)
|
|
|
+
|
|
|
+ def test_handle_request_does_not_authorize_bad_credentials(self):
|
|
|
+ request = DummyRequest('/logtail/process1', None, None, None)
|
|
|
+ encoded = base64.b64encode("wrong:wrong")
|
|
|
+ request.header = ["Authorization: Basic %s" % encoded]
|
|
|
+ handler = DummyHandler()
|
|
|
+ auth_handler = self._makeOne({'user':'password'}, handler)
|
|
|
+ auth_handler.handle_request(request)
|
|
|
+ self.assertFalse(handler.handled_request)
|
|
|
+
|
|
|
|
|
|
class TopLevelFunctionTests(unittest.TestCase):
|
|
|
def _make_http_servers(self, sconfigs):
|
|
@@ -362,6 +394,13 @@ class TopLevelFunctionTests(unittest.TestCase):
|
|
|
self.assertTrue(isinstance(handler, supervisor_auth_handler),
|
|
|
handler)
|
|
|
|
|
|
+class DummyHandler:
|
|
|
+ def __init__(self):
|
|
|
+ self.handled_request = False
|
|
|
+
|
|
|
+ def handle_request(self, request):
|
|
|
+ self.handled_request = True
|
|
|
+
|
|
|
class DummyProducer:
|
|
|
def __init__(self, *data):
|
|
|
self.data = list(data)
|