"""Components to handle access codes.
"""
import csv
import grok
import os
from BTrees.OIBTree import OIBTree
from datetime import datetime
from hurry.workflow.interfaces import (
    IWorkflowInfo, InvalidTransitionError, IWorkflowState)
from random import SystemRandom as random
from waeup.sirp.interfaces import IWAeUPSIRPPluggable
from waeup.sirp.accesscodes.interfaces import (
    IAccessCode, IAccessCodeBatch, IAccessCodeBatchContainer
    )
from waeup.sirp.accesscodes.workflow import DISABLED, USED


class ManageACBatches(grok.Permission):
    """The ``waeup.manageACBatches`` permission.
    """
    grok.name('waeup.manageACBatches')


class AccessCode(grok.Model):
    """A single access code.

    An access code knows its serial number inside a batch
    (``batch_serial``) and its random part (``random_num``). Prefix,
    batch number and cost are looked up from the batch the code is
    stored in (its ``__parent__``).
    """
    grok.implements(IAccessCode)

    def __init__(self, batch_serial, random_num):
        self.batch_serial = batch_serial
        self.random_num = random_num
        # Put the fresh code into its initial workflow state.
        IWorkflowInfo(self).fireTransition('init')

    @property
    def representation(self):
        """The code as string ``<PREFIX>-<BATCHNUM>-<RANDOMNUM>``.
        """
        return '%s-%s-%s' % (
            self.batch_prefix, self.batch_num, self.random_num)

    @property
    def batch(self):
        """The batch this code belongs to or ``None``.
        """
        return getattr(self, '__parent__', None)

    @property
    def batch_prefix(self):
        """Prefix of the parent batch (empty string without batch).
        """
        if self.batch is None:
            return ''
        return self.batch.prefix

    @property
    def batch_num(self):
        """Number of the parent batch (empty string without batch).
        """
        if self.batch is None:
            return ''
        return self.batch.num

    @property
    def cost(self):
        """Cost as set in the parent batch (``None`` without batch).
        """
        if self.batch is None:
            return None
        return self.batch.cost

    @property
    def disabled(self):
        """``True`` if the workflow state of this code is ``DISABLED``.
        """
        return IWorkflowState(self).getState() == DISABLED

    @property
    def used(self):
        """``True`` if the workflow state of this code is ``USED``.
        """
        return IWorkflowState(self).getState() == USED


class AccessCodeBatch(grok.Model):
    """A batch of access codes.
    """
    grok.implements(IAccessCodeBatch)

    def __init__(self, creation_date, creator, batch_prefix, cost,
                 entry_num, num=None):
        super(AccessCodeBatch, self).__init__()
        self.creation_date = creation_date
        self.creator = creator
        self.prefix = batch_prefix.upper()
        self.cost = cost
        self.entry_num = entry_num
        self.num = num
        self.invalidated_num = 0
        self.disabled_num = 0
        # ``_entries`` holds the access codes, ``_acids`` maps the
        # string representation of each code to its serial number.
        self._entries = list()
        self._acids = OIBTree()
        self._createEntries()

    def _createEntries(self):
        """Create the entries for this batch.
        """
        pins = self._getNewRandomNum(num=self.entry_num)
        for num, pin in enumerate(pins):
            self.addAccessCode(num, pin)
        return

    def _getNewRandomNum(self, num=1):
        """Create a set of ``num`` random numbers of 10 digits each.

        The number is returned as string.

        This is a generator. A candidate is only yielded if the
        resulting access code is not yet registered in this batch.
        """
        # One generator instance is enough for all digits.
        rng = random()
        curr = 1
        while curr <= num:
            pin = ''.join(str(rng.randint(0, 9)) for _ in range(10))
            ac_id = '%s-%s-%s' % (self.prefix, self.num, pin)
            if ac_id not in self._acids:
                curr += 1
                yield pin
            # Otherwise the PIN is already in use: draw another one.

    def _getStoragePath(self):
        """Get the directory, where we store all batch-related CSV files.

        The directory is created if it does not exist yet.
        """
        site = grok.getSite()
        storagepath = site['datacenter'].storage
        ac_storage = os.path.join(storagepath, 'accesscodes')
        if not os.path.exists(ac_storage):
            os.mkdir(ac_storage)
        return ac_storage

    def entries(self):
        """Get all entries of this batch as generator.
        """
        for x in self._entries:
            yield x

    def getAccessCode(self, ac_id):
        """Get the AccessCode with ID ``ac_id`` or ``KeyError``.
        """
        # NOTE: this relies on the stored serial number being equal to
        # the position of the access code in ``_entries``.
        return self._entries[self._acids[ac_id]]

    def addAccessCode(self, num, pin):
        """Add an access-code.

        ``num`` is the serial number of the new code inside this
        batch, ``pin`` its random part.
        """
        ac = AccessCode(num, pin)
        ac.__parent__ = self
        self._entries.append(ac)
        self._acids[ac.representation] = num
        return

    def invalidate(self, ac_id, student_id=None):
        """Invalidate the AC with ID ``ac_id``.

        Only the counter of invalidated codes is increased here.
        ``student_id`` is currently not used. Raises ``KeyError`` if
        ``ac_id`` is not part of this batch.
        """
        if ac_id not in self._acids:
            raise KeyError(ac_id)
        self.invalidated_num += 1

    def disable(self, ac_id, user_id=None):
        """Disable the AC with ID ``ac_id``.

        ``user_id`` is the user ID of the user triggering the
        process. Already disabled ACs are left untouched.

        Only the counter of disabled codes is increased here. Raises
        ``KeyError`` if ``ac_id`` is not part of this batch.
        """
        ac = self.getAccessCode(ac_id)
        if ac.disabled:
            return
        self.disabled_num += 1

    def enable(self, ac_id):
        """(Re-)enable the AC with ID ``ac_id``.

        This leaves the given AC in state ``unused``. Already enabled
        ACs are left untouched.

        Only the counter of disabled codes is decreased here. Raises
        ``KeyError`` if ``ac_id`` is not part of this batch.
        """
        ac = self.getAccessCode(ac_id)
        if not ac.disabled:
            # Nothing to re-enable; keeps the counter from dropping
            # below the real number of disabled codes.
            return
        self.disabled_num -= 1

    def createCSVLogFile(self):
        """Create a CSV file with data in batch.

        Data will not contain invalidation date nor student ids.  File
        will be created in ``accesscodes`` subdir of data center
        storage path.

        Returns name of created file.
        """
        date = self.creation_date.strftime('%Y_%m_%d_%H_%M_%S')
        ac_storage = self._getStoragePath()
        csv_path = os.path.join(
            ac_storage, '%s-%s-%s-%s.csv' % (
                self.prefix, self.num, date, self.creator)
            )
        # Make sure the file is flushed and closed when we are done.
        with open(csv_path, 'w') as csv_file:
            writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
            writer.writerow(['serial', 'ac', 'cost'])
            writer.writerow(
                [self.prefix, str(self.num), "%0.2f" % self.cost])
            for value in self._entries:
                writer.writerow(
                    [str(value.batch_serial), str(value.representation)]
                    )
        site = grok.getSite()
        logger = site.logger
        logger.info(
            "Created batch %s-%s" % (self.prefix, self.num))
        logger.info(
            "Written batch CSV to %s" % csv_path)
        return os.path.basename(csv_path)

    def archive(self):
        """Create a CSV file for archive.

        The file is written to the directory returned by
        ``_getStoragePath``. Returns name of created file.
        """
        ac_storage = self._getStoragePath()
        now = datetime.now()
        timestamp = now.strftime('%Y_%m_%d_%H_%M_%S')
        csv_path = os.path.join(
            ac_storage, '%s-%s_archive-%s-%s.csv' % (
                self.prefix, self.num, timestamp, self.creator)
            )
        # Make sure the file is flushed and closed when we are done.
        with open(csv_path, 'w') as csv_file:
            writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
            writer.writerow(['prefix', 'serial', 'ac', 'date'])
            writer.writerow([self.prefix, '%0.2f' % self.cost,
                             str(self.num), str(self.entry_num)])
            for value in self._entries:
                writer.writerow([
                    self.prefix, value.batch_serial, value.representation,
                    ])
        return os.path.basename(csv_path)

    def search(self, searchterm, searchtype):
        """Look for access codes in this batch.

        With ``searchtype`` ``'serial'`` the (integer) ``searchterm``
        is taken as serial number, with ``'pin'`` as complete access
        code string.

        Returns a list with the found access code or an empty list if
        nothing was found or ``searchtype`` is unknown.
        """
        if searchtype == 'serial':
            # Serials start at zero; negative values must not wrap
            # around to the end of the list.
            if not 0 <= searchterm < len(self._entries):
                return []
            return [self._entries[searchterm]]
        if searchtype == 'pin':
            try:
                return [self.getAccessCode(searchterm)]
            except KeyError:
                return []
        return []


class AccessCodeBatchContainer(grok.Container):
    """A container for access code batches.

    Batches are stored under keys of form ``<PREFIX>-<NUM>``.
    """
    grok.implements(IAccessCodeBatchContainer)

    def _getStoragePath(self):
        """Get the directory, where batch import files are stored.

        The directory is created if it does not exist yet.
        """
        site = grok.getSite()
        storagepath = site['datacenter'].storage
        ac_storage = os.path.join(storagepath, 'accesscodes')
        import_path = os.path.join(ac_storage, 'imports')
        if not os.path.exists(import_path):
            # The ``accesscodes`` directory might be missing as well.
            os.makedirs(import_path)
        return import_path

    def addBatch(self, batch):
        """Add a batch.

        The batch gets the next unused number for its prefix.
        """
        batch.num = self.getNum(batch.prefix)
        key = "%s-%s" % (batch.prefix, batch.num)
        self[key] = batch
        self._p_changed = True

    def createBatch(self, creation_date, creator, batch_prefix, cost,
                    entry_num):
        """Create and add a batch.

        Returns the batch created.
        """
        batch_num = self.getNum(batch_prefix)
        batch = AccessCodeBatch(creation_date, creator, batch_prefix,
                                cost, entry_num, num=batch_num)
        self.addBatch(batch)
        return batch

    def getNum(self, prefix):
        """Get next unused num for given prefix.
        """
        num = 1
        while self.get('%s-%s' % (prefix, num), None) is not None:
            num += 1
        return num

    def getImportFiles(self):
        """Return a generator with basenames of available import files.
        """
        path = self._getStoragePath()
        for filename in sorted(os.listdir(path)):
            yield filename

    def reimport(self, filename, creator=u'UNKNOWN'):
        """Reimport a batch given in CSV file.

        CSV file must be of format as generated by createCSVLogFile
        method.

        Raises ``KeyError`` if a batch with the same prefix and
        number already exists.
        """
        path = os.path.join(self._getStoragePath(), filename)
        # Make sure the import file is closed, also in case of errors.
        with open(path, 'rb') as csv_file:
            reader = csv.DictReader(csv_file, quoting=csv.QUOTE_ALL)
            # The first data row describes the batch: prefix, number
            # and cost.
            entry = next(reader)
            cost = float(entry['cost'])
            num = int(entry['ac'])
            batch_name = '%s-%s' % (entry['serial'], num)
            if batch_name in self.keys():
                raise KeyError('Batch already exists: %s' % batch_name)
            # Create an empty batch; entries are added one by one.
            batch = AccessCodeBatch(
                datetime.now(), creator, entry['serial'], cost, 0, num=num)
            num_entries = 0
            for row in reader:
                pin = row['ac']
                serial = int(row['serial'])
                rand_num = pin.rsplit('-', 1)[-1]
                batch.addAccessCode(serial, rand_num)
                num_entries += 1
        batch.entry_num = num_entries
        self[batch_name] = batch
        batch.createCSVLogFile()
        return

    def search(self, searchterm, searchtype):
        """Look for access-codes that comply with the given params.

        ``searchtype`` is passed on to the batches; for ``'serial'``
        the ``searchterm`` must be convertible to an integer,
        otherwise an empty list is returned.

        Returns a list of all access codes found in all batches.
        """
        results = []
        if searchtype == 'serial':
            try:
                searchterm = int(searchterm)
            except (TypeError, ValueError):
                return []
        for batchname in self.keys():
            part_result = self[batchname].search(searchterm, searchtype)
            results.extend(part_result)
        return results

    def getAccessCode(self, ac_id):
        """Get the AccessCode with ID ``ac_id`` or ``None``.

        All batches are asked for the code; the first match is
        returned.
        """
        for batchname in self.keys():
            batch = self[batchname]
            try:
                return batch.getAccessCode(ac_id)
            except KeyError:
                continue
        return None

    def disable(self, ac_id, user_id):
        """Disable the AC with ID ``ac_id``.

        ``user_id`` is the user ID of the user triggering the
        process. Already disabled ACs are left untouched.

        Unknown IDs are silently ignored.
        """
        ac = self.getAccessCode(ac_id)
        if ac is None:
            return
        ac.__parent__.disable(ac_id, user_id)
        return

    def enable(self, ac_id):
        """(Re-)enable the AC with ID ``ac_id``.

        This leaves the given AC in state ``unused``. Already enabled
        ACs are left untouched.

        Unknown IDs are silently ignored.
        """
        ac = self.getAccessCode(ac_id)
        if ac is None:
            return
        ac.__parent__.enable(ac_id)
        return

    def invalidate(self, ac_id):
        """Invalidate the AC with ID ``ac_id``.

        Unknown IDs are silently ignored.
        """
        ac = self.getAccessCode(ac_id)
        if ac is None:
            return
        ac.__parent__.invalidate(ac_id)
        return


class AccessCodePlugin(grok.GlobalUtility):
    """A plugin that installs an access code container in a site.
    """
    grok.name('accesscodes')
    grok.implements(IWAeUPSIRPPluggable)

    def setup(self, site, name, logger):
        """Install a fresh batch container as ``site['accesscodes']``.
        """
        site['accesscodes'] = AccessCodeBatchContainer()
        logger.info('Installed container for access code batches.')
        return

    def update(self, site, name, logger):
        """Install the batch container if ``site`` does not have one.
        """
        if 'accesscodes' not in site.keys():
            logger.info('Updating site at %s. Installing access codes.' % (
                    site,))
            self.setup(site, name, logger)
        else:
            logger.info(
                'AccessCodePlugin: Updating site at %s: Nothing to do.' % (
                    site, ))
        return


def get_access_code(access_code):
    """Get an access code instance.

    An access code here is a string like ``PUDE-1-1234567890``.

    Returns ``None`` if the given code cannot be found.

    This is a convenience function that is faster than looking up a
    batch container for the appropriate access code.
    """
    if not isinstance(access_code, basestring):
        return None
    try:
        # The batch id is everything before the last dash.
        batch_id, ac_id = access_code.rsplit('-', 1)
    except ValueError:
        # No dash in string: not an access code.
        return None
    site = grok.getSite()
    batch = site['accesscodes'].get(batch_id, None)
    if batch is None:
        return None
    try:
        code = batch.getAccessCode(access_code)
    except KeyError:
        return None
    return code


def fire_transition(access_code, arg, toward=False):
    """Fire workflow transition for access code.

    The access code instance is looked up via `access_code` (a string
    like ``APP-1-12345678``).

    `arg` tells what kind of transition to trigger. This will be a
    transition id like ``'use'`` or ``'init'``, or some transition
    target like :var:`waeup.sirp.accesscodes.workflow.INITIALIZED`.

    If `toward` is ``False`` (the default) you have to pass a
    transition id as `arg`, otherwise you must give a transition
    target.

    :func:`fire_transition` might raise exceptions depending on the
    reason why the requested transition cannot be performed.

    The following exceptions can occur during processing:

    :exc:`KeyError`:
      signals not existent access code, batch or site.

    :exc:`ValueError`:
      signals illegal format of `access_code` string. The regular format is
      ``APP-N-XXXXXXXX``.

    :exc:`hurry.workflow.interfaces.InvalidTransitionError`:
      the transition requested cannot be performed because the workflow
      rules forbid it.

    :exc:`Unauthorized`:
      the current user is not allowed to perform the requested transition.

    Returns ``True`` if the transition was fired.
    """
    try:
        batch_id, ac_id = access_code.rsplit('-', 1)
    except ValueError:
        raise ValueError(
            'Invalid access code format: %s (use: APP-N-XXXXXXXX)' % (
                access_code,))
    try:
        ac = grok.getSite()['accesscodes'][batch_id].getAccessCode(access_code)
    except TypeError:
        # Subscripting a missing site (``None``) ends up here.
        raise KeyError(
            'No site available for looking up accesscodes')
    info = IWorkflowInfo(ac)
    if toward:
        info.fireTransitionToward(arg)
    else:
        info.fireTransition(arg)
    return True


def invalidate_accesscode(access_code):
    """Invalidate AccessCode denoted by string ``access_code``.

    Fires an appropriate transition to perform the task.

    See :func:`fire_transition` for possible exceptions and their
    meanings.
    """
    return fire_transition(access_code, 'use')


def disable_accesscode(access_code):
    """Disable AccessCode denoted by string ``access_code``.

    Fires an appropriate transition to perform the task.

    See :func:`fire_transition` for possible exceptions and their
    meanings.
    """
    return fire_transition(access_code, DISABLED, toward=True)


def reenable_accesscode(access_code):
    """Reenable AccessCode denoted by string ``access_code``.

    Fires an appropriate transition to perform the task.

    See :func:`fire_transition` for possible exceptions and their
    meanings.
    """
    return fire_transition(access_code, 'reenable')