source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 4983

Last change on this file since 4983 was 4983, checked in by uli, 15 years ago

Rely on Python/ZODB weak references when updating entries.

File size: 18.0 KB
RevLine 
[4806]1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
[4870]7import copy
[4806]8import csv
[4821]9import os
10import sys
[4900]11import tempfile
[4821]12import time
[4806]13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
16from zope.schema.interfaces import IText
[4920]17from waeup.sirp.interfaces import (IBatchProcessor, IFacultyContainer, IFaculty,
[4831]18                              ICourse, IDepartment, ICertificate,
[4832]19                              ICertificateCourse, ISchemaTypeConverter,
20                              FatalCSVError)
[4806]21
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors override the
    class attributes (``name``, ``util_name``, ``iface``,
    ``location_fields``, ``factory_name``) and implement the hooks
    ``parentsExist``, ``entryExists``, ``getParent``, ``getEntry``,
    ``addEntry`` and ``delEntry``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Department importer'

    # Internal name...
    util_name = 'departmentimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = IDepartment

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the list of columns that must exist."""
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all column names this processor understands.

        This is the union of ``location_fields`` and the schema fields of
        ``iface``, without duplicates.
        """
        return sorted(set(self.location_fields) |
                      set(getFields(self.iface).keys()))

    def getHeaders(self, mode='create'):
        """Return the column names usable for ``mode``."""
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check CSV header fields for ``mode``.

        Raises FatalCSVError if a required column is missing or if a
        non-ignored column (one not starting with ``--``) appears more
        than once. Returns True otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict whose keys are the mapped field names.
        """
        result = dict()
        for key, replacement in mapping.items():
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
           fieldnames.

        The raw header is read from the first line of the file at
        ``path``. In ``remove`` mode all non-location fields are mapped
        to ``--IGNORE--``.
        """
        result = dict()
        # Close the file right after reading the header; it used to be
        # left open until garbage collection.
        csv_file = open(path, 'rb')
        try:
            raw_header = csv.reader(csv_file).next()
        finally:
            csv_file.close()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Ignore non-location fields when removing...
                field = '--IGNORE--'
            result[raw_header[num]] = field
        return result

    def getFieldConverters(self, fieldnames):
        """Get converters for fieldnames.

        Returns a dict mapping each schema field name contained in
        ``fieldnames`` to its ISchemaTypeConverter adapter.
        """
        result = dict()
        for key, field in getFields(self.iface).items():
            if key not in fieldnames:
                continue
            converter = ISchemaTypeConverter(field)
            result[key] = converter
        return result

    def convertToTypes(self, row, converter_dict):
        """Convert values in given row to destination type.

        ``row`` is modified in place. The ``--IGNORE--`` column is dropped.
        Every value that has a converter in ``converter_dict`` is replaced
        by the converter's ``fromString`` result. Values that fail to
        convert are left untouched and reported.

        Returns a tuple ``(row, warnings)``. ``warnings`` is a list of
        conversion error messages and is empty when everything converted.
        """
        if '--IGNORE--' in row:
            del row['--IGNORE--']
        warnings = []
        for key, value in list(row.items()):
            converter = converter_dict.get(key, None)
            if not converter:
                continue
            try:
                row[key] = converter.fromString(value)
            except Exception:
                # Record the problem for the failed-rows file instead of
                # aborting; do not swallow KeyboardInterrupt/SystemExit.
                msg = "conversion error: field %s: %r" % (
                    key, sys.exc_info()[1])
                warnings.append(msg)
        return (row, warnings)

    def callFactory(self, *args, **kw):
        """Create a new, empty object via the ``factory_name`` factory."""
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the entry given by ``row`` data or None if it does not exist.

        Used by ``doImport`` in ``update`` mode.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Set every value of ``row`` as attribute on ``obj``."""
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to ``logger``.

        ``num`` is the number of processed rows and ``warnings`` the number
        of failed rows. Nothing is written if ``logger`` is None.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that ``row`` is modified: an ``--ERRORS--`` key is added.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def _processRow(self, row, site, mode):
        """Apply one converted ``row`` in ``mode``.

        Returns None on success or an error message string if the row
        was skipped.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            for key, value in row.items():
                setattr(obj, key, value)
            self.addEntry(obj, row, site)
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at ``path`` and processes every row in ``mode``
        (``create``, ``update`` or ``remove``). Successfully processed rows
        are written to a ``<name>.finished<ext>`` file. Failed rows, with
        an extra ``--ERRORS--`` column, go to a ``<name>.pending<ext>``
        file. Both files are created in a fresh temporary directory.

        Returns ``(num, num_warns, finished_path, failed_path)``.
        ``failed_path`` is None if no row failed.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        converters = self.getFieldConverters(headerfields)

        temp_dir = tempfile.mkdtemp()
        # Use only the file name of ``path``. Joining a full path to
        # temp_dir would make os.path.join drop temp_dir for absolute paths.
        (base, ext) = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))

        failed_headers = list(mapping.keys())
        failed_headers.append('--ERRORS--')
        failed_first_row = dict(mapping)
        failed_first_row['--ERRORS--'] = '--ERRORS--'
        finished_headers = list(mapping.values())

        num = 0
        num_warns = 0
        open_files = []
        try:
            # Every opened file is registered so the finally clause closes
            # (and thereby flushes) all of them, even on errors.
            for file_path, file_mode in ((path, 'rb'),
                                         (failed_path, 'wb'),
                                         (finished_path, 'wb')):
                open_files.append(open(file_path, file_mode))
            source_file, failed_file, finished_file = open_files

            reader = csv.DictReader(source_file)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(failed_first_row)
            finished_writer = csv.DictWriter(finished_file,
                                             finished_headers)
            finished_writer.writerow(
                dict((x, x) for x in finished_headers))

            site = grok.getSite()
            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                # Convert a copy so the original strings can still be
                # written to the finished file.
                row, conv_warnings = self.convertToTypes(
                    copy.deepcopy(string_row), converters)
                if conv_warnings:
                    error = conv_warnings
                else:
                    error = self._processRow(row, site, mode)
                if error is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, raw_row, error)
                    continue
                finished_writer.writerow(string_row)
        finally:
            for handle in open_files:
                handle.close()

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
[4806]314
class FacultyProcessor(BatchProcessor):
    """A batch processor for IFaculty objects.

    Faculties are stored in the site's ``faculties`` container and are
    located by their ``code``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'facultyimporter'
    grok.name(util_name)

    name = u'Faculty importer'
    iface = IFaculty

    location_fields = ['code',]
    factory_name = 'waeup.Faculty'

    def parentsExist(self, row, site):
        """Tell whether the site has a ``faculties`` container."""
        return 'faculties' in site.keys()

    def entryExists(self, row, site):
        """Tell whether a faculty with ``row['code']`` exists.

        Returns False when the site has no ``faculties`` container instead
        of raising KeyError. This matches the other processors and keeps
        remove/update imports from aborting.
        """
        if not self.parentsExist(row, site):
            return False
        return row['code'] in site['faculties'].keys()

    def getParent(self, row, site):
        """Return the site's ``faculties`` container."""
        return site['faculties']

    def getEntry(self, row, site):
        """Return the faculty for ``row`` or None if it does not exist."""
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(row['code'])

    def addEntry(self, obj, row, site):
        """Add faculty ``obj`` to the ``faculties`` container."""
        parent = self.getParent(row, site)
        parent.addFaculty(obj)
        return

    def delEntry(self, row, site):
        """Remove the faculty with ``row['code']``."""
        parent = self.getParent(row, site)
        del parent[row['code']]
        return
353
class DepartmentProcessor(BatchProcessor):
    """Batch processor that creates, updates and removes IDepartment objects.

    A department is located by its own ``code`` together with the
    ``faculty_code`` of the faculty that contains it.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'departmentimporter'
    grok.name(util_name)

    name = u'Department importer'
    iface = IDepartment

    location_fields = ['code', 'faculty_code']
    factory_name = 'waeup.Department'

    def parentsExist(self, row, site):
        """True if the faculty referenced by ``row`` exists in ``site``."""
        return ('faculties' in site.keys()
                and row['faculty_code'] in site['faculties'])

    def entryExists(self, row, site):
        """True if the faculty exists and already holds ``row['code']``."""
        return (self.parentsExist(row, site)
                and row['code'] in self.getParent(row, site).keys())

    def getParent(self, row, site):
        """The faculty named by ``row['faculty_code']``."""
        faculties = site['faculties']
        return faculties[row['faculty_code']]

    def getEntry(self, row, site):
        """The department described by ``row``, or None if missing."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Store department ``obj`` in its faculty."""
        self.getParent(row, site).addDepartment(obj)

    def delEntry(self, row, site):
        """Drop the department ``row['code']`` from its faculty."""
        faculty = self.getParent(row, site)
        del faculty[row['code']]
[4806]397
class CourseProcessor(BatchProcessor):
    """Batch processor that creates, updates and removes ICourse objects.

    Courses live in the ``courses`` container of a department. The
    department is located via ``faculty_code`` and ``department_code``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'courseimporter'
    grok.name(util_name)

    name = u'Course importer'
    iface = ICourse

    location_fields = ['code', 'faculty_code', 'department_code']
    factory_name = 'waeup.Course'

    def parentsExist(self, row, site):
        """True if both the faculty and the department of ``row`` exist."""
        if 'faculties' in site.keys():
            faculties = site['faculties']
            if row['faculty_code'] in faculties.keys():
                faculty = faculties[row['faculty_code']]
                return row['department_code'] in faculty.keys()
        return False

    def entryExists(self, row, site):
        """True if the parents exist and ``row['code']`` is a known course."""
        if self.parentsExist(row, site):
            courses = self.getParent(row, site)
            return row['code'] in courses.keys()
        return False

    def getParent(self, row, site):
        """The ``courses`` container of the department given in ``row``."""
        faculty = site['faculties'][row['faculty_code']]
        department = faculty[row['department_code']]
        return department.courses

    def getEntry(self, row, site):
        """The course described by ``row``, or None if missing."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Put course ``obj`` into its department's ``courses``."""
        self.getParent(row, site).addCourse(obj)

    def delEntry(self, row, site):
        """Remove course ``row['code']`` from its department."""
        courses = self.getParent(row, site)
        del courses[row['code']]
445
class CertificateProcessor(BatchProcessor):
    """Batch processor that creates, updates and removes ICertificate objects.

    Certificates are kept in the ``certificates`` container of a
    department. The department is located via ``faculty_code`` and
    ``department_code``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificateimporter'
    grok.name(util_name)

    name = u'Certificate importer'
    iface = ICertificate

    location_fields = ['code', 'faculty_code', 'department_code']
    factory_name = 'waeup.Certificate'

    def parentsExist(self, row, site):
        """True if both the faculty and the department of ``row`` exist."""
        if 'faculties' not in site.keys():
            return False
        faculties = site['faculties']
        fac_code = row['faculty_code']
        if fac_code not in faculties.keys():
            return False
        return row['department_code'] in faculties[fac_code].keys()

    def entryExists(self, row, site):
        """True if the parents exist and hold a certificate ``row['code']``."""
        return (self.parentsExist(row, site)
                and row['code'] in self.getParent(row, site).keys())

    def getParent(self, row, site):
        """The ``certificates`` container of the department given in ``row``."""
        faculty = site['faculties'][row['faculty_code']]
        return faculty[row['department_code']].certificates

    def getEntry(self, row, site):
        """The certificate described by ``row``, or None if missing."""
        if self.entryExists(row, site):
            certificates = self.getParent(row, site)
            return certificates.get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Add certificate ``obj`` to its department."""
        self.getParent(row, site).addCertificate(obj)

    def delEntry(self, row, site):
        """Remove certificate ``row['code']`` from its department."""
        certificates = self.getParent(row, site)
        del certificates[row['code']]
[4832]493
class CertificateCourseProcessor(BatchProcessor):
    """A batch processor for ICertificateCourse objects.

    Certificate courses are stored inside a certificate, which is located
    via ``faculty_code``, ``department_code`` and ``certificate_code``.
    Each one is keyed by ``"<course code>_<level>"`` (see ``_entryKey``).
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificatecourseimporter'
    grok.name(util_name)

    name = u'CertificateCourse importer'
    iface = ICertificateCourse

    location_fields = ['course', 'level', 'faculty_code', 'department_code',
                       'certificate_code',]
    factory_name = 'waeup.CertificateCourse'

    def _entryKey(self, row):
        """Return the key under which the entry for ``row`` is stored.

        ``row['course']`` must already be converted to a course object;
        its ``code`` attribute is used.
        """
        return "%s_%s" % (row['course'].code, row['level'])

    def parentsExist(self, row, site):
        """Tell whether faculty, department and certificate of ``row`` exist."""
        if not 'faculties' in site.keys():
            return False
        if not row['faculty_code'] in site['faculties'].keys():
            return False
        faculty = site['faculties'][row['faculty_code']]
        if not row['department_code'] in faculty.keys():
            return False
        dept = faculty[row['department_code']]
        return row['certificate_code'] in dept.certificates.keys()

    def entryExists(self, row, site):
        """Tell whether the certificate already holds this course/level."""
        if not self.parentsExist(row, site):
            return False
        parent = self.getParent(row, site)
        return self._entryKey(row) in parent.keys()

    def getParent(self, row, site):
        """Return the certificate given by ``row``."""
        dept = site['faculties'][row['faculty_code']][row['department_code']]
        return dept.certificates[row['certificate_code']]

    def getEntry(self, row, site):
        """Return the certificate course for ``row`` or None.

        The lookup uses the same string key that ``entryExists`` checks.
        Looking up the course object itself never matches those keys,
        so update imports always failed.
        """
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(self._entryKey(row))

    def addEntry(self, obj, row, site):
        """Add a course reference for ``row`` to its certificate.

        ``obj`` is not stored itself; the certificate builds the entry
        from the row values.
        """
        parent = self.getParent(row, site)
        # NOTE(review): assumes 'core_or_elective' is always present in
        # the row (it is not a location field) -- confirm against schema.
        parent.addCourseRef(row['course'],
                            row['level'], row['core_or_elective'])
        return

    def delEntry(self, row, site):
        """Remove the course reference for ``row`` from its certificate."""
        parent = self.getParent(row, site)
        parent.delCourseRef(row['course'].code, row['level'])
        return
Note: See TracBrowser for help on using the repository browser.