# Tests for waeup.cas.authenticators
#
# The integration tests at the end of this module start two local
# XMLRPC servers (a fake Kofa on port 6666 and a fake Moodle on port
# 7777) and run the Kofa authenticators against them.
import os
import threading
import unittest
from paste.deploy import loadapp
try:
    from SimpleXMLRPCServer import SimpleXMLRPCServer  # Python 2.x
except ImportError:
    from xmlrpc.server import SimpleXMLRPCServer       # Python 3.x
try:
    import xmlrpclib                     # Python 2.x
except ImportError:
    import xmlrpc.client as xmlrpclib    # Python 3.x
from waeup.cas.authenticators import (
    get_all_authenticators, get_authenticator, filter_auth_opts,
    Authenticator, DummyAuthenticator, KofaAuthenticator,
    KofaMoodleAuthenticator
    )
from waeup.cas.server import CASServer


class TestHelpers(unittest.TestCase):

    def test_get_all_authenticators(self):
        # we can get authenticators via entry points
        auths = get_all_authenticators()
        assert 'dummy' in auths
        assert auths['dummy'] is DummyAuthenticator

    def test_filter_auth_opts(self):
        # we can filter auth opts
        result = filter_auth_opts(
            dict(auth='foo', auth_bar='baz', auth_baz='blah')
            )
        assert result == (
            {'auth': 'foo'},
            {'auth_bar': 'baz', 'auth_baz': 'blah'}
            )

    def test_get_authenticator(self):
        # we can parse a paste.deploy config dict for auth
        result = get_authenticator({})
        assert result == {}
        result = get_authenticator({'auth': 'dummy'})
        assert isinstance(result['auth'], DummyAuthenticator)


class AuthenticatorTests(unittest.TestCase):

    def test_create(self):
        # we can create Authenticator instances
        auth = Authenticator()
        assert isinstance(auth, Authenticator)


class DummyAuthenticatorTests(unittest.TestCase):

    def test_create(self):
        # we can create DummyAuthenticator instances
        auth = DummyAuthenticator()
        assert isinstance(auth, DummyAuthenticator)

    def test_check_credentials(self):
        # we can succeed with 'bird'/'bebop'. Others will fail.
        auth = DummyAuthenticator()
        result1 = auth.check_credentials('bird', 'bebop')
        assert result1 == (True, '')
        result2 = auth.check_credentials('foo', 'bar')
        assert result2 == (False, 'Invalid username or password.')


# Backend configurations passed (as strings) to the Kofa authenticators.
# They are shared by several tests and must not be modified in place.
BACKENDS1 = dict(
    inst1=dict(
        url='http://localhost:6666/app',
        marker='^MA-',
        )
    )

BACKENDS2 = dict(
    inst1=dict(
        url='http://localhost:6666/',
        marker='^SCHOOL1-',
        )
    )

BACKENDS3 = dict(
    inst1=dict(
        url='http://localhost:6666/',
        marker='^SCHOOL1-',
        moodle_url='http://localhost:7777/',
        )
    )


class KofaAuthenticatorTests(unittest.TestCase):

    def test_create(self):
        # we can create KofaAuthenticator instances
        auth = KofaAuthenticator()
        assert isinstance(auth, KofaAuthenticator)

    def test_options_defaults(self):
        # all options have sensible defaults
        auth = KofaAuthenticator()
        assert auth.backends == {}

    def test_options(self):
        # we can pass options
        auth = KofaAuthenticator(auth_backends=str(BACKENDS1))
        assert auth.backends == BACKENDS1
        # a missing marker is expected to default to '.+'
        auth = KofaAuthenticator(auth_backends='{"foo": {"url": "bar"}}')
        assert auth.backends['foo']['marker'] == '.+'
        # malformed backend descriptions are rejected
        self.assertRaises(
            ValueError, KofaAuthenticator, auth_backends='not-a-py-expr')
        self.assertRaises(
            ValueError, KofaAuthenticator, auth_backends='"Not a dict"')
        self.assertRaises(
            ValueError, KofaAuthenticator,
            auth_backends='{"foo": "not-a-dict"}')
        self.assertRaises(
            ValueError, KofaAuthenticator,
            auth_backends='{"foo": {"no-url-key": "bar"}}')
        self.assertRaises(
            ValueError, KofaAuthenticator,
            auth_backends='{"foo": {"url": "bar", "marker": "inv_re)"}}')

    def test_paste_deploy_options(self):
        # we can set CAS server-related options via paste.deploy config
        paste_conf = os.path.join(
            os.path.dirname(__file__), 'sample3.ini')
        app = loadapp('config:%s' % paste_conf)
        assert isinstance(app, CASServer)
        assert app.db_connection_string == 'sqlite:///:memory:'
        assert isinstance(app.auth, KofaAuthenticator)


class KofaMoodleAuthenticatorTests(unittest.TestCase):

    def test_paste_deploy_options(self):
        # we can set CAS server-related options via paste.deploy config
        paste_conf = os.path.join(
            os.path.dirname(__file__), 'sample4.ini')
        app = loadapp('config:%s' % paste_conf)
        assert isinstance(app, CASServer)
        assert app.db_connection_string == 'sqlite:///:memory:'
        assert isinstance(app.auth, KofaAuthenticator)
        assert app.auth.name == 'kofa_moodle1'
        auth_backends = {
            'kofa_moodle1': {
                'url': 'http://xmlrpcuser1:xmlrpcuser1@localhost:8080/app1/',
                'marker': '^K',
                'moodle_url': (
                    'http://localhost/moodle/webservice/xmlrpc/server.php'
                    '?wstoken=de1e1cbacf91ec6290bb6f6339122e7d'),
                },
            }
        assert app.auth.backends == auth_backends


class FakeKofaServer(SimpleXMLRPCServer):
    # A fake Kofa server that provides only XMLRPC methods

    allow_reuse_address = True

    def __init__(self, *args, **kw):
        # `allow_none` is needed as the fake methods may return None
        kw.update(allow_none=True)
        SimpleXMLRPCServer.__init__(self, *args, **kw)
        self.register_function(
            self._check_credentials, 'check_applicant_credentials')
        self.register_function(
            self._check_credentials, 'check_student_credentials')
        self.register_function(
            self._get_moodle_data, 'get_student_moodle_data')
        self.register_function(
            self._get_moodle_data, 'get_applicant_moodle_data')

    def _check_credentials(self, username, password):
        # fake waeup.kofa check_credentials method.
        #
        # This method is supposed to mimic the behaviour of an
        # original waeup.kofa check_credentials method. It returns a
        # positive result for the credentials `bird`/`bebop` (a
        # student) and `pig`/`pog` (an applicant). The `faultN` users
        # (password `biz`) only get a `type` and are used to trigger
        # the error paths of the fake Moodle server. Any other
        # credentials yield `None`.
        if username == 'bird' and password == 'bebop':
            return {'id': 'bird', 'email': 'bird@gods.net',
                    'description': 'Mr. Charles Parker',
                    'type': 'student'}
        if username == 'pig' and password == 'pog':
            return {'id': 'pig', 'email': 'pig@gods.net',
                    'description': 'Mr. Ray Charles',
                    'type': 'applicant'}
        if username in (
                'fault1', 'fault2',
                'fault3', 'fault4', 'fault7') and password == 'biz':
            return {'type': 'student'}
        if username == 'fault5' and password == 'biz':
            return {'type': 'applicant'}
        if username == 'fault6' and password == 'biz':
            return {'type': 'boss'}
        return None

    def _get_moodle_data(self, username):
        # fake waeup.kofa get_student_moodle_data method.
        #
        # Registered for students and applicants alike. Unknown
        # usernames (including `fault6`) yield `None`.
        if username == 'bird':
            return dict(email='aa@aa.aa',
                        firstname='Charles',
                        lastname='Parker',
                        )
        if username == 'pig':
            return dict(email='bb@bb.bb',
                        firstname='Ray',
                        lastname='Charles',
                        )
        if username in ('fault1', 'fault2', 'fault3',
                        'fault4', 'fault5', 'fault7'):
            return dict(email='ff@ff.ff',
                        firstname='John',
                        lastname='Fault',
                        )
        return None


class FakeMoodleServer(SimpleXMLRPCServer):
    # A fake Moodle server that provides only XMLRPC methods

    allow_reuse_address = True

    def __init__(self, *args, **kw):
        # `allow_none` is needed as the fake methods may return None
        kw.update(allow_none=True)
        SimpleXMLRPCServer.__init__(self, *args, **kw)
        self.register_function(
            self._core_user_create_users, 'core_user_create_users')
        self.register_function(
            self._core_user_get_users, 'core_user_get_users')
        self.register_function(
            self._core_user_update_users, 'core_user_update_users')

    def _core_user_create_users(self, arg):
        # fake Moodle core_user_create_users method.
        #
        # Only the first entry of `arg` is inspected. Some usernames
        # trigger faults, all others "succeed" with `None`.
        username = arg[0]['username']
        if username == 'school1-fault1':
            raise xmlrpclib.Fault('faultCode', 'faultString')
        if username in ('school1-fault4', 'school1-fault5'):
            raise xmlrpclib.Fault('faultCode', 'Username already exists')
        if username == 'school1-fault7':
            raise xmlrpclib.Fault('faultCode', 'Email address already exists')
        return None

    def _core_user_get_users(self, arg):
        # fake Moodle core_user_get_users method.
        value = arg[0]['value']
        if value == 'SCHOOL1-fault2':
            raise xmlrpclib.Fault('faultCode', 'faultString')
        if value == 'SCHOOL1-fault3':
            # this id makes the subsequent update call fail
            return {'users': [{'id': 'SCHOOL1-fault3'}]}
        return {'users': [{'id': 'any id'}]}

    def _core_user_update_users(self, arg):
        # fake Moodle core_user_update_users method.
        if arg[0]['id'] == 'SCHOOL1-fault3':
            raise xmlrpclib.Fault('faultCode', 'faultString')
        return None


def _start_server(server_class, port):
    # Create a `server_class` instance bound to localhost:`port` and
    # serve requests from a daemon thread. Returns (server, thread).
    server = server_class(('localhost', port))
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    return server, thread


def _stop_server(server):
    # Stop the serving loop of `server` and release its socket.
    server.shutdown()
    server.server_close()


class KofaAuthenticatorsIntegrationTests(unittest.TestCase):
    # A test case where a fake Kofa server and a fake Moodle server
    # are started before tests (and shut down afterwards).

    kofa_server = None
    kofa_th = None
    moodle_server = None
    moodle_th = None

    @classmethod
    def setUpClass(cls):
        # set up a fake kofa server
        cls.kofa_server, cls.kofa_th = _start_server(FakeKofaServer, 6666)
        # set up a fake moodle server
        try:
            cls.moodle_server, cls.moodle_th = _start_server(
                FakeMoodleServer, 7777)
        except Exception:
            # tearDownClass is not run if setUpClass fails, so we have
            # to release the already running kofa server ourselves.
            _stop_server(cls.kofa_server)
            raise

    @classmethod
    def tearDownClass(cls):
        # tear down fake kofa server
        _stop_server(cls.kofa_server)
        # tear down fake moodle server
        _stop_server(cls.moodle_server)

    def test_fake_kofa_works(self):
        # make sure the local fake kofa works
        proxy = xmlrpclib.ServerProxy("http://localhost:6666",
                                      allow_none=True)
        result = proxy.check_student_credentials('bird', 'bebop')
        assert result == {
            'description': 'Mr. Charles Parker',
            'email': 'bird@gods.net',
            'id': 'bird',
            'type': 'student'}
        result = proxy.check_applicant_credentials('pig', 'pog')
        assert result == {
            'description': 'Mr. Ray Charles',
            'email': 'pig@gods.net',
            'id': 'pig',
            'type': 'applicant'}
        result = proxy.get_student_moodle_data('pig')
        assert result == {
            'lastname': 'Charles',
            'email': 'bb@bb.bb',
            'firstname': 'Ray',
            }
        result = proxy.get_applicant_moodle_data('pig')
        assert result == {
            'lastname': 'Charles',
            'email': 'bb@bb.bb',
            'firstname': 'Ray',
            }
        return

    def test_fake_moodle_works(self):
        # make sure the local fake moodle works
        proxy = xmlrpclib.ServerProxy("http://localhost:7777",
                                      allow_none=True)
        # test core_user_create_users
        result = proxy.core_user_create_users(
            [{'username': 'any name'}])
        assert result is None
        self.assertRaises(
            xmlrpclib.Fault, proxy.core_user_create_users,
            [{'username': 'school1-fault1'}])
        self.assertRaises(
            xmlrpclib.Fault, proxy.core_user_create_users,
            [{'username': 'school1-fault7'}])
        # test core_user_get_users
        result = proxy.core_user_get_users(
            [{'key': 'username', 'value': 'any name'}])
        assert result == {'users': [{'id': 'any id'}]}
        self.assertRaises(
            xmlrpclib.Fault, proxy.core_user_get_users,
            [{'key': 'username', 'value': 'SCHOOL1-fault2'}])
        # test core_user_update_users
        result = proxy.core_user_update_users(
            [{'id': 'any id'}])
        assert result is None
        self.assertRaises(
            xmlrpclib.Fault, proxy.core_user_update_users,
            [{'id': 'SCHOOL1-fault3'}])
        return

    def test_check_credentials_KofaAuthenticator(self):
        # we get real responses when querying Kofa instances
        auth = KofaAuthenticator(auth_backends=str(BACKENDS2))
        result1 = auth.check_credentials('SCHOOL1-bird', 'bebop')
        assert result1 == (True, '')
        result2 = auth.check_credentials('SCHOOL1-foo', 'bar')
        assert result2 == (False, 'Invalid username or password.')
        result3 = auth.check_credentials('SCHOOL2-bar', 'baz')
        assert result3 == (False, 'Invalid username or password.')

    def test_check_credentials_KofaMoodleAuthenticator(self):
        # we get real responses when querying Kofa instances
        auth = KofaMoodleAuthenticator(auth_backends=str(BACKENDS3))
        result1s = auth.check_credentials('SCHOOL1-bird', 'bebop')
        assert result1s == (True, '')
        result1a = auth.check_credentials('SCHOOL1-pig', 'pog')
        assert result1a == (True, '')
        result2s = auth.check_credentials('SCHOOL1-foo', 'bar')
        assert result2s == (False, 'Invalid username or password')
        result3s = auth.check_credentials('SCHOOL2-bar', 'baz')
        assert result3s == (False, 'Invalid username or password')
        result8 = auth.check_credentials('SCHOOL1-fault6', 'biz')
        assert result8 == (False, 'User not eligible')
        result9 = auth.check_credentials('SCHOOL1-fault7', 'biz')
        assert result9 == (
            False,
            'Another Moodle user is using the same email address.'
            ' Email addresses can\'t be used twice in Moodle.')
        # faults raised by the (fake) Moodle server are reported as
        # failed checks carrying the fault string
        result4s = auth.check_credentials('SCHOOL1-fault1', 'biz')
        assert result4s == (False, 'faultString')
        result5s = auth.check_credentials('SCHOOL1-fault2', 'biz')
        assert result5s == (False, 'faultString')
        result6s = auth.check_credentials('SCHOOL1-fault3', 'biz')
        assert result6s == (False, 'faultString')
        # no exception if user exists
        result7s = auth.check_credentials('SCHOOL1-fault4', 'biz')
        assert result7s == (True, '')
        result7a = auth.check_credentials('SCHOOL1-fault5', 'biz')
        assert result7a == (True, '')
        # If marker is an empty string school markers are neither
        # checked nor removed.
        #
        # We build a fresh config here instead of changing BACKENDS3 in
        # place: the module-level dict is shared and a modified marker
        # would leak into every test run afterwards.
        backends = dict(inst1=dict(BACKENDS3['inst1'], marker=''))
        auth = KofaMoodleAuthenticator(auth_backends=str(backends))
        result10s = auth.check_credentials('bird', 'bebop')
        assert result10s == (True, '')
        result10a = auth.check_credentials('pig', 'pog')
        assert result10a == (True, '')
        result11s = auth.check_credentials('SCHOOL1-bird', 'bebop')
        assert result11s == (False, 'Invalid username or password')
        result11a = auth.check_credentials('SCHOOL1-pig', 'pog')
        assert result11a == (False, 'Invalid username or password')