"""Components that do the real authentication stuff. """ import re import sys from pkg_resources import iter_entry_points try: import xmlrpclib # Python 2.x except ImportError: # pragma: no cover import xmlrpc.client as xmlrpclib # Python 3.x def get_all_authenticators(): """Get all authenticators registered via entry points. To create an own authenticator, in the ``setup.py`` of your package add a block like this:: entry_points=''' [waeup.cas.authenticators] myname = my.pkg.my.module:MyAuthenticator ''' where ``my.pkg.my.module`` must be a module importable at runtime and ``MyAuthenticator`` must be some Authenticator class defined in this module. ``myname`` can be chosen as you like, but must not clash with other ``waeup.cas.authenticators``. The name should contain ASCII letters and numbers only, and start with a letter. """ return dict( [(x.name, x.load()) for x in iter_entry_points(group='waeup.cas.authenticators')]) def get_authenticator(local_conf): """Get an `Authenticator` configured by ``local_conf``. `local_conf` must be a dictionary of settings. An authenticator is identified by its entry-point name which must be given as value of the ``auth`` key. It is created passing all keys/values where key starts with ``auth_``. Returns a dict of remaining keys/values in `local_conf` with the value of ``auth`` key replaced by an authenticator instance. """ gen_opts, auth_opts = filter_auth_opts(local_conf) if 'auth' in local_conf: auth_dict = get_all_authenticators() factory = auth_dict.get(local_conf['auth']) # get authenticator auth = factory(**auth_opts) gen_opts.update(auth=auth) return gen_opts def filter_auth_opts(local_conf): """Filter out auth-related opts in `local_conf`, a dict. Returns two dicts ``(, )`` containing the general options and authenticator related options. This is designed to work with typical paste.deploy config files like this:: [foo] foo = bar auth = foo auth_opt1 = bar All settings not starting with `auth_` are put into the general opts dict, while other are put into the authenticator opts dict. """ auth_opts = {} general_opts = {} for name, value in local_conf.items(): if name.startswith('auth_'): auth_opts[name] = value else: general_opts[name] = value return general_opts, auth_opts class Authenticator(object): #: The name of an authenticator name = 'basic' def __init__(self, **kw): pass class DummyAuthenticator(Authenticator): """A dummy authenticator for tests. This authenticator does no real authentication. It simply approves credentials ``('bird', 'bebop')`` and denies all other. """ name = 'dummy' def check_credentials(self, username='', password=''): """If username is ``'bird'`` and password ``'bebop'`` check will succeed. Returns a tuple ``(, )`` with ```` being a boolean and ```` being a string giving the reason why login failed (if so) or an empty string. """ reason = '' result = username == 'bird' and password == 'bebop' if not result: reason = 'Invalid username or password.' return result, reason #: Regular expression matching a starting university marker like #: the string 'MA-' in 'MA-M121212' or 'MA-' in 'MA-APP-13123' RE_SCHOOL_MARKER = re.compile('^[^\-]+-') class KofaAuthenticator(Authenticator): """Authenticate against a running Kofa instance. """ name = 'kofa1' def __init__(self, auth_backends="{}"): try: self.backends = eval(auth_backends) except: raise ValueError('auth_backends must be a ' 'valid Python expression.') self._check_options() def _check_options(self): if not isinstance(self.backends, dict): raise ValueError('Backends must be configured as dicts.') for key, val in self.backends.items(): if not isinstance(val, dict): raise ValueError( 'Backend %s: config must be a dict' % key) if not 'url' in val: raise ValueError( 'Backend %s: config must contain an `url` key.' % key) if not 'marker' in val: self.backends[key]['marker'] = '.+' try: re.compile(self.backends[key]['marker']) except: raise ValueError( 'Backend %s: marker must be a valid regular expr.:' % ( key,)) def check_credentials(self, username='', password=''): """Do the real check. """ for backend_name, backend in self.backends.items(): if backend['marker']: if not re.match(backend['marker'], username): continue # remove school marker username = RE_SCHOOL_MARKER.sub('', username) proxy = xmlrpclib.ServerProxy( backend['url'], allow_none=True) valid = proxy.check_applicant_credentials(username, password) if valid is None: valid = proxy.check_student_credentials(username, password) if valid is not None: return (True, '') return (False, 'Invalid username or password.') class KofaMoodleAuthenticator(KofaAuthenticator): """Authenticate against a running Kofa instance and transfer data to Moodle. Configuration of Moodle: 1. Set 'passwordpolicy' to No 2. Create external web service 'Kofa' with the following functions: core_user_create_users, core_user_get_users, core_user_update_users, enrol_manual_enrol_users 3. Create token for the admin user (no special web service user needed) and for service 'Kofa' 4. Enable and configure CAS server authentication. CAS protocol version is 1.0. Moodle expects SSL/TLS protocol. """ name = 'kofa_moodle1' def _create_user(self, username, userdata, moodle): try: # Usernames in Moodle must not contain uppercase # letters even if extendedusernamechars is set True. # Usernames in Moodle are case insensitive. # In other words, Moodle changes usernames from # uppercase to lowercase when you login. result = moodle.core_user_create_users([ {'username':username.lower(), 'password':'dummy', 'firstname':userdata['firstname'], 'lastname':userdata['lastname'], 'email':userdata['email']}]) except xmlrpclib.Fault: faultstring = sys.exc_info()[1].faultString if 'Email address already exists' in faultstring: return (False, 'Another Moodle user is using the same email address.' ' Email addresses can\'t be used twice in Moodle.') if not 'Username already exists' in faultstring: return (False, faultstring) try: result = moodle.core_user_get_users([ {'key':'username', 'value':username}]) except xmlrpclib.Fault: faultstring = sys.exc_info()[1].faultString return (False, faultstring) user_id = result['users'][0]['id'] # Due to a lack of Moodle (Moodle requires an LDAP # connection) the authentication method can't # be set when the user is created. It must be updated # after creation. try: result = moodle.core_user_update_users([ {'id':user_id,'auth':'cas'}]) except xmlrpclib.Fault: faultstring = sys.exc_info()[1].faultString return (False, faultstring) return (True, '') def check_credentials(self, username='', password=''): """Do the real check. """ for backend_name, backend in self.backends.items(): if backend['marker']: if not re.match(backend['marker'], username): continue # remove school marker kofa_username = RE_SCHOOL_MARKER.sub('', username) else: kofa_username = username proxy = xmlrpclib.ServerProxy( backend['url'], allow_none=True) moodle = xmlrpclib.ServerProxy( backend['moodle_url'], allow_none=True) principal = proxy.check_applicant_credentials(kofa_username, password) if principal is None: principal = proxy.check_student_credentials(kofa_username, password) if principal is None: return (False, 'Invalid username or password') if principal['type'] == 'student': userdata = proxy.get_student_moodle_data(kofa_username) return self._create_user(username, userdata, moodle) if principal['type'] == 'applicant': userdata = proxy.get_applicant_moodle_data(kofa_username) return self._create_user(username, userdata, moodle) return (False, 'User not eligible') return (False, 'Invalid username or password')