source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 4960

Last change on this file since 4960 was 4920, checked in by uli, 15 years ago

Make unit tests run again with the new package layout.

File size: 18.2 KB
Line 
1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
import copy
import csv
import os
import shutil
import sys
import tempfile
import time

import grok
from zope.component import createObject
from zope.interface import Interface
from zope.schema import getFields
from zope.schema.interfaces import IText

from waeup.sirp.interfaces import (IBatchProcessor, IFacultyContainer, IFaculty,
                                   ICourse, IDepartment, ICertificate,
                                   ICertificateCourse, ISchemaTypeConverter,
                                   FatalCSVError)
21
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass: it is marked with
    ``grok.baseclass()`` and leaves the object specific hooks
    (``parentsExist``, ``entryExists``, ``getParent``, ``getEntry``,
    ``addEntry`` and ``delEntry``) to its subclasses.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Department importer'

    # Internal name...
    util_name = 'departmentimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = IDepartment

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are not listed here either; they are
        added separately in :attr:`req`.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Column names required in the CSV header, keyed by mode.

        Modes are ``create``, ``update`` and ``remove``.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all location field and schema field names.
        """
        return sorted(
            set(self.location_fields) | set(getFields(self.iface)))

    def getHeaders(self, mode='create'):
        """Get the header names usable for import.

        ``mode`` is accepted but not evaluated here.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check ``headerfields`` for being usable in ``mode``.

        Raises :exc:`FatalCSVError` if a column required for ``mode``
        is missing or if a column name (other than those starting with
        ``--``) appears more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict holding the values of ``row`` under the
        replacement names given in ``mapping``.
        """
        return dict((replacement, row[key])
                    for key, replacement in mapping.items())

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
           fieldnames.

        The first line of the file at ``path`` delivers the raw
        header names; ``headerfields`` delivers, by position, the
        names to use instead.
        """
        csv_file = open(path, 'rb')
        try:
            raw_header = csv.reader(csv_file).next()
        finally:
            # Only the header line is needed; do not leak the handle.
            csv_file.close()
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Ignore non-location fields when removing...
                field = '--IGNORE--'
            result[raw_header[num]] = field
        return result

    def getFieldConverters(self, fieldnames):
        """Get converters for fieldnames.

        Returns a dict mapping each schema field name contained in
        ``fieldnames`` to its :class:`ISchemaTypeConverter` adapter.
        """
        result = dict()
        for key, field in getFields(self.iface).items():
            if key not in fieldnames:
                continue
            result[key] = ISchemaTypeConverter(field)
        return result

    def convertToTypes(self, row, converter_dict):
        """Convert values in given row to destination type.

        ``row`` is modified in place. Returns a tuple ``(row,
        warnings)`` where ``warnings`` is a list with one message per
        value that could not be converted.
        """
        row.pop('--IGNORE--', None)
        warnings = []
        for key, value in list(row.items()):
            converter = converter_dict.get(key, None)
            if not converter:
                continue
            try:
                row[key] = converter.fromString(value)
            except Exception:
                # A failing conversion is reported per row; interpreter
                # level signals like KeyboardInterrupt must pass.
                msg = "conversion error: field %s: %r" % (
                    key, sys.exc_info()[1])
                warnings.append(msg)
        return (row, warnings)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory ``factory_name``.

        Passed arguments are ignored.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        Implementations are expected to return ``None`` if there is
        no such entry; :meth:`doImport` relies on that in update mode.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Set all values of ``row`` as attributes of ``obj``.

        ``obj`` was looked up in its container before, so it is changed
        in place and not stored again. Update mode requires the
        location fields only, hence no other column (like ``name``)
        may be expected in ``row``.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.

        ``num`` is the number of processed rows and ``warnings`` the
        number of failed ones. Nothing happens without a ``logger``.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        The message is stored in ``row`` under ``--ERRORS--``.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def _processRows(self, reader, failed_writer, finished_writer,
                     mapping, converters, mode, site):
        """Process each row of ``reader`` according to ``mode``.

        Failed rows go to ``failed_writer``, successful ones to
        ``finished_writer``. Returns ``(num, num_warns)``: the number
        of rows read and the number of rows failed.
        """
        num = 0
        num_warns = 0
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            # Convert a copy: the unconverted strings are written to
            # the 'finished' file below.
            row, conv_warnings = self.convertToTypes(
                copy.deepcopy(string_row), converters)
            if conv_warnings:
                num_warns += 1
                self.writeFailedRow(failed_writer, raw_row, conv_warnings)
                continue

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Not all parents do exist yet. Skipping")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "This object already exists. Skipping.")
                    continue
                obj = self.callFactory()
                for key, value in row.items():
                    setattr(obj, key, value)
                self.addEntry(obj, row, site)
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Cannot update: no such entry.")
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)
        return (num, num_warns)

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at ``path``, whose columns are named by
        ``headerfields``, and creates, updates or removes one object
        per row, depending on ``mode``.

        Two CSV files are written to a fresh temporary directory: a
        ``.finished`` one with all rows processed successfully and a
        ``.pending`` one with all failed rows plus an ``--ERRORS--``
        column. If ``path`` itself ends with ``.pending.csv`` it is
        replaced by the new pending file.

        Returns a tuple ``(num, num_warns, finished_path,
        failed_path)``. ``failed_path`` is ``None`` (and the pending
        file removed) if no row failed.

        Raises :exc:`FatalCSVError` if ``headerfields`` are not
        sufficient for ``mode``.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        converters = self.getFieldConverters(headerfields)

        temp_dir = tempfile.mkdtemp()

        # Use the bare filename only: a directory part in ``path``
        # would otherwise redirect the results out of ``temp_dir``
        # (absolute path) or into a missing subdirectory of it.
        (base, ext) = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))

        failed_headers = list(mapping.keys())
        failed_headers.append('--ERRORS--')
        # The header line of the pending file shows the mapped names.
        first_row = dict(mapping)
        first_row['--ERRORS--'] = '--ERRORS--'
        finished_headers = list(mapping.values())

        site = grok.getSite()
        handles = []
        try:
            source_file = open(path, 'rb')
            handles.append(source_file)
            failed_file = open(failed_path, 'wb')
            handles.append(failed_file)
            finished_file = open(finished_path, 'wb')
            handles.append(finished_file)

            reader = csv.DictReader(source_file)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(first_row)
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            finished_writer.writerow(dict([(x,x) for x in finished_headers]))

            num, num_warns = self._processRows(
                reader, failed_writer, finished_writer,
                mapping, converters, mode, site)
        finally:
            # Flush and release all files before they are moved,
            # removed or handed out to the caller.
            for handle in handles:
                handle.close()

        timedelta = time.time() - time_start

        if path.endswith('.pending.csv'):
            os.unlink(path)
            # The temporary directory might live on another filesystem,
            # where a plain os.rename() fails.
            shutil.move(failed_path, path)
            failed_path = path
        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
320
class FacultyProcessor(BatchProcessor):
    """A batch processor for IFaculty objects.

    Faculties are looked up by ``code`` in ``site['faculties']``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'facultyimporter'
    grok.name(util_name)

    name = u'Faculty importer'
    iface = IFaculty

    location_fields = ['code',]
    factory_name = 'waeup.Faculty'

    def parentsExist(self, row, site):
        """Tell whether ``site`` contains a ``faculties`` container.
        """
        return 'faculties' in site.keys()

    def entryExists(self, row, site):
        """Tell whether a faculty with the code in ``row`` exists.

        Returns ``False`` as well, if there is no ``faculties``
        container at all. This check matters in remove and update
        mode, where ``parentsExist`` is not called beforehand.
        """
        if not self.parentsExist(row, site):
            return False
        return row['code'] in self.getParent(row, site).keys()

    def getParent(self, row, site):
        """Get the ``faculties`` container of ``site``.
        """
        return site['faculties']

    def getEntry(self, row, site):
        """Get the faculty denoted by ``row`` or ``None``.
        """
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(row['code'])

    def addEntry(self, obj, row, site):
        """Add faculty ``obj`` to the ``faculties`` container.
        """
        parent = self.getParent(row, site)
        parent.addFaculty(obj)
        return

    def delEntry(self, row, site):
        """Remove the faculty denoted by ``row``.
        """
        parent = self.getParent(row, site)
        del parent[row['code']]
        return
359
class DepartmentProcessor(BatchProcessor):
    """A batch processor for IDepartment objects.

    Departments are located by ``faculty_code`` and ``code``: they
    live inside the faculty stored in ``site['faculties']``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'departmentimporter'
    grok.name(util_name)

    name = u'Department importer'
    iface = IDepartment

    factory_name = 'waeup.Department'
    location_fields = ['code', 'faculty_code']

    def parentsExist(self, row, site):
        """Tell whether the faculty named in ``row`` exists.
        """
        return ('faculties' in site.keys()
                and row['faculty_code'] in site['faculties'])

    def entryExists(self, row, site):
        """Tell whether the department named in ``row`` exists.
        """
        if self.parentsExist(row, site):
            faculty = self.getParent(row, site)
            return row['code'] in faculty.keys()
        return False

    def getParent(self, row, site):
        """Get the faculty the department in ``row`` belongs to.
        """
        faculties = site['faculties']
        return faculties[row['faculty_code']]

    def getEntry(self, row, site):
        """Get the department denoted by ``row`` or ``None``.
        """
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Add department ``obj`` to its faculty.
        """
        self.getParent(row, site).addDepartment(obj)

    def delEntry(self, row, site):
        """Remove the department denoted by ``row`` from its faculty.
        """
        faculty = self.getParent(row, site)
        del faculty[row['code']]
403
class CourseProcessor(BatchProcessor):
    """A batch processor for ICourse objects.

    Courses are located by ``faculty_code``, ``department_code`` and
    ``code``: they live in the ``courses`` attribute of a department.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'courseimporter'
    grok.name(util_name)

    name = u'Course importer'
    iface = ICourse

    factory_name = 'waeup.Course'
    location_fields = ['code', 'faculty_code', 'department_code']

    def parentsExist(self, row, site):
        """Tell whether faculty and department named in ``row`` exist.
        """
        if 'faculties' not in site.keys():
            return False
        if row['faculty_code'] not in site['faculties'].keys():
            return False
        departments = site['faculties'][row['faculty_code']]
        return row['department_code'] in departments.keys()

    def entryExists(self, row, site):
        """Tell whether the course named in ``row`` exists.
        """
        if self.parentsExist(row, site):
            courses = self.getParent(row, site)
            return row['code'] in courses.keys()
        return False

    def getParent(self, row, site):
        """Get the ``courses`` of the department named in ``row``.
        """
        faculty = site['faculties'][row['faculty_code']]
        return faculty[row['department_code']].courses

    def getEntry(self, row, site):
        """Get the course denoted by ``row`` or ``None``.
        """
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Add course ``obj`` to the courses of its department.
        """
        self.getParent(row, site).addCourse(obj)

    def delEntry(self, row, site):
        """Remove the course denoted by ``row``.
        """
        courses = self.getParent(row, site)
        del courses[row['code']]
451
class CertificateProcessor(BatchProcessor):
    """A batch processor for ICertificate objects.

    Certificates are located by ``faculty_code``, ``department_code``
    and ``code``: they live in the ``certificates`` attribute of a
    department.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificateimporter'
    grok.name(util_name)

    name = u'Certificate importer'
    iface = ICertificate

    factory_name = 'waeup.Certificate'
    location_fields = ['code', 'faculty_code', 'department_code']

    def parentsExist(self, row, site):
        """Tell whether faculty and department named in ``row`` exist.
        """
        if 'faculties' not in site.keys():
            return False
        if row['faculty_code'] not in site['faculties'].keys():
            return False
        departments = site['faculties'][row['faculty_code']]
        return row['department_code'] in departments.keys()

    def entryExists(self, row, site):
        """Tell whether the certificate named in ``row`` exists.
        """
        if self.parentsExist(row, site):
            certificates = self.getParent(row, site)
            return row['code'] in certificates.keys()
        return False

    def getParent(self, row, site):
        """Get the ``certificates`` of the department named in ``row``.
        """
        faculty = site['faculties'][row['faculty_code']]
        return faculty[row['department_code']].certificates

    def getEntry(self, row, site):
        """Get the certificate denoted by ``row`` or ``None``.
        """
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Add certificate ``obj`` to the certificates of its department.
        """
        self.getParent(row, site).addCertificate(obj)

    def delEntry(self, row, site):
        """Remove the certificate denoted by ``row``.
        """
        certificates = self.getParent(row, site)
        del certificates[row['code']]
499
class CertificateCourseProcessor(BatchProcessor):
    """A batch processor for ICertificateCourse objects.

    Certificate courses live inside a certificate, which is located
    by ``faculty_code``, ``department_code`` and ``certificate_code``.
    Inside the certificate they are looked up by a key built from the
    code of the referenced course and the level.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificatecourseimporter'
    grok.name(util_name)

    name = u'CertificateCourse importer'
    iface = ICertificateCourse

    location_fields = ['course', 'level', 'faculty_code', 'department_code',
                       'certificate_code',]
    factory_name = 'waeup.CertificateCourse'

    def _entryKey(self, row):
        """Get the key used for the ``row`` entry inside its certificate.

        ``row['course']`` must be an object providing a ``code``
        attribute (and not a plain string).
        """
        return "%s_%s" % (row['course'].code, row['level'])

    def parentsExist(self, row, site):
        """Tell whether faculty, department and certificate named in
        ``row`` exist.
        """
        if not 'faculties' in site.keys():
            return False
        if not row['faculty_code'] in site['faculties'].keys():
            return False
        faculty = site['faculties'][row['faculty_code']]
        if not row['department_code'] in faculty.keys():
            return False
        dept = faculty[row['department_code']]
        return row['certificate_code'] in dept.certificates.keys()

    def entryExists(self, row, site):
        """Tell whether the certificate course named in ``row`` exists.
        """
        if not self.parentsExist(row, site):
            return False
        parent = self.getParent(row, site)
        return self._entryKey(row) in parent.keys()

    def getParent(self, row, site):
        """Get the certificate the entry in ``row`` belongs to.
        """
        dept = site['faculties'][row['faculty_code']][row['department_code']]
        return dept.certificates[row['certificate_code']]

    def getEntry(self, row, site):
        """Get the certificate course denoted by ``row`` or ``None``.
        """
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        # Look up by the same key entryExists() checks for; the course
        # object itself is not a key of the certificate.
        return parent.get(self._entryKey(row))

    def addEntry(self, obj, row, site):
        """Add a course reference built from ``row`` to the certificate.

        ``obj`` is not used: the certificate's ``addCourseRef`` is
        called with the values of ``row`` instead.
        """
        parent = self.getParent(row, site)
        parent.addCourseRef(row['course'],
                            row['level'], row['core_or_elective'])
        return

    def delEntry(self, row, site):
        """Remove the course reference denoted by ``row``.
        """
        parent = self.getParent(row, site)
        parent.delCourseRef(row['course'].code, row['level'])
        return
Note: See TracBrowser for help on using the repository browser.