"""WAeUP components for batch processing. Batch processors eat CSV files to add, update or remove large numbers of certain kinds of objects at once. """ import grok import copy import csv import os import sys import tempfile import time from zope.component import createObject from zope.interface import Interface from zope.schema import getFields from zope.schema.interfaces import IText from waeup.sirp.interfaces import (IBatchProcessor, IFacultyContainer, IFaculty, ICourse, IDepartment, ICertificate, ICertificateCourse, ISchemaTypeConverter, FatalCSVError) class BatchProcessor(grok.GlobalUtility): """A processor to add, update, or remove data. This is a non-active baseclass. """ grok.provides(IBatchProcessor) grok.context(Interface) grok.baseclass() # Name used in pages and forms... name = u'Department importer' # Internal name... util_name = 'departmentimporter' # Items for this processor need an interface with zope.schema fields. iface = IDepartment # The name must be the same as the util_name attribute in order to # register this utility correctly. grok.name(util_name) # Headers needed to locate items... location_fields = ['code', 'faculty_code'] # A factory with this name must be registered... factory_name = 'waeup.Department' @property def required_fields(self): """Required fields that have no default. A list of names of field, whose value cannot be set if not given during creation. Therefore these fields must exist in input. Fields with a default != missing_value do not belong to this category. """ result = [] for key, field in getFields(self.iface).items(): if key in self.location_fields: continue if field.default is not field.missing_value: continue if field.required: result.append(key) return result @property def req(self): result = dict( create = self.location_fields + self.required_fields, update = self.location_fields, remove = self.location_fields, ) return result @property def available_fields(self): result = [] return sorted(list(set( self.location_fields + getFields(self.iface).keys()))) def getHeaders(self, mode='create'): return self.available_fields def checkHeaders(self, headerfields, mode='create'): req = self.req[mode] # Check for required fields... for field in req: if not field in headerfields: raise FatalCSVError( "Need at least columns %s for import!" % ', '.join(["'%s'" % x for x in req])) # Check for double fields... not_ignored_fields = [x for x in headerfields if not x.startswith('--')] if len(set(not_ignored_fields)) < len(not_ignored_fields): raise FatalCSVError( "Double headers: each column name may only appear once.") return True def applyMapping(self, row, mapping): """Apply mapping to a row of CSV data. """ result = dict() for key, replacement in mapping.items(): result[replacement] = row[key] return result def getMapping(self, path, headerfields, mode): """Get a mapping from CSV file headerfields to actually used fieldnames. """ result = dict() reader = csv.reader(open(path, 'rb')) raw_header = reader.next() for num, field in enumerate(headerfields): if field not in self.location_fields and mode == 'remove': # Ignore non-location fields when removing... field = '--IGNORE--' result[raw_header[num]] = field return result def getFieldConverters(self, fieldnames): """Get converters for fieldnames. """ result = dict() for key, field in getFields(self.iface).items(): if key not in fieldnames: continue converter = ISchemaTypeConverter(field) result[key] = converter return result def convertToTypes(self, row, converter_dict): """Convert values in given row to destination type. """ if '--IGNORE--' in row.keys(): del row['--IGNORE--'] warnings = [] for key, value in row.items(): converter = converter_dict.get(key, None) if converter: try: row.update({key:converter_dict[key].fromString(value)}) except: msg = "conversion error: field %s: %r" % ( key, sys.exc_info()[1]) warnings.append(msg) return (row, warnings) def callFactory(self, *args, **kw): return createObject(self.factory_name) def parentsExist(self, row, site): """Tell whether the parent object for data in ``row`` exists. """ raise NotImplementedError('method not implemented') def entryExists(self, row, site): """Tell whether there already exists an entry for ``row`` data. """ raise NotImplementedError('method not implemented') def getParent(self, row, site): """Get the parent object for the entry in ``row``. """ raise NotImplementedError('method not implemented') def addEntry(self, obj, row, site): """Add the entry given given by ``row`` data. """ raise NotImplementedError('method not implemented') def delEntry(self, row, site): """Delete entry given by ``row`` data. """ raise NotImplementedError('method not implemented') def updateEntry(self, obj, row, site): """Update obj to the values given in row. """ for key, value in row.items(): setattr(obj, key, value) return def createLogfile(self, path, fail_path, num, warnings, mode, user, timedelta, logger=None): """Write to log file. """ if logger is None: return status = 'OK' if warnings > 0: status = 'FAILED' logger.info("-" * 20) logger.info("%s: Batch processing finished: %s" % (user, status)) logger.info("%s: Source: %s" % (user, path)) logger.info("%s: Mode: %s" % (user, mode)) logger.info("%s: User: %s" % (user, user)) if warnings > 0: logger.info("%s: Failed datasets: %s" % ( user, os.path.basename(fail_path))) logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % ( user, timedelta, timedelta/(num or 1))) logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % ( user, num, num - warnings, warnings )) logger.info("-" * 20) return def writeFailedRow(self, writer, row, warnings): """Write a row with error messages to error CSV. If warnings is a list of strings, they will be concatenated. """ error_col = warnings if isinstance(warnings, list): error_col = ' / '.join(warnings) row['--ERRORS--'] = error_col writer.writerow(row) return def doImport(self, path, headerfields, mode='create', user='Unknown', logger=None): """Perform actual import. """ time_start = time.time() self.checkHeaders(headerfields, mode) mapping = self.getMapping(path, headerfields, mode) converters = self.getFieldConverters(headerfields) reader = csv.DictReader(open(path, 'rb')) temp_dir = tempfile.mkdtemp() (base, ext) = os.path.splitext(path) failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext)) failed_headers = mapping.keys() failed_headers.append('--ERRORS--') failed_writer = csv.DictWriter(open(failed_path, 'wb'), failed_headers) first_row = mapping.items() first_row.append(("--ERRORS--", "--ERRORS--"),) failed_writer.writerow(dict(first_row)) finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext)) finished_headers = [x for x in mapping.values()] finished_writer = csv.DictWriter(open(finished_path, 'wb'), finished_headers) finished_writer.writerow(dict([(x,x) for x in finished_headers])) num =0 num_warns = 0 site = grok.getSite() for raw_row in reader: num += 1 string_row = self.applyMapping(raw_row, mapping) row, conv_warnings = self.convertToTypes( copy.deepcopy(string_row), converters) if len(conv_warnings): num_warns += 1 self.writeFailedRow(failed_writer, raw_row, conv_warnings) continue if mode == 'create': if not self.parentsExist(row, site): num_warns += 1 self.writeFailedRow( failed_writer, raw_row, "Not all parents do exist yet. Skipping") continue if self.entryExists(row, site): num_warns += 1 self.writeFailedRow( failed_writer, raw_row, "This object already exists. Skipping.") continue obj = self.callFactory() for key, value in row.items(): setattr(obj, key, value) self.addEntry(obj, row, site) elif mode == 'remove': if not self.entryExists(row, site): num_warns += 1 self.writeFailedRow( failed_writer, raw_row, "Cannot remove: no such entry.") continue self.delEntry(row, site) elif mode == 'update': obj = self.getEntry(row, site) if obj is None: num_warns += 1 self.writeFailedRow( failed_writer, raw_row, "Cannot update: no such entry.") continue self.updateEntry(obj, row, site) finished_writer.writerow(string_row) time_end = time.time() timedelta = time_end - time_start self.createLogfile(path, failed_path, num, num_warns, mode, user, timedelta, logger=logger) failed_path = os.path.abspath(failed_path) if num_warns == 0: del failed_writer os.unlink(failed_path) failed_path = None return (num, num_warns, os.path.abspath(finished_path), failed_path) class FacultyProcessor(BatchProcessor): """A batch processor for IFaculty objects. """ grok.provides(IBatchProcessor) grok.context(Interface) util_name = 'facultyimporter' grok.name(util_name) name = u'Faculty importer' iface = IFaculty location_fields = ['code',] factory_name = 'waeup.Faculty' def parentsExist(self, row, site): return 'faculties' in site.keys() def entryExists(self, row, site): return row['code'] in site['faculties'].keys() def getParent(self, row, site): return site['faculties'] def getEntry(self, row, site): if not self.entryExists(row, site): return None parent = self.getParent(row, site) return parent.get(row['code']) def addEntry(self, obj, row, site): parent = self.getParent(row, site) parent.addFaculty(obj) return def delEntry(self, row, site): parent = self.getParent(row, site) del parent[row['code']] pass class DepartmentProcessor(BatchProcessor): """A batch processor for IDepartment objects. """ grok.provides(IBatchProcessor) grok.context(Interface) util_name = 'departmentimporter' grok.name(util_name) name = u'Department importer' iface = IDepartment location_fields = ['code', 'faculty_code'] factory_name = 'waeup.Department' def parentsExist(self, row, site): if not 'faculties' in site.keys(): return False return row['faculty_code'] in site['faculties'] def entryExists(self, row, site): if not self.parentsExist(row, site): return False parent = self.getParent(row, site) return row['code'] in parent.keys() def getParent(self, row, site): return site['faculties'][row['faculty_code']] def getEntry(self, row, site): if not self.entryExists(row, site): return None parent = self.getParent(row, site) return parent.get(row['code']) def addEntry(self, obj, row, site): parent = self.getParent(row, site) parent.addDepartment(obj) return def delEntry(self, row, site): parent = self.getParent(row, site) del parent[row['code']] return class CourseProcessor(BatchProcessor): """A batch processor for ICourse objects. """ grok.provides(IBatchProcessor) grok.context(Interface) util_name = 'courseimporter' grok.name(util_name) name = u'Course importer' iface = ICourse location_fields = ['code', 'faculty_code', 'department_code'] factory_name = 'waeup.Course' def parentsExist(self, row, site): if not 'faculties' in site.keys(): return False if not row['faculty_code'] in site['faculties'].keys(): return False faculty = site['faculties'][row['faculty_code']] return row['department_code'] in faculty.keys() def entryExists(self, row, site): if not self.parentsExist(row, site): return False parent = self.getParent(row, site) return row['code'] in parent.keys() def getParent(self, row, site): dept = site['faculties'][row['faculty_code']][row['department_code']] return dept.courses def getEntry(self, row, site): if not self.entryExists(row, site): return None parent = self.getParent(row, site) return parent.get(row['code']) def addEntry(self, obj, row, site): parent = self.getParent(row, site) parent.addCourse(obj) return def delEntry(self, row, site): parent = self.getParent(row, site) del parent[row['code']] return class CertificateProcessor(BatchProcessor): """A batch processor for ICertificate objects. """ grok.provides(IBatchProcessor) grok.context(Interface) util_name = 'certificateimporter' grok.name(util_name) name = u'Certificate importer' iface = ICertificate location_fields = ['code', 'faculty_code', 'department_code'] factory_name = 'waeup.Certificate' def parentsExist(self, row, site): if not 'faculties' in site.keys(): return False if not row['faculty_code'] in site['faculties'].keys(): return False faculty = site['faculties'][row['faculty_code']] return row['department_code'] in faculty.keys() def entryExists(self, row, site): if not self.parentsExist(row, site): return False parent = self.getParent(row, site) return row['code'] in parent.keys() def getParent(self, row, site): dept = site['faculties'][row['faculty_code']][row['department_code']] return dept.certificates def getEntry(self, row, site): if not self.entryExists(row, site): return None parent = self.getParent(row, site) return parent.get(row['code']) def addEntry(self, obj, row, site): parent = self.getParent(row, site) parent.addCertificate(obj) return def delEntry(self, row, site): parent = self.getParent(row, site) del parent[row['code']] return class CertificateCourseProcessor(BatchProcessor): """A batch processor for ICertificateCourse objects. """ grok.provides(IBatchProcessor) grok.context(Interface) util_name = 'certificatecourseimporter' grok.name(util_name) name = u'CertificateCourse importer' iface = ICertificateCourse location_fields = ['course', 'level', 'faculty_code', 'department_code', 'certificate_code',] factory_name = 'waeup.CertificateCourse' def parentsExist(self, row, site): if not 'faculties' in site.keys(): return False if not row['faculty_code'] in site['faculties'].keys(): return False faculty = site['faculties'][row['faculty_code']] if not row['department_code'] in faculty.keys(): return False dept = faculty[row['department_code']] return row['certificate_code'] in dept.certificates.keys() def entryExists(self, row, site): if not self.parentsExist(row, site): return False parent = self.getParent(row, site) code = "%s_%s" % (row['course'].code, row['level']) return code in parent.keys() def getParent(self, row, site): dept = site['faculties'][row['faculty_code']][row['department_code']] return dept.certificates[row['certificate_code']] def getEntry(self, row, site): if not self.entryExists(row, site): return None parent = self.getParent(row, site) return parent.get(row['course']) def addEntry(self, obj, row, site): parent = self.getParent(row, site) parent.addCourseRef(row['course'], row['level'], row['core_or_elective']) return def delEntry(self, row, site): parent = self.getParent(row, site) parent.delCourseRef(row['course'].code, row['level']) return