source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 5008

Last change on this file since 5008 was 5005, checked in by uli, 15 years ago

Fix references to academics stuff interfaces. This is the first step to make academics stuff pluggable.

File size: 18.1 KB
1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
import grok
import copy
import csv
import os
import sys
import tempfile
import time
from zope.component import createObject
from zope.interface import Interface
from zope.schema import getFields
from zope.schema.interfaces import IText
from waeup.sirp.interfaces import (
    IBatchProcessor, ISchemaTypeConverter, FatalCSVError)
from waeup.sirp.university.interfaces import (
    IFacultyContainer, IFaculty, ICourse, IDepartment, ICertificate,
    ICertificateCourse)

class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active base class; only its subclasses are registered
    as importers.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Department importer'

    # Internal name...
    util_name = 'departmentimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = IDepartment

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose values cannot be set if not
        given during creation. Therefore these fields must exist in
        the input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

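    # Illustration (hypothetical schema): if the interface declared a
    # required `title` field without a default and an optional `remark`
    # field, only `title` would be listed here; location fields such as
    # `code` and fields with defaults are skipped.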
    @property
    def req(self):
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for duplicate fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Duplicate headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.
        """
        result = dict()
        for key, replacement in mapping.items():
            result[replacement] = row[key]
        return result

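    # Illustration (made-up values): with
    #   mapping = {'Code': 'code', 'Faculty': 'faculty_code'}
    # the raw CSV row {'Code': 'DEP1', 'Faculty': 'FAC1'} becomes
    #   {'code': 'DEP1', 'faculty_code': 'FAC1'}.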
    def getMapping(self, path, headerfields, mode):
        """Get a mapping from the header fields found in the CSV file to
           the field names actually used.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Ignore non-location fields when removing...
                field = '--IGNORE--'
            result[raw_header[num]] = field
        return result

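    # Illustration: in 'remove' mode only location fields are kept, so for
    # headerfields ['code', 'faculty_code', 'title'] ('title' being a
    # non-location field here) the result is roughly
    #   {'code': 'code', 'faculty_code': 'faculty_code',
    #    'title': '--IGNORE--'},
    # assuming the CSV header uses the same column names.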
    def getFieldConverters(self, fieldnames):
        """Get converters for fieldnames.
        """
        result = dict()
        for key, field in getFields(self.iface).items():
            if key not in fieldnames:
                continue
            converter = ISchemaTypeConverter(field)
            result[key] = converter
        return result

    def convertToTypes(self, row, converter_dict):
        """Convert values in given row to destination type.
        """
        if '--IGNORE--' in row.keys():
            del row['--IGNORE--']
        warnings = []
        for key, value in row.items():
            converter = converter_dict.get(key, None)
            if converter:
                try:
                    row.update({key:converter_dict[key].fromString(value)})
                except:
                    msg = "conversion error: field %s: %s %r" % (
                        key, sys.exc_info()[0], sys.exc_info()[1])
                    warnings.append(msg)
        return (row, warnings)

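    # Note that conversion errors are collected as warnings instead of being
    # raised: a single bad value (say, a non-numeric string for what is
    # presumably an Int field) only marks that row as failed later on and
    # does not abort the whole import.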
    def callFactory(self, *args, **kw):
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the existing entry for ``row`` data, or ``None``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they are joined with ' / '.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        converters = self.getFieldConverters(headerfields)
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        (base, ext) = os.path.splitext(path)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.keys()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        first_row = mapping.items()
        first_row.append(("--ERRORS--", "--ERRORS--"),)
        failed_writer.writerow(dict(first_row))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = [x for x in mapping.values()]
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num = 0
        num_warns = 0
        site = grok.getSite()
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            row, conv_warnings = self.convertToTypes(
                copy.deepcopy(string_row), converters)
            if len(conv_warnings):
                num_warns += 1
                self.writeFailedRow(failed_writer, raw_row, conv_warnings)
                continue

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Not all parents exist yet. Skipping.")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "This object already exists. Skipping.")
                    continue
                obj = self.callFactory()
                for key, value in row.items():
                    setattr(obj, key, value)
                self.addEntry(obj, row, site)
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Cannot update: no such entry.")
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

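# Typical driving code (a sketch only; 'faculties.csv', the header list and
# the user name are made-up examples, while the utility lookup and the
# doImport() signature follow the code above):
#
#   from zope.component import getUtility
#   from waeup.sirp.interfaces import IBatchProcessor
#
#   importer = getUtility(IBatchProcessor, name='facultyimporter')
#   num, num_warns, finished_path, failed_path = importer.doImport(
#       'faculties.csv', ['code', 'title'], mode='create', user='admin')
#
# finished_path and failed_path point to CSV files in a temporary directory;
# failed_path is None when every row was processed successfully.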
class FacultyProcessor(BatchProcessor):
    """A batch processor for IFaculty objects.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'facultyimporter'
    grok.name(util_name)

    name = u'Faculty importer'
    iface = IFaculty

    location_fields = ['code',]
    factory_name = 'waeup.Faculty'

    def parentsExist(self, row, site):
        return 'faculties' in site.keys()

    def entryExists(self, row, site):
        return row['code'] in site['faculties'].keys()

    def getParent(self, row, site):
        return site['faculties']

    def getEntry(self, row, site):
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(row['code'])

    def addEntry(self, obj, row, site):
        parent = self.getParent(row, site)
        parent.addFaculty(obj)
        return

    def delEntry(self, row, site):
        parent = self.getParent(row, site)
        del parent[row['code']]
        return

class DepartmentProcessor(BatchProcessor):
    """A batch processor for IDepartment objects.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'departmentimporter'
    grok.name(util_name)

    name = u'Department importer'
    iface = IDepartment

    location_fields = ['code', 'faculty_code']
    factory_name = 'waeup.Department'

    def parentsExist(self, row, site):
        if not 'faculties' in site.keys():
            return False
        return row['faculty_code'] in site['faculties']

    def entryExists(self, row, site):
        if not self.parentsExist(row, site):
            return False
        parent = self.getParent(row, site)
        return row['code'] in parent.keys()

    def getParent(self, row, site):
        return site['faculties'][row['faculty_code']]

    def getEntry(self, row, site):
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(row['code'])

    def addEntry(self, obj, row, site):
        parent = self.getParent(row, site)
        parent.addDepartment(obj)
        return

    def delEntry(self, row, site):
        parent = self.getParent(row, site)
        del parent[row['code']]
        return

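# Example input for the department importer (a sketch; 'title' is assumed to
# be a field of IDepartment and the codes are invented):
#
#   code,faculty_code,title
#   DEP1,FAC1,Department of Examples
#
# In 'create' mode the faculty FAC1 must already exist in the site's
# 'faculties' container, otherwise the row ends up in the pending file.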
class CourseProcessor(BatchProcessor):
    """A batch processor for ICourse objects.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'courseimporter'
    grok.name(util_name)

    name = u'Course importer'
    iface = ICourse

    location_fields = ['code', 'faculty_code', 'department_code']
    factory_name = 'waeup.Course'

    def parentsExist(self, row, site):
        if not 'faculties' in site.keys():
            return False
        if not row['faculty_code'] in site['faculties'].keys():
            return False
        faculty = site['faculties'][row['faculty_code']]
        return row['department_code'] in faculty.keys()

    def entryExists(self, row, site):
        if not self.parentsExist(row, site):
            return False
        parent = self.getParent(row, site)
        return row['code'] in parent.keys()

    def getParent(self, row, site):
        dept = site['faculties'][row['faculty_code']][row['department_code']]
        return dept.courses

    def getEntry(self, row, site):
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(row['code'])

    def addEntry(self, obj, row, site):
        parent = self.getParent(row, site)
        parent.addCourse(obj)
        return

    def delEntry(self, row, site):
        parent = self.getParent(row, site)
        del parent[row['code']]
        return

class CertificateProcessor(BatchProcessor):
    """A batch processor for ICertificate objects.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificateimporter'
    grok.name(util_name)

    name = u'Certificate importer'
    iface = ICertificate

    location_fields = ['code', 'faculty_code', 'department_code']
    factory_name = 'waeup.Certificate'

    def parentsExist(self, row, site):
        if not 'faculties' in site.keys():
            return False
        if not row['faculty_code'] in site['faculties'].keys():
            return False
        faculty = site['faculties'][row['faculty_code']]
        return row['department_code'] in faculty.keys()

    def entryExists(self, row, site):
        if not self.parentsExist(row, site):
            return False
        parent = self.getParent(row, site)
        return row['code'] in parent.keys()

    def getParent(self, row, site):
        dept = site['faculties'][row['faculty_code']][row['department_code']]
        return dept.certificates

    def getEntry(self, row, site):
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        return parent.get(row['code'])

    def addEntry(self, obj, row, site):
        parent = self.getParent(row, site)
        parent.addCertificate(obj)
        return

    def delEntry(self, row, site):
        parent = self.getParent(row, site)
        del parent[row['code']]
        return

class CertificateCourseProcessor(BatchProcessor):
    """A batch processor for ICertificateCourse objects.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'certificatecourseimporter'
    grok.name(util_name)

    name = u'CertificateCourse importer'
    iface = ICertificateCourse

    location_fields = ['course', 'level', 'faculty_code', 'department_code',
                       'certificate_code',]
    factory_name = 'waeup.CertificateCourse'

    def parentsExist(self, row, site):
        if not 'faculties' in site.keys():
            return False
        if not row['faculty_code'] in site['faculties'].keys():
            return False
        faculty = site['faculties'][row['faculty_code']]
        if not row['department_code'] in faculty.keys():
            return False
        dept = faculty[row['department_code']]
        return row['certificate_code'] in dept.certificates.keys()

    def entryExists(self, row, site):
        if not self.parentsExist(row, site):
            return False
        parent = self.getParent(row, site)
        code = "%s_%s" % (row['course'].code, row['level'])
        return code in parent.keys()

    def getParent(self, row, site):
        dept = site['faculties'][row['faculty_code']][row['department_code']]
        return dept.certificates[row['certificate_code']]

    def getEntry(self, row, site):
        if not self.entryExists(row, site):
            return None
        parent = self.getParent(row, site)
        # Entries are keyed by '<course code>_<level>', see entryExists().
        code = "%s_%s" % (row['course'].code, row['level'])
        return parent.get(code)

    def addEntry(self, obj, row, site):
        parent = self.getParent(row, site)
        parent.addCourseRef(row['course'],
                            row['level'], row['core_or_elective'])
        return

    def delEntry(self, row, site):
        parent = self.getParent(row, site)
        parent.delCourseRef(row['course'].code, row['level'])
        return
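# Example input for the certificate course importer (a sketch; the values are
# invented, the 'core_or_elective' column is taken from addEntry() above):
#
#   course,level,faculty_code,department_code,certificate_code,core_or_elective
#   COURSE1,100,FAC1,DEP1,CERT1,True
#
# The 'course' column is expected to be resolved to a course object by the
# schema converter, since entryExists() and delEntry() access
# row['course'].code.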