source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 4990

Last change on this file since 4990 was 4984, checked in by uli, 15 years ago

Add docs.

File size: 18.1 KB
RevLine 
[4806]1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
[4870]7import copy
[4806]8import csv
[4821]9import os
10import sys
[4900]11import tempfile
[4821]12import time
[4806]13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
16from zope.schema.interfaces import IText
[4920]17from waeup.sirp.interfaces import (IBatchProcessor, IFacultyContainer, IFaculty,
[4831]18                              ICourse, IDepartment, ICertificate,
[4832]19                              ICertificateCourse, ISchemaTypeConverter,
20                              FatalCSVError)
[4806]21
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors set ``name``,
    ``util_name``, ``iface``, ``location_fields`` and ``factory_name``
    and implement the hooks ``parentsExist``, ``entryExists``,
    ``getParent``, ``getEntry``, ``addEntry`` and ``delEntry``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    # NOTE(review): these defaults are copies of the department importer's
    # settings; every processor in this module overrides them.
    name = u'Department importer'

    # Internal name...
    util_name = 'departmentimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = IDepartment

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here (they are
        required anyway, see ``req``).
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the column names that must be given.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all column names this processor understands.

        These are the location fields plus all schema fields of
        ``iface``, each name listed once.
        """
        return sorted(set(self.location_fields) |
                      set(getFields(self.iface)))

    def getHeaders(self, mode='create'):
        """Get the column names usable for ``mode`` (currently all)."""
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that ``headerfields`` are usable for an import in ``mode``.

        Raises :exc:`FatalCSVError` if a required column is missing or if
        a column name (other than ignored ones starting with ``--``)
        appears more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict with keys renamed from CSV column names to the
        mapped fieldnames.
        """
        result = dict()
        for key, replacement in mapping.items():
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
           fieldnames.

        Keys are the column names from the first line of the CSV file at
        ``path``, values the entries of ``headerfields`` at the same
        position. In ``remove`` mode every non-location field is mapped
        to ``'--IGNORE--'``.

        Raises :exc:`FatalCSVError` if the file has no header line.
        """
        result = dict()
        csv_file = open(path, 'rb')
        try:
            try:
                raw_header = csv.reader(csv_file).next()
            except StopIteration:
                raise FatalCSVError("Empty CSV file: no header line found.")
        finally:
            csv_file.close()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Ignore non-location fields when removing...
                field = '--IGNORE--'
            result[raw_header[num]] = field
        return result

    def getFieldConverters(self, fieldnames):
        """Get converters for fieldnames.

        Returns a dict mapping each schema field name of ``iface`` that
        is contained in ``fieldnames`` to its ``ISchemaTypeConverter``.
        """
        result = dict()
        for key, field in getFields(self.iface).items():
            if key not in fieldnames:
                continue
            converter = ISchemaTypeConverter(field)
            result[key] = converter
        return result

    def convertToTypes(self, row, converter_dict):
        """Convert values in given row to destination type.

        ``row`` is modified in place: columns whose name starts with
        ``--`` (ignored columns, e.g. ``--IGNORE--`` or the
        ``--ERRORS--`` column of re-imported pending files) are removed,
        all values with a converter in ``converter_dict`` are converted.

        Returns a tuple ``(row, warnings)`` where ``warnings`` is a list
        of messages for values that could not be converted.
        """
        # Same notion of 'ignored column' as in checkHeaders().
        for key in [x for x in row.keys() if x.startswith('--')]:
            del row[key]
        warnings = []
        for key, value in list(row.items()):
            converter = converter_dict.get(key, None)
            if not converter:
                continue
            try:
                row[key] = converter.fromString(value)
            except Exception:
                msg = "conversion error: field %s: %r" % (
                    key, sys.exc_info()[1])
                warnings.append(msg)
        return (row, warnings)

    def callFactory(self, *args, **kw):
        """Create a new, empty object via the ``factory_name`` factory."""
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the existing entry for ``row`` data or ``None``.

        Used by ``doImport`` in ``update`` mode.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.

        ``num`` is the number of processed rows, ``warnings`` the number
        of failed ones and ``timedelta`` the processing time in seconds.
        Nothing is written if ``logger`` is ``None``.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def _processRow(self, row, mode, site):
        """Apply one converted data ``row`` according to ``mode``.

        Returns ``None`` on success or a message telling why the row was
        skipped.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            for key, value in row.items():
                setattr(obj, key, value)
            self.addEntry(obj, row, site)
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at ``path``, maps its columns to
        ``headerfields``, converts the values and creates, removes or
        updates one entry per data row depending on ``mode``.

        Successfully processed rows are written (with mapped column
        names) to ``<name>.finished<ext>``, failed rows (in raw form plus
        an ``--ERRORS--`` column) to ``<name>.pending<ext>``. Both files
        live in a fresh temporary directory; ``<name>`` and ``<ext>`` come
        from the basename of ``path``.

        Returns ``(num, num_warns, finished_path, failed_path)``: number
        of data rows, number of failed rows, absolute path of the
        finished file and absolute path of the pending file (``None`` if
        no row failed; the pending file is deleted then).
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        converters = self.getFieldConverters(headerfields)

        temp_dir = tempfile.mkdtemp()
        # Use the basename only: os.path.join() discards temp_dir if the
        # second part is absolute, and for relative paths with directory
        # parts the resulting subdirectories of temp_dir do not exist.
        (base, ext) = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))

        failed_headers = list(mapping.keys()) + ['--ERRORS--']
        failed_first_row = dict(mapping)
        failed_first_row['--ERRORS--'] = '--ERRORS--'
        finished_headers = list(mapping.values())

        num = 0
        num_warns = 0
        site = grok.getSite()
        open_files = []
        try:
            for file_path, file_mode in ((path, 'rb'), (failed_path, 'wb'),
                                         (finished_path, 'wb')):
                open_files.append(open(file_path, file_mode))
            source_file, failed_file, finished_file = open_files

            reader = csv.DictReader(source_file)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(failed_first_row)
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            finished_writer.writerow(
                dict([(x, x) for x in finished_headers]))

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                row, conv_warnings = self.convertToTypes(
                    copy.deepcopy(string_row), converters)
                if conv_warnings:
                    error = conv_warnings
                else:
                    error = self._processRow(row, mode, site)
                if error:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, raw_row, error)
                    continue
                finished_writer.writerow(string_row)
        finally:
            # Close explicitly so output files are completely written
            # before they are logged, deleted or handed to the caller.
            for open_file in open_files:
                open_file.close()

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
[4806]316
class FacultyProcessor(BatchProcessor):
    """A batch processor for IFaculty objects.

    Faculties are stored in the site's ``faculties`` container and are
    located by their ``code`` alone.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'facultyimporter'
    grok.name(util_name)

    name = u'Faculty importer'
    iface = IFaculty

    location_fields = ['code',]
    factory_name = 'waeup.Faculty'

    def parentsExist(self, row, site):
        """Tell whether the site has a ``faculties`` container."""
        return 'faculties' in site.keys()

    def entryExists(self, row, site):
        """Tell whether a faculty with code ``row['code']`` exists.

        Returns ``False`` instead of raising ``KeyError`` when the site
        has no ``faculties`` container (remove/update modes call this
        without checking ``parentsExist`` first).
        """
        if not self.parentsExist(row, site):
            return False
        return row['code'] in self.getParent(row, site).keys()

    def getParent(self, row, site):
        """Get the site's ``faculties`` container."""
        return site['faculties']

    def getEntry(self, row, site):
        """Get the faculty given by ``row`` or ``None``."""
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(row['code'])

    def addEntry(self, obj, row, site):
        """Add faculty ``obj`` to the ``faculties`` container."""
        parent = self.getParent(row, site)
        parent.addFaculty(obj)
        return

    def delEntry(self, row, site):
        """Remove the faculty given by ``row``."""
        parent = self.getParent(row, site)
        del parent[row['code']]
        return
355
class DepartmentProcessor(BatchProcessor):
    """Batch processor creating, updating and removing IDepartment objects.

    A department is addressed by its own ``code`` together with the
    ``faculty_code`` of the faculty (inside the site's ``faculties``
    container) it belongs to.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'departmentimporter'
    grok.name(util_name)

    name = u'Department importer'
    iface = IDepartment

    location_fields = ['code', 'faculty_code']
    factory_name = 'waeup.Department'

    def parentsExist(self, row, site):
        """True if the faculty named in ``row['faculty_code']`` exists."""
        return ('faculties' in site.keys()
                and row['faculty_code'] in site['faculties'])

    def entryExists(self, row, site):
        """True if department ``row['code']`` exists in its faculty."""
        return (self.parentsExist(row, site)
                and row['code'] in self.getParent(row, site).keys())

    def getParent(self, row, site):
        """Return the faculty the department in ``row`` belongs to."""
        faculties = site['faculties']
        return faculties[row['faculty_code']]

    def getEntry(self, row, site):
        """Return the department described by ``row``, or ``None``."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Put department ``obj`` into the faculty named in ``row``."""
        self.getParent(row, site).addDepartment(obj)

    def delEntry(self, row, site):
        """Drop the department described by ``row`` from its faculty."""
        faculty = self.getParent(row, site)
        del faculty[row['code']]
[4806]399
class CourseProcessor(BatchProcessor):
    """Batch processor creating, updating and removing ICourse objects.

    A course is addressed by its ``code`` plus the ``faculty_code`` and
    ``department_code`` of the department whose ``courses`` container
    holds it.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'courseimporter'
    grok.name(util_name)

    name = u'Course importer'
    iface = ICourse

    location_fields = ['code', 'faculty_code', 'department_code']
    factory_name = 'waeup.Course'

    def parentsExist(self, row, site):
        """True if faculty and department named in ``row`` both exist."""
        if 'faculties' not in site.keys():
            return False
        faculties = site['faculties']
        faculty_code = row['faculty_code']
        if faculty_code not in faculties.keys():
            return False
        return row['department_code'] in faculties[faculty_code].keys()

    def entryExists(self, row, site):
        """True if course ``row['code']`` exists in its department."""
        if self.parentsExist(row, site):
            return row['code'] in self.getParent(row, site).keys()
        return False

    def getParent(self, row, site):
        """Return the ``courses`` container of the department in ``row``."""
        faculty = site['faculties'][row['faculty_code']]
        department = faculty[row['department_code']]
        return department.courses

    def getEntry(self, row, site):
        """Return the course described by ``row``, or ``None``."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Put course ``obj`` into the department named in ``row``."""
        self.getParent(row, site).addCourse(obj)

    def delEntry(self, row, site):
        """Drop the course described by ``row`` from its department."""
        courses = self.getParent(row, site)
        del courses[row['code']]
447
class CertificateProcessor(BatchProcessor):
    """Batch processor creating, updating and removing ICertificate objects.

    A certificate is addressed by its ``code`` plus the ``faculty_code``
    and ``department_code`` of the department whose ``certificates``
    container holds it.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificateimporter'
    grok.name(util_name)

    name = u'Certificate importer'
    iface = ICertificate

    location_fields = ['code', 'faculty_code', 'department_code']
    factory_name = 'waeup.Certificate'

    def parentsExist(self, row, site):
        """True if faculty and department named in ``row`` both exist."""
        if 'faculties' not in site.keys():
            return False
        faculties = site['faculties']
        faculty_code = row['faculty_code']
        if faculty_code not in faculties.keys():
            return False
        return row['department_code'] in faculties[faculty_code].keys()

    def entryExists(self, row, site):
        """True if certificate ``row['code']`` exists in its department."""
        if self.parentsExist(row, site):
            return row['code'] in self.getParent(row, site).keys()
        return False

    def getParent(self, row, site):
        """Return the ``certificates`` container of the department."""
        faculty = site['faculties'][row['faculty_code']]
        department = faculty[row['department_code']]
        return department.certificates

    def getEntry(self, row, site):
        """Return the certificate described by ``row``, or ``None``."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Put certificate ``obj`` into the department named in ``row``."""
        self.getParent(row, site).addCertificate(obj)

    def delEntry(self, row, site):
        """Drop the certificate described by ``row`` from its department."""
        certificates = self.getParent(row, site)
        del certificates[row['code']]
[4832]495
class CertificateCourseProcessor(BatchProcessor):
    """A batch processor for ICertificateCourse objects.

    Entries live in a certificate (located by ``faculty_code``,
    ``department_code`` and ``certificate_code``) and are identified
    there by the ``course`` and ``level`` fields.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificatecourseimporter'
    grok.name(util_name)

    name = u'CertificateCourse importer'
    iface = ICertificateCourse

    location_fields = ['course', 'level', 'faculty_code', 'department_code',
                       'certificate_code',]
    factory_name = 'waeup.CertificateCourse'

    def _getEntryKey(self, row):
        """Key of the entry for ``row`` inside its certificate.

        Built as ``<course code>_<level>``; ``row['course']`` is
        presumably a course object (its ``code`` attribute is used).
        """
        return "%s_%s" % (row['course'].code, row['level'])

    def parentsExist(self, row, site):
        """Tell whether the certificate named in ``row`` exists."""
        if not 'faculties' in site.keys():
            return False
        if not row['faculty_code'] in site['faculties'].keys():
            return False
        faculty = site['faculties'][row['faculty_code']]
        if not row['department_code'] in faculty.keys():
            return False
        dept = faculty[row['department_code']]
        return row['certificate_code'] in dept.certificates.keys()

    def entryExists(self, row, site):
        """Tell whether the certificate already references this course."""
        if not self.parentsExist(row, site):
            return False
        parent = self.getParent(row, site)
        return self._getEntryKey(row) in parent.keys()

    def getParent(self, row, site):
        """Get the certificate named in ``row``."""
        dept = site['faculties'][row['faculty_code']][row['department_code']]
        return dept.certificates[row['certificate_code']]

    def getEntry(self, row, site):
        """Get the certificate course given by ``row`` or ``None``."""
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        # Look up by the same key entryExists() checks; the course object
        # in row['course'] itself is not a key of the certificate.
        return parent.get(self._getEntryKey(row))

    def addEntry(self, obj, row, site):
        """Add a course reference as described by ``row``.

        ``obj`` is not stored; the certificate creates the entry itself.
        """
        parent = self.getParent(row, site)
        parent.addCourseRef(row['course'],
                            row['level'], row['core_or_elective'])
        return

    def delEntry(self, row, site):
        """Remove the course reference described by ``row``."""
        parent = self.getParent(row, site)
        parent.delCourseRef(row['course'].code, row['level'])
        return
Note: See TracBrowser for help on using the repository browser.