# Tests for waeup.cas.authenticators
#
# The module covers the authenticator lookup helpers, the base and
# dummy authenticators and the Kofa authenticator. The latter is also
# tested against a local fake XMLRPC server defined below.
import os
import threading
import unittest
try:
    from SimpleXMLRPCServer import SimpleXMLRPCServer  # Python 2.x
except ImportError:
    from xmlrpc.server import SimpleXMLRPCServer       # Python 3.x
try:
    import xmlrpclib                                   # Python 2.x
except ImportError:
    import xmlrpc.client as xmlrpclib                  # Python 3.x

from paste.deploy import loadapp

from waeup.cas.authenticators import (
    get_all_authenticators, get_authenticator, filter_auth_opts,
    Authenticator, DummyAuthenticator, KofaAuthenticator
    )
from waeup.cas.server import CASServer


class TestHelpers(unittest.TestCase):
    # Tests for the module-level helper functions.

    def test_get_all_authenticators(self):
        # we can get authenticators via entry points
        auths = get_all_authenticators()
        assert 'dummy' in auths
        assert auths['dummy'] is DummyAuthenticator

    def test_filter_auth_opts(self):
        # we can filter auth opts: the plain `auth` key ends up in the
        # first dict, the `auth_`-prefixed keys in the second one.
        result = filter_auth_opts(
            dict(auth='foo', auth_bar='baz', auth_baz='blah')
            )
        assert result == (
            {'auth': 'foo'},
            {'auth_bar': 'baz', 'auth_baz': 'blah'}
            )

    def test_get_authenticator(self):
        # we can parse a paste.deploy config dict for auth
        result = get_authenticator({})
        assert result == {}
        result = get_authenticator({'auth': 'dummy'})
        assert isinstance(result['auth'], DummyAuthenticator)


class AuthenticatorTests(unittest.TestCase):
    # Tests for the Authenticator base class.

    def test_create(self):
        # we can create Authenticator instances
        auth = Authenticator()
        assert isinstance(auth, Authenticator)


class DummyAuthenticatorTests(unittest.TestCase):
    # Tests for the DummyAuthenticator.

    def test_create(self):
        # we can create DummyAuthenticator instances
        auth = DummyAuthenticator()
        assert isinstance(auth, DummyAuthenticator)

    def test_check_credentials(self):
        # we can succeed with 'bird'/'bebop'. Others will fail.
        auth = DummyAuthenticator()
        result1 = auth.check_credentials('bird', 'bebop')
        assert result1 == (True, '')
        result2 = auth.check_credentials('foo', 'bar')
        assert result2 == (False, 'Invalid username or password.')


# Sample backend configurations for KofaAuthenticator. Each backend
# entry provides a `url` and a `marker`.
BACKENDS1 = dict(
    inst1=dict(
        url='http://localhost:6666/app',
        marker='^MA-',
        )
    )

# This one points to the root URL of the fake Kofa server started by
# `XMLRPCFakeKofaTestCase` (same host and port).
BACKENDS2 = dict(
    inst1=dict(
        url='http://localhost:6666/',
        marker='^SCHOOL1-',
        )
    )


class KofaAuthenticatorTests(unittest.TestCase):
    # Tests for KofaAuthenticator that do not need a running backend.

    def test_create(self):
        # we can create KofaAuthenticator instances
        auth = KofaAuthenticator()
        assert isinstance(auth, KofaAuthenticator)

    def test_options_defaults(self):
        # all options have sensible defaults
        auth = KofaAuthenticator()
        assert auth.backends == {}

    def test_options(self):
        # we can pass options (backends are passed in as a string)
        auth = KofaAuthenticator(auth_backends=str(BACKENDS1))
        assert auth.backends == BACKENDS1
        # a missing marker is replaced by a default
        auth = KofaAuthenticator(auth_backends='{"foo": {"url": "bar"}}')
        assert auth.backends['foo']['marker'] == '.+'
        # malformed backend descriptions are rejected
        self.assertRaises(
            ValueError, KofaAuthenticator, auth_backends='not-a-py-expr')
        self.assertRaises(
            ValueError, KofaAuthenticator, auth_backends='"Not a dict"')
        self.assertRaises(
            ValueError, KofaAuthenticator,
            auth_backends='{"foo": "not-a-dict"}')
        self.assertRaises(
            ValueError, KofaAuthenticator,
            auth_backends='{"foo": {"no-url-key": "bar"}}')
        self.assertRaises(
            ValueError, KofaAuthenticator,
            auth_backends='{"foo": {"url": "bar", "marker": "inv_re)"}}')

    def test_paste_deploy_options(self):
        # we can set CAS server-related options via paste.deploy config
        # (`sample3.ini` is expected to live next to this module)
        paste_conf = os.path.join(
            os.path.dirname(__file__), 'sample3.ini')
        app = loadapp('config:%s' % paste_conf)
        assert isinstance(app, CASServer)
        assert app.db_connection_string == 'sqlite:///:memory:'
        assert isinstance(app.auth, KofaAuthenticator)


class FakeKofaServer(SimpleXMLRPCServer):
    # A fake Kofa server that provides only XMLRPC methods

    # allow rebinding the port right after a previous server closed it
    allow_reuse_address = True

    def __init__(self, *args, **kw):
        # `allow_none` is forced on, because `_check_credentials`
        # returns `None` for invalid credentials.
        kw.update(allow_none=True)
        SimpleXMLRPCServer.__init__(self, *args, **kw)
        self.register_function(
            self._check_credentials, 'check_credentials')

    def _check_credentials(self, username, password):
        # fake waeup.kofa check_credentials method.
        #
        # This method is supposed to mimic the behaviour of an
        # original waeup.kofa check_credentials method. It returns a
        # positive result for the credentials `bird`/`bebop` and
        # `None` for anything else.
        if username == 'bird' and password == 'bebop':
            return {'id': 'bird', 'email': 'bird@gods.net',
                    'description': 'Mr. Charles Parker'}
        return None


class XMLRPCFakeKofaTestCase(unittest.TestCase):
    # A test case where a fake Kofa server is started before tests (and
    # shut down afterwards).

    server = None  # the FakeKofaServer instance, set in setUpClass
    th = None      # the thread running the server, set in setUpClass

    @classmethod
    def setUpClass(cls):
        cls.server = FakeKofaServer(('localhost', 6666))
        cls.th = threading.Thread(target=cls.server.serve_forever)
        # daemon thread: do not block interpreter exit if a test run
        # is aborted before tearDownClass could run.
        cls.th.daemon = True
        cls.th.start()

    @classmethod
    def tearDownClass(cls):
        # stop the serve_forever() loop, release the listening socket
        # and finally wait for the worker thread to end, so that no
        # server thread is left over after this test case.
        cls.server.shutdown()
        cls.server.server_close()
        cls.th.join()


class MyCase(XMLRPCFakeKofaTestCase):
    # Tests that need the fake Kofa server running.

    def test_fake_kofa_works(self):
        # make sure the local fake kofa works
        proxy = xmlrpclib.ServerProxy(
            "http://localhost:6666", allow_none=True)
        result = proxy.check_credentials('bird', 'bebop')
        assert result == {
            'description': 'Mr. Charles Parker',
            'email': 'bird@gods.net',
            'id': 'bird'}

    def test_check_credentials(self):
        # we get real responses when querying Kofa instances
        auth = KofaAuthenticator(auth_backends=str(BACKENDS2))
        # valid credentials
        result1 = auth.check_credentials('SCHOOL1-bird', 'bebop')
        assert result1 == (True, '')
        # invalid credentials
        result2 = auth.check_credentials('SCHOOL1-foo', 'bar')
        assert result2 == (False, 'Invalid username or password.')
        # username with a prefix other than the `SCHOOL1-` configured
        # in BACKENDS2 (presumably no backend is asked at all)
        result3 = auth.check_credentials('SCHOOL2-bar', 'baz')
        assert result3 == (False, 'Invalid username or password.')