source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 4990

Last change on this file since 4990 was 4984, checked in by uli, 15 years ago

Add docs.

File size: 18.1 KB
Line 
1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
7import copy
8import csv
9import os
10import sys
11import tempfile
12import time
13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
16from zope.schema.interfaces import IText
17from waeup.sirp.interfaces import (IBatchProcessor, IFacultyContainer, IFaculty,
18                              ICourse, IDepartment, ICertificate,
19                              ICertificateCourse, ISchemaTypeConverter,
20                              FatalCSVError)
21
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass (it is marked with
    ``grok.baseclass()``). Subclasses set the class attributes below and
    implement the hooks `parentsExist`, `entryExists`, `getParent`,
    `getEntry`, `addEntry` and `delEntry`; `doImport` drives a whole CSV
    import using these hooks.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # NOTE: the defaults below are those of the department importer;
    # concrete processors override them.

    # Name used in pages and forms...
    name = u'Department importer'

    # Internal name...
    util_name = 'departmentimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = IDepartment

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Location fields and fields with a default != missing_value do
        not belong to this category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Dict mapping each import mode to the column names it needs.

        ``create`` needs the location fields plus `required_fields`,
        ``update`` and ``remove`` need only the location fields.
        """
        result = dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list (without duplicates) of all known column names.

        These are the location fields plus all schema fields of `iface`.
        """
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Get the column names usable for import.

        `mode` is currently not taken into account.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are acceptable for `mode`.

        Raises `FatalCSVError` if a column required for `mode` is
        missing or if a (not ignored) column name appears more than
        once. Column names starting with ``--`` are ignored by the
        double-header check. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict whose keys are the mapped names (values of
        `mapping`) and whose values are taken from `row` under the raw
        names (keys of `mapping`).
        """
        result = dict()
        for key, replacement in mapping.items():
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
           fieldnames.

        The first line of the CSV file at `path` is read as raw header;
        each raw column name is mapped to the entry at the same position
        in `headerfields`. In ``remove`` mode every non-location field
        is mapped to ``'--IGNORE--'``.
        """
        result = dict()
        csv_file = open(path, 'rb')
        try:
            raw_header = csv.reader(csv_file).next()
        finally:
            # Close explicitly instead of relying on garbage collection.
            csv_file.close()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Ignore non-location fields when removing...
                field = '--IGNORE--'
            result[raw_header[num]] = field
        return result

    def getFieldConverters(self, fieldnames):
        """Get converters for fieldnames.

        Returns a dict mapping each schema field of `iface` that appears
        in `fieldnames` to its `ISchemaTypeConverter` adapter.
        """
        result = dict()
        for key, field in getFields(self.iface).items():
            if key not in fieldnames:
                continue
            converter = ISchemaTypeConverter(field)
            result[key] = converter
        return result

    def convertToTypes(self, row, converter_dict):
        """Convert values in given row to destination type.

        `row` is modified in place: a ``'--IGNORE--'`` entry is dropped
        and each value that has a converter in `converter_dict` is
        replaced by the converter's ``fromString`` result. Values that
        fail to convert are left untouched and reported.

        Returns a tuple ``(row, warnings)`` with `warnings` being a list
        of error messages (empty if everything converted).
        """
        if '--IGNORE--' in row:
            del row['--IGNORE--']
        warnings = []
        for key, value in row.items():
            converter = converter_dict.get(key, None)
            if not converter:
                continue
            try:
                row[key] = converter.fromString(value)
            except Exception:
                # Collect conversion problems as warnings instead of
                # aborting the whole import. Unlike a bare ``except:``
                # this does not swallow KeyboardInterrupt/SystemExit.
                msg = "conversion error: field %s: %r" % (
                    key, sys.exc_info()[1])
                warnings.append(msg)
        return (row, warnings)

    def callFactory(self, *args, **kw):
        """Create a new, empty object using the factory `factory_name`.

        Any arguments are ignored.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the entry given by ``row`` data or ``None`` if none exists.

        Used by `doImport` in ``update`` mode.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Every key of `row` is set as attribute on `obj`.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of an import run to `logger`.

        `num` is the number of processed rows, `warnings` the number of
        failed rows and `timedelta` the processing time in seconds.
        Nothing is written if `logger` is ``None``.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # `num or 1` avoids a division by zero for empty imports.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        The messages are stored in `row` under the key ``'--ERRORS--'``
        (so `row` is modified). If warnings is a list of strings, they
        will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def _processRow(self, row, mode, site):
        """Apply one converted `row` in `mode`.

        Returns an error message if the row could not be processed or
        ``None`` on success.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            for key, value in row.items():
                setattr(obj, key, value)
            self.addEntry(obj, row, site)
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            self.updateEntry(obj, row, site)
        return None

    def _processRows(self, reader, mapping, converters, mode, site,
                     failed_writer, finished_writer):
        """Process all rows of `reader`; return ``(num, num_warns)``.

        Failed rows are written (raw, with error messages) to
        `failed_writer`, successful ones (mapped, unconverted) to
        `finished_writer`.
        """
        num = 0
        num_warns = 0
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            # Convert a copy: `string_row` is what gets written to the
            # finished file.
            row, conv_warnings = self.convertToTypes(
                copy.deepcopy(string_row), converters)
            if len(conv_warnings):
                num_warns += 1
                self.writeFailedRow(failed_writer, raw_row, conv_warnings)
                continue
            error = self._processRow(row, mode, site)
            if error is not None:
                num_warns += 1
                self.writeFailedRow(failed_writer, raw_row, error)
                continue
            finished_writer.writerow(string_row)
        return (num, num_warns)

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at `path`, maps its columns to `headerfields`
        and processes every row in `mode` (``create``, ``update`` or
        ``remove``). Rows processed successfully are written to
        ``<name>.finished<ext>``, failed rows (plus an ``--ERRORS--``
        column) to ``<name>.pending<ext>``; both files are created in a
        new temporary directory.

        Returns ``(num, num_warns, finished_path, failed_path)``: the
        number of rows read, the number of failed rows and the absolute
        paths of both files. `failed_path` is ``None`` (and the pending
        file removed) if no row failed.

        Raises `FatalCSVError` if the headers do not fit `mode`.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        converters = self.getFieldConverters(headerfields)

        temp_dir = tempfile.mkdtemp()
        # Only use the file name of `path`: joining a path with directory
        # parts to `temp_dir` would put the result files outside of
        # `temp_dir` (absolute path) or into a non-existing subdirectory.
        (base, ext) = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(
            temp_dir, "%s.finished%s" % (base, ext))

        site = grok.getSite()
        open_files = []
        try:
            source_file = open(path, 'rb')
            open_files.append(source_file)
            failed_file = open(failed_path, 'wb')
            open_files.append(failed_file)
            finished_file = open(finished_path, 'wb')
            open_files.append(finished_file)

            reader = csv.DictReader(source_file)

            # Failed rows keep the raw column names; the first line
            # written holds the mapped field names in those columns.
            failed_headers = list(mapping.keys()) + ['--ERRORS--']
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_first_row = dict(mapping)
            failed_first_row['--ERRORS--'] = '--ERRORS--'
            failed_writer.writerow(failed_first_row)

            finished_headers = list(mapping.values())
            finished_writer = csv.DictWriter(finished_file,
                                             finished_headers)
            finished_writer.writerow(
                dict([(x, x) for x in finished_headers]))

            (num, num_warns) = self._processRows(
                reader, mapping, converters, mode, site,
                failed_writer, finished_writer)
        finally:
            # Close (and thereby flush) all files before they are
            # returned to the caller or removed.
            for open_file in open_files:
                open_file.close()

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
316
class FacultyProcessor(BatchProcessor):
    """A batch processor for IFaculty objects.

    Faculties live in the ``faculties`` container of the site and are
    located by their ``code``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'facultyimporter'
    grok.name(util_name)

    name = u'Faculty importer'
    iface = IFaculty

    location_fields = ['code',]
    factory_name = 'waeup.Faculty'

    def parentsExist(self, row, site):
        """Tell whether the site has a ``faculties`` container."""
        return 'faculties' in site.keys()

    def entryExists(self, row, site):
        """Tell whether a faculty with ``row['code']`` exists.

        Returns ``False`` if the site has no ``faculties`` container.
        """
        # Check the parent first, as the other processors do; otherwise
        # a missing container raises KeyError (e.g. in remove mode,
        # where parentsExist is not called beforehand).
        if not self.parentsExist(row, site):
            return False
        return row['code'] in site['faculties'].keys()

    def getParent(self, row, site):
        """Get the ``faculties`` container of the site."""
        return site['faculties']

    def getEntry(self, row, site):
        """Get the faculty given by ``row`` or ``None`` if missing."""
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(row['code'])

    def addEntry(self, obj, row, site):
        """Add the faculty `obj` to the ``faculties`` container."""
        parent = self.getParent(row, site)
        parent.addFaculty(obj)
        return

    def delEntry(self, row, site):
        """Remove the faculty given by ``row['code']``."""
        parent = self.getParent(row, site)
        del parent[row['code']]
        return
355
class DepartmentProcessor(BatchProcessor):
    """Batch processor that creates, updates or removes departments.

    A department is stored inside a faculty, so rows locate it by the
    ``faculty_code`` of that faculty plus the department ``code``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'departmentimporter'
    grok.name(util_name)

    name = u'Department importer'
    iface = IDepartment

    location_fields = ['code', 'faculty_code']
    factory_name = 'waeup.Department'

    def parentsExist(self, row, site):
        """Whether the faculty referenced by ``row`` is present."""
        if 'faculties' in site.keys():
            return row['faculty_code'] in site['faculties']
        return False

    def entryExists(self, row, site):
        """Whether the department referenced by ``row`` is present."""
        return (self.parentsExist(row, site) and
                row['code'] in self.getParent(row, site).keys())

    def getParent(self, row, site):
        """The faculty that holds (or shall hold) the department."""
        faculties = site['faculties']
        return faculties[row['faculty_code']]

    def getEntry(self, row, site):
        """The department referenced by ``row``, ``None`` if absent."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Put the freshly created department `obj` into its faculty."""
        faculty = self.getParent(row, site)
        faculty.addDepartment(obj)

    def delEntry(self, row, site):
        """Drop the department referenced by ``row`` from its faculty."""
        faculty = self.getParent(row, site)
        del faculty[row['code']]
399
class CourseProcessor(BatchProcessor):
    """Batch processor that creates, updates or removes courses.

    Courses are kept in the ``courses`` container of a department; rows
    locate them by ``faculty_code``, ``department_code`` and ``code``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'courseimporter'
    grok.name(util_name)

    name = u'Course importer'
    iface = ICourse

    location_fields = ['code', 'faculty_code', 'department_code']
    factory_name = 'waeup.Course'

    def parentsExist(self, row, site):
        """Whether faculty and department referenced by ``row`` exist."""
        if 'faculties' not in site.keys():
            return False
        faculties = site['faculties']
        if row['faculty_code'] not in faculties.keys():
            return False
        faculty = faculties[row['faculty_code']]
        return row['department_code'] in faculty.keys()

    def entryExists(self, row, site):
        """Whether the course referenced by ``row`` already exists."""
        return (self.parentsExist(row, site) and
                row['code'] in self.getParent(row, site).keys())

    def getParent(self, row, site):
        """The ``courses`` container of the referenced department."""
        faculty = site['faculties'][row['faculty_code']]
        department = faculty[row['department_code']]
        return department.courses

    def getEntry(self, row, site):
        """The course referenced by ``row``, ``None`` if absent."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Store the freshly created course `obj` in its department."""
        courses = self.getParent(row, site)
        courses.addCourse(obj)

    def delEntry(self, row, site):
        """Drop the course referenced by ``row`` from its department."""
        courses = self.getParent(row, site)
        del courses[row['code']]
447
class CertificateProcessor(BatchProcessor):
    """Batch processor that creates, updates or removes certificates.

    Certificates are kept in the ``certificates`` container of a
    department; rows locate them by ``faculty_code``,
    ``department_code`` and ``code``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificateimporter'
    grok.name(util_name)

    name = u'Certificate importer'
    iface = ICertificate

    location_fields = ['code', 'faculty_code', 'department_code']
    factory_name = 'waeup.Certificate'

    def parentsExist(self, row, site):
        """Whether faculty and department referenced by ``row`` exist."""
        if 'faculties' not in site.keys():
            return False
        faculties = site['faculties']
        if row['faculty_code'] not in faculties.keys():
            return False
        faculty = faculties[row['faculty_code']]
        return row['department_code'] in faculty.keys()

    def entryExists(self, row, site):
        """Whether the certificate referenced by ``row`` exists."""
        return (self.parentsExist(row, site) and
                row['code'] in self.getParent(row, site).keys())

    def getParent(self, row, site):
        """The ``certificates`` container of the referenced department."""
        faculty = site['faculties'][row['faculty_code']]
        department = faculty[row['department_code']]
        return department.certificates

    def getEntry(self, row, site):
        """The certificate referenced by ``row``, ``None`` if absent."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Store the freshly created certificate `obj` in its department."""
        certificates = self.getParent(row, site)
        certificates.addCertificate(obj)

    def delEntry(self, row, site):
        """Drop the certificate referenced by ``row``."""
        certificates = self.getParent(row, site)
        del certificates[row['code']]
495
class CertificateCourseProcessor(BatchProcessor):
    """A batch processor for ICertificateCourse objects.

    Certificate courses are stored inside a certificate (located via
    ``faculty_code``, ``department_code`` and ``certificate_code``)
    under a key built from the referenced course's code and the level.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificatecourseimporter'
    grok.name(util_name)

    name = u'CertificateCourse importer'
    iface = ICertificateCourse

    location_fields = ['course', 'level', 'faculty_code', 'department_code',
                       'certificate_code',]
    factory_name = 'waeup.CertificateCourse'

    def _getEntryKey(self, row):
        """Key of the certificate course for ``row`` in its certificate.

        ``row['course']`` must be a (converted) course object; its
        ``code`` attribute is combined with ``row['level']``.
        """
        return "%s_%s" % (row['course'].code, row['level'])

    def parentsExist(self, row, site):
        """Tell whether faculty, department and certificate exist."""
        if not 'faculties' in site.keys():
            return False
        if not row['faculty_code'] in site['faculties'].keys():
            return False
        faculty = site['faculties'][row['faculty_code']]
        if not row['department_code'] in faculty.keys():
            return False
        dept = faculty[row['department_code']]
        return row['certificate_code'] in dept.certificates.keys()

    def entryExists(self, row, site):
        """Tell whether the certificate course for ``row`` exists."""
        if not self.parentsExist(row, site):
            return False
        parent = self.getParent(row, site)
        return self._getEntryKey(row) in parent.keys()

    def getParent(self, row, site):
        """Get the certificate referenced by ``row``."""
        dept = site['faculties'][row['faculty_code']][row['department_code']]
        return dept.certificates[row['certificate_code']]

    def getEntry(self, row, site):
        """Get the certificate course for ``row`` or ``None``."""
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        # Look up by the same key `entryExists` checks; the course object
        # itself is not a key of the certificate.
        return parent.get(self._getEntryKey(row))

    def addEntry(self, obj, row, site):
        """Add a course reference to the certificate given by ``row``.

        `obj` is not used: ``addCourseRef`` is called with the row values
        directly (presumably it creates the entry itself -- verify).
        """
        parent = self.getParent(row, site)
        parent.addCourseRef(row['course'],
                            row['level'], row['core_or_elective'])
        return

    def delEntry(self, row, site):
        """Remove the course reference given by ``row``."""
        parent = self.getParent(row, site)
        parent.delCourseRef(row['course'].code, row['level'])
        return
Note: See TracBrowser for help on using the repository browser.