source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 4983

Last change on this file since 4983 was 4983, checked in by uli, 15 years ago

Rely on Python/ZODB weak references when updating entries.

File size: 18.0 KB
Line 
1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
7import copy
8import csv
9import os
10import sys
11import tempfile
12import time
13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
16from zope.schema.interfaces import IText
17from waeup.sirp.interfaces import (IBatchProcessor, IFacultyContainer, IFaculty,
18                              ICourse, IDepartment, ICertificate,
19                              ICertificateCourse, ISchemaTypeConverter,
20                              FatalCSVError)
21
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors set the class
    attributes below and implement ``parentsExist``, ``entryExists``,
    ``getParent``, ``getEntry``, ``addEntry`` and ``delEntry``.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Department importer'

    # Internal name...
    util_name = 'departmentimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = IDepartment

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose values cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are excluded here; they are added
        separately in ``req``.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map each import mode to the list of column names it requires.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all column names this processor understands.

        These are the location fields plus all schema fields of
        ``iface``, without duplicates.
        """
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the column names accepted by this processor.

        ``mode`` is currently ignored.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that ``headerfields`` can be used for an import in ``mode``.

        Raises ``FatalCSVError`` if a column required for ``mode`` is
        missing or if a column name (not starting with ``--``) appears
        more than once. An unknown ``mode`` raises ``KeyError``.
        Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict whose keys are the mapped names (the values
        of ``mapping``) and whose values are taken from ``row`` under
        the raw names (the keys of ``mapping``).
        """
        result = dict()
        for key, replacement in mapping.items():
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
           fieldnames.

        Keys are the raw column names read from the first line of the
        CSV file at ``path``. Values are the entries of
        ``headerfields`` at the same position. In ``'remove'`` mode all
        non-location fields are mapped to ``'--IGNORE--'``.
        """
        result = dict()
        # Close the file again; only the header line is needed here.
        csv_file = open(path, 'rb')
        try:
            raw_header = csv.reader(csv_file).next()
        finally:
            csv_file.close()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Ignore non-location fields when removing...
                field = '--IGNORE--'
            result[raw_header[num]] = field
        return result

    def getFieldConverters(self, fieldnames):
        """Get converters for fieldnames.

        Returns a dict mapping each schema field of ``iface`` that is
        listed in ``fieldnames`` to its ``ISchemaTypeConverter``.
        """
        result = dict()
        for key, field in getFields(self.iface).items():
            if key not in fieldnames:
                continue
            converter = ISchemaTypeConverter(field)
            result[key] = converter
        return result

    def convertToTypes(self, row, converter_dict):
        """Convert values in given row to destination type.

        ``row`` is modified in place:
        - any ``--IGNORE--`` entry is removed;
        - each value that has a converter in ``converter_dict`` is
          replaced by the converter's ``fromString`` result.

        Values without a converter are left untouched.

        Returns a tuple ``(row, warnings)``. ``warnings`` is a list of
        messages for values that could not be converted; those values
        keep their original string form in ``row``.
        """
        if '--IGNORE--' in row:
            del row['--IGNORE--']
        warnings = []
        # Iterate over a snapshot, as values are replaced while looping.
        for key, value in list(row.items()):
            converter = converter_dict.get(key, None)
            if not converter:
                continue
            try:
                row[key] = converter.fromString(value)
            except Exception:
                # Collect the problem instead of aborting the import;
                # doImport records the row as failed.
                msg = "conversion error: field %s: %r" % (
                    key, sys.exc_info()[1])
                warnings.append(msg)
        return (row, warnings)

    def callFactory(self, *args, **kw):
        """Create a new, empty item via the factory named ``factory_name``.

        All arguments are ignored.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the existing entry for ``row`` data or ``None``.

        Used by ``doImport`` in ``'update'`` mode.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Set every key/value pair of ``row`` as an attribute of ``obj``.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of an import run to ``logger``.

        Does nothing if ``logger`` is ``None``. ``num`` is the number
        of processed rows and ``warnings`` the number of failed rows.
        ``timedelta`` is the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        The messages go into the ``--ERRORS--`` column; ``row`` is
        modified in place. If warnings is a list of strings, they will
        be concatenated with ``' / '``.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def _processRow(self, row, mode, site):
        """Apply one converted ``row`` in ``mode``.

        Returns an error message if the row could not be applied, or
        ``None`` on success.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            for key, value in row.items():
                setattr(obj, key, value)
            self.addEntry(obj, row, site)
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at ``path`` and maps its columns onto
        ``headerfields``. Each row is then type-converted and applied
        in ``mode`` (``'create'``, ``'update'`` or ``'remove'``).

        Results are written to two files in a fresh temporary
        directory:
        - ``<name>.finished<ext>``: successfully processed rows, as
          mapped string values;
        - ``<name>.pending<ext>``: failed rows, as raw values plus an
          ``--ERRORS--`` column.

        Returns ``(num, num_warns, finished_path, failed_path)``:
        - ``num``: number of processed rows;
        - ``num_warns``: number of failed rows;
        - ``finished_path``: absolute path of the finished file;
        - ``failed_path``: absolute path of the pending file, or
          ``None`` (and the file is removed) if no row failed.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        converters = self.getFieldConverters(headerfields)

        temp_dir = tempfile.mkdtemp()
        # Use only the file name of `path`. An absolute or
        # multi-component name would make os.path.join place the result
        # files outside temp_dir or in non-existing subdirectories.
        (base, ext) = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))

        open_files = []
        try:
            source_file = open(path, 'rb')
            open_files.append(source_file)
            failed_file = open(failed_path, 'wb')
            open_files.append(failed_file)
            finished_file = open(finished_path, 'wb')
            open_files.append(finished_file)

            reader = csv.DictReader(source_file)

            failed_headers = list(mapping.keys()) + ['--ERRORS--']
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            # First line of the pending file: under each raw column the
            # mapped field name it was imported as.
            header_row = dict(mapping)
            header_row['--ERRORS--'] = '--ERRORS--'
            failed_writer.writerow(header_row)

            finished_headers = list(mapping.values())
            finished_writer = csv.DictWriter(finished_file,
                                             finished_headers)
            finished_writer.writerow(
                dict([(x, x) for x in finished_headers]))

            num = 0
            num_warns = 0
            site = grok.getSite()
            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                # Convert a copy, so string_row keeps the plain strings
                # for the finished file.
                row, conv_warnings = self.convertToTypes(
                    copy.deepcopy(string_row), converters)
                if conv_warnings:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, raw_row, conv_warnings)
                    continue
                error = self._processRow(row, mode, site)
                if error is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, raw_row, error)
                    continue
                finished_writer.writerow(string_row)
        finally:
            # Close (and thereby flush) all files before their paths
            # are returned or the pending file is removed.
            for open_file in open_files:
                open_file.close()

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
306                           timedelta, logger=logger)
307        failed_path = os.path.abspath(failed_path)
308        if num_warns == 0:
309            del failed_writer
310            os.unlink(failed_path)
311            failed_path = None
312        return (num, num_warns,
313                os.path.abspath(finished_path), failed_path)
314
315class FacultyProcessor(BatchProcessor):
316    """A batch processor for IFaculty objects.
317    """
318    grok.provides(IBatchProcessor)
319    grok.context(Interface)
320    util_name = 'facultyimporter'
321    grok.name(util_name)
322
323    name = u'Faculty importer'
324    iface = IFaculty
325
326    location_fields = ['code',]
327    factory_name = 'waeup.Faculty'
328
329    def parentsExist(self, row, site):
330        return 'faculties' in site.keys()
331
332    def entryExists(self, row, site):
333        return row['code'] in site['faculties'].keys()
334
335    def getParent(self, row, site):
336        return site['faculties']
337
338    def getEntry(self, row, site):
339        if not self.entryExists(row, site):
340            return None
341        parent = self.getParent(row, site)
342        return parent.get(row['code'])
343   
344    def addEntry(self, obj, row, site):
345        parent = self.getParent(row, site)
346        parent.addFaculty(obj)
347        return
348
349    def delEntry(self, row, site):
350        parent = self.getParent(row, site)
351        del parent[row['code']]
352        pass
353
354class DepartmentProcessor(BatchProcessor):
355    """A batch processor for IDepartment objects.
356    """
357    grok.provides(IBatchProcessor)
358    grok.context(Interface)
359    util_name = 'departmentimporter'
360    grok.name(util_name)
361
362    name = u'Department importer'
363    iface = IDepartment
364
365    location_fields = ['code', 'faculty_code']
366    factory_name = 'waeup.Department'
367
368    def parentsExist(self, row, site):
369        if not 'faculties' in site.keys():
370            return False
371        return row['faculty_code'] in site['faculties']
372
373    def entryExists(self, row, site):
374        if not self.parentsExist(row, site):
375            return False
376        parent = self.getParent(row, site)
377        return row['code'] in parent.keys()
378
379    def getParent(self, row, site):
380        return site['faculties'][row['faculty_code']]
381
382    def getEntry(self, row, site):
383        if not self.entryExists(row, site):
384            return None
385        parent = self.getParent(row, site)
386        return parent.get(row['code'])
387   
388    def addEntry(self, obj, row, site):
389        parent = self.getParent(row, site)
390        parent.addDepartment(obj)
391        return
392
393    def delEntry(self, row, site):
394        parent = self.getParent(row, site)
395        del parent[row['code']]
396        return
397
398class CourseProcessor(BatchProcessor):
399    """A batch processor for ICourse objects.
400    """
401    grok.provides(IBatchProcessor)
402    grok.context(Interface)
403    util_name = 'courseimporter'
404    grok.name(util_name)
405
406    name = u'Course importer'
407    iface = ICourse
408
409    location_fields = ['code', 'faculty_code', 'department_code']
410    factory_name = 'waeup.Course'
411
412    def parentsExist(self, row, site):
413        if not 'faculties' in site.keys():
414            return False
415        if not row['faculty_code'] in site['faculties'].keys():
416            return False
417        faculty = site['faculties'][row['faculty_code']]
418        return row['department_code'] in faculty.keys()
419
420    def entryExists(self, row, site):
421        if not self.parentsExist(row, site):
422            return False
423        parent = self.getParent(row, site)
424        return row['code'] in parent.keys()
425
426    def getParent(self, row, site):
427        dept = site['faculties'][row['faculty_code']][row['department_code']]
428        return dept.courses
429
430    def getEntry(self, row, site):
431        if not self.entryExists(row, site):
432            return None
433        parent = self.getParent(row, site)
434        return parent.get(row['code'])
435   
436    def addEntry(self, obj, row, site):
437        parent = self.getParent(row, site)
438        parent.addCourse(obj)
439        return
440
441    def delEntry(self, row, site):
442        parent = self.getParent(row, site)
443        del parent[row['code']]
444        return
445
446class CertificateProcessor(BatchProcessor):
447    """A batch processor for ICertificate objects.
448    """
449    grok.provides(IBatchProcessor)
450    grok.context(Interface)
451    util_name = 'certificateimporter'
452    grok.name(util_name)
453
454    name = u'Certificate importer'
455    iface = ICertificate
456
457    location_fields = ['code', 'faculty_code', 'department_code']
458    factory_name = 'waeup.Certificate'
459
460    def parentsExist(self, row, site):
461        if not 'faculties' in site.keys():
462            return False
463        if not row['faculty_code'] in site['faculties'].keys():
464            return False
465        faculty = site['faculties'][row['faculty_code']]
466        return row['department_code'] in faculty.keys()
467
468    def entryExists(self, row, site):
469        if not self.parentsExist(row, site):
470            return False
471        parent = self.getParent(row, site)
472        return row['code'] in parent.keys()
473
474    def getParent(self, row, site):
475        dept = site['faculties'][row['faculty_code']][row['department_code']]
476        return dept.certificates
477
478    def getEntry(self, row, site):
479        if not self.entryExists(row, site):
480            return None
481        parent = self.getParent(row, site)
482        return parent.get(row['code'])
483
484    def addEntry(self, obj, row, site):
485        parent = self.getParent(row, site)
486        parent.addCertificate(obj)
487        return
488
489    def delEntry(self, row, site):
490        parent = self.getParent(row, site)
491        del parent[row['code']]
492        return
493
494class CertificateCourseProcessor(BatchProcessor):
495    """A batch processor for ICertificateCourse objects.
496    """
497    grok.provides(IBatchProcessor)
498    grok.context(Interface)
499    util_name = 'certificatecourseimporter'
500    grok.name(util_name)
501
502    name = u'CertificateCourse importer'
503    iface = ICertificateCourse
504
505    location_fields = ['course', 'level', 'faculty_code', 'department_code',
506                       'certificate_code',]
507    factory_name = 'waeup.CertificateCourse'
508
509    def parentsExist(self, row, site):
510        if not 'faculties' in site.keys():
511            return False
512        if not row['faculty_code'] in site['faculties'].keys():
513            return False
514        faculty = site['faculties'][row['faculty_code']]
515        if not row['department_code'] in faculty.keys():
516            return False
517        dept = faculty[row['department_code']]
518        return row['certificate_code'] in dept.certificates.keys()
519
520    def entryExists(self, row, site):
521        if not self.parentsExist(row, site):
522            return False
523        parent = self.getParent(row, site)
524        code = "%s_%s" % (row['course'].code, row['level'])
525        return code in parent.keys()
526
527    def getParent(self, row, site):
528        dept = site['faculties'][row['faculty_code']][row['department_code']]
529        return dept.certificates[row['certificate_code']]
530
531    def getEntry(self, row, site):
532        if not self.entryExists(row, site):
533            return None
534        parent = self.getParent(row, site)
535        return parent.get(row['course'])
536
537    def addEntry(self, obj, row, site):
538        parent = self.getParent(row, site)
539        parent.addCourseRef(row['course'],
540                            row['level'], row['core_or_elective'])
541        return
542
543    def delEntry(self, row, site):
544        parent = self.getParent(row, site)
545        parent.delCourseRef(row['course'].code, row['level'])
546        return
Note: See TracBrowser for help on using the repository browser.