source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 9030

Last change on this file since 9030 was 8905, checked in by Henrik Bettermann, 12 years ago

Sett file permission flags of temporarily merged finished file properly before moving to final location.

  • Property svn:keywords set to Id
File size: 17.4 KB
Line 
1## $Id: batching.py 8905 2012-07-04 07:28:09Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import tempfile
28import time
29from cStringIO import StringIO
30from zope.component import createObject
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.event import notify
34from waeup.kofa.interfaces import (
35    IBatchProcessor, FatalCSVError, IObjectConverter,
36    ICSVExporter, IGNORE_MARKER, DuplicationError)
37
38class BatchProcessor(grok.GlobalUtility):
39    """A processor to add, update, or remove data.
40
41    This is a non-active baseclass.
42    """
43    grok.implements(IBatchProcessor)
44    grok.context(Interface)
45    grok.baseclass()
46
47    # Name used in pages and forms...
48    name = u'Non-registered base processor'
49
50    # Internal name...
51    util_name = 'baseprocessor'
52
53    # Items for this processor need an interface with zope.schema fields.
54    iface = Interface
55
56    # The name must be the same as the util_name attribute in order to
57    # register this utility correctly.
58    grok.name(util_name)
59
60    # Headers needed to locate items...
61    location_fields = ['code', 'faculty_code']
62
63    # A factory with this name must be registered...
64    factory_name = 'waeup.Department'
65
66    @property
67    def required_fields(self):
68        """Required fields that have no default.
69
70        A list of names of field, whose value cannot be set if not
71        given during creation. Therefore these fields must exist in
72        input.
73
74        Fields with a default != missing_value do not belong to this
75        category.
76        """
77        result = []
78        for key, field in getFields(self.iface).items():
79            if key in self.location_fields:
80                continue
81            if field.default is not field.missing_value:
82                continue
83            if field.required:
84                result.append(key)
85        return result
86
87    @property
88    def req(self):
89        result = dict(
90            create = self.location_fields + self.required_fields,
91            update = self.location_fields,
92            remove = self.location_fields,
93        )
94        return result
95
96    @property
97    def available_fields(self):
98        return sorted(list(set(
99                    self.location_fields + getFields(self.iface).keys())))
100
101    def getHeaders(self, mode='create'):
102        return self.available_fields
103
104    def checkHeaders(self, headerfields, mode='create'):
105        req = self.req[mode]
106        # Check for required fields...
107        for field in req:
108            if not field in headerfields:
109                raise FatalCSVError(
110                    "Need at least columns %s for import!" %
111                    ', '.join(["'%s'" % x for x in req]))
112        # Check for double fields. Cannot happen because this error is
113        # already catched in views
114        not_ignored_fields = [x for x in headerfields
115                              if not x.startswith('--')]
116        if len(set(not_ignored_fields)) < len(not_ignored_fields):
117            raise FatalCSVError(
118                "Double headers: each column name may only appear once.")
119        return True
120
121    def applyMapping(self, row, mapping):
122        """Apply mapping to a row of CSV data.
123
124        """
125        result = dict()
126        for key, replacement in mapping.items():
127            if replacement == u'--IGNORE--':
128                # Skip ignored columns in failed and finished data files.
129                continue
130            result[replacement] = row[key]
131        return result
132
133    def getMapping(self, path, headerfields, mode):
134        """Get a mapping from CSV file headerfields to actually used fieldnames.
135
136        """
137        result = dict()
138        reader = csv.reader(open(path, 'rb'))
139        raw_header = reader.next()
140        for num, field in enumerate(headerfields):
141            if field not in self.location_fields and mode == 'remove':
142                # Skip non-location fields when removing.
143                continue
144            if field == u'--IGNORE--':
145                # Skip ignored columns in failed and finished data files.
146                continue
147            result[raw_header[num]] = field
148        return result
149
150    def stringFromErrs(self, errors, inv_errors):
151        result = []
152        for err in errors:
153            fieldname, message = err
154            result.append("%s: %s" % (fieldname, message))
155        for err in inv_errors:
156            result.append("invariant: %s" % err)
157        return '; '.join(result)
158
159    def callFactory(self, *args, **kw):
160        return createObject(self.factory_name)
161
162    def parentsExist(self, row, site):
163        """Tell whether the parent object for data in ``row`` exists.
164        """
165        raise NotImplementedError('method not implemented')
166
167    def entryExists(self, row, site):
168        """Tell whether there already exists an entry for ``row`` data.
169        """
170        raise NotImplementedError('method not implemented')
171
172    def getParent(self, row, site):
173        """Get the parent object for the entry in ``row``.
174        """
175        raise NotImplementedError('method not implemented')
176
177    def getEntry(self, row, site):
178        """Get the parent object for the entry in ``row``.
179        """
180        raise NotImplementedError('method not implemented')
181
182    def addEntry(self, obj, row, site):
183        """Add the entry given given by ``row`` data.
184        """
185        raise NotImplementedError('method not implemented')
186
187    def delEntry(self, row, site):
188        """Delete entry given by ``row`` data.
189        """
190        raise NotImplementedError('method not implemented')
191
192    def checkUpdateRequirements(self, obj, row, site):
193        """Checks requirements the object must fulfill when being updated.
194
195        This method is not used in case of deleting or adding objects.
196
197        Returns error messages as strings in case of requirement
198        problems.
199        """
200        return None
201
202    def updateEntry(self, obj, row, site):
203        """Update obj to the values given in row.
204
205        Returns a string describing the fields changed.
206        """
207        changed = []
208        for key, value in row.items():
209            # Skip fields to be ignored.
210            if value == IGNORE_MARKER:
211                continue
212            # Skip fields not declared in interface and which are
213            # not yet attributes of existing objects. We can thus not
214            # add non-existing attributes here.
215            if not hasattr(obj, key):
216                continue
217            setattr(obj, key, value)
218            log_value = getattr(value, 'code', value)
219            changed.append('%s=%s' % (key, log_value))
220
221        # If any catalog is involved it must be updated.
222        #
223        # XXX: The event is also triggered when creating objects as
224        # updateEntry is called also when creating entries resulting
225        # in objectAdded and additional objectModified events.
226        if len(changed):
227            notify(grok.ObjectModifiedEvent(obj))
228
229        return ', '.join(changed)
230
231    def createLogfile(self, path, fail_path, num, warnings, mode, user,
232                      timedelta, logger=None):
233        """Write to log file.
234        """
235        if logger is None:
236            return
237        status = 'OK'
238        if warnings > 0:
239            status = 'FAILED'
240        logger.info("-" * 20)
241        logger.info("%s: Batch processing finished: %s" % (user, status))
242        logger.info("%s: Source: %s" % (user, path))
243        logger.info("%s: Mode: %s" % (user, mode))
244        logger.info("%s: User: %s" % (user, user))
245        if warnings > 0:
246            logger.info("%s: Failed datasets: %s" % (
247                    user, os.path.basename(fail_path)))
248        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
249                user, timedelta, timedelta/(num or 1)))
250        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
251                user, num, num - warnings, warnings
252                ))
253        logger.info("-" * 20)
254        return
255
256    def writeFailedRow(self, writer, row, warnings):
257        """Write a row with error messages to error CSV.
258
259        If warnings is a list of strings, they will be concatenated.
260        """
261        error_col = warnings
262        if isinstance(warnings, list):
263            error_col = ' / '.join(warnings)
264        row['--ERRORS--'] = error_col
265        writer.writerow(row)
266        return
267
268    def checkConversion(self, row, mode='ignore', ignore_empty=True):
269        """Validates all values in row.
270        """
271        converter = IObjectConverter(self.iface)
272        errs, inv_errs, conv_dict =  converter.fromStringDict(
273            row, self.factory_name, mode=mode)
274        return errs, inv_errs, conv_dict
275
276    def doImport(self, path, headerfields, mode='create', user='Unknown',
277                 logger=None, ignore_empty=True):
278        """Perform actual import.
279        """
280        time_start = time.time()
281        self.checkHeaders(headerfields, mode)
282        mapping = self.getMapping(path, headerfields, mode)
283        reader = csv.DictReader(open(path, 'rb'))
284
285        temp_dir = tempfile.mkdtemp()
286
287        base = os.path.basename(path)
288        (base, ext) = os.path.splitext(base)
289        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
290        failed_headers = mapping.values()
291        failed_headers.append('--ERRORS--')
292        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
293                                       failed_headers)
294        os.chmod(failed_path, 0664)
295        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
296
297        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
298        finished_headers = mapping.values()
299        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
300                                         finished_headers)
301        os.chmod(finished_path, 0664)
302        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
303
304        num =0
305        num_warns = 0
306        site = grok.getSite()
307
308        for raw_row in reader:
309            num += 1
310            string_row = self.applyMapping(raw_row, mapping)
311            if ignore_empty and mode in ('update',):
312                # replace empty strings with ignore-markers
313                for key, val in string_row.items():
314                    if val == '':
315                        string_row[key] = IGNORE_MARKER
316            row = dict(string_row.items()) # create deep copy
317            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
318            if errs or inv_errs:
319                num_warns += 1
320                conv_warnings = self.stringFromErrs(errs, inv_errs)
321                self.writeFailedRow(
322                    failed_writer, string_row, conv_warnings)
323                continue
324            row.update(conv_dict)
325
326            if mode == 'create':
327                if not self.parentsExist(row, site):
328                    num_warns += 1
329                    self.writeFailedRow(
330                        failed_writer, string_row,
331                        "Not all parents do exist yet. Skipping")
332                    continue
333                if self.entryExists(row, site):
334                    num_warns += 1
335                    self.writeFailedRow(
336                        failed_writer, string_row,
337                        "This object already exists. Skipping.")
338                    continue
339                obj = self.callFactory()
340                # Override all values in row, also
341                # student_ids and applicant_ids which have been
342                # generated in the respective __init__ methods before.
343                self.updateEntry(obj, row, site)
344                try:
345                    self.addEntry(obj, row, site)
346                except KeyError, error:
347                    num_warns += 1
348                    self.writeFailedRow(
349                        failed_writer, string_row,
350                        "%s Skipping." % error.message)
351                    continue
352                except DuplicationError, error:
353                    num_warns += 1
354                    self.writeFailedRow(
355                        failed_writer, string_row,
356                        "%s Skipping." % error.msg)
357                    continue
358            elif mode == 'remove':
359                if not self.entryExists(row, site):
360                    num_warns += 1
361                    self.writeFailedRow(
362                        failed_writer, string_row,
363                        "Cannot remove: no such entry.")
364                    continue
365                self.delEntry(row, site)
366            elif mode == 'update':
367                obj = self.getEntry(row, site)
368                if obj is None:
369                    num_warns += 1
370                    self.writeFailedRow(
371                        failed_writer, string_row,
372                        "Cannot update: no such entry.")
373                    continue
374                update_errors = self.checkUpdateRequirements(obj, row, site)
375                if update_errors is not None:
376                    num_warns += 1
377                    self.writeFailedRow(
378                        failed_writer, string_row, update_errors)
379                    continue
380                self.updateEntry(obj, row, site)
381            finished_writer.writerow(string_row)
382
383        time_end = time.time()
384        timedelta = time_end - time_start
385
386        self.createLogfile(path, failed_path, num, num_warns, mode, user,
387                           timedelta, logger=logger)
388        failed_path = os.path.abspath(failed_path)
389        if num_warns == 0:
390            del failed_writer
391            os.unlink(failed_path)
392            failed_path = None
393        return (num, num_warns,
394                os.path.abspath(finished_path), failed_path)
395
396class ExporterBase(object):
397    """A base for exporters.
398    """
399    grok.implements(ICSVExporter)
400
401    #: Fieldnames considered by this exporter
402    fields = ('code', 'title', 'title_prefix')
403
404    #: The title under which this exporter will be displayed
405    #: (if registered as a utility)
406    title = 'Override this title'
407
408    def mangle_value(self, value, name, context=None):
409        """Hook for mangling values in derived classes
410        """
411        if isinstance(value, bool):
412            value = value and '1' or '0'
413        elif isinstance(value, unicode):
414            # CSV writers like byte streams better than unicode
415            value = value.encode('utf-8')
416        elif isinstance(value, datetime.datetime):
417            value = str(value)
418        elif isinstance(value, datetime.date):
419            # Order is important here: check for date after datetime as
420            # datetimes are also dates.
421            #
422            # Append hash '#' to dates to circumvent unwanted excel automatic
423            value = str('%s#' % value)
424        elif value is None:
425            # None is not really representable in CSV files
426            value = ''
427        return value
428
429    def get_csv_writer(self, filepath=None):
430        """Get a CSV dict writer instance open for writing.
431
432        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
433        :class:`csv.DictWriter` instance and outfile is the real file
434        which is written to. The latter is important when writing to
435        StringIO and can normally be ignored otherwise.
436
437        The returned file will already be filled with the header row.
438
439        Please note that if you give a filepath, the returned outfile
440        is open for writing only and you might have to close it before
441        reopening it for reading.
442        """
443        if filepath is None:
444            outfile = StringIO()
445        else:
446            outfile = open(filepath, 'wb')
447        writer = csv.DictWriter(outfile, self.fields)
448        writer.writerow(dict(zip(self.fields, self.fields))) # header
449        return writer, outfile
450
451    def write_item(self, obj, writer):
452        """Write a row extracted from `obj` into CSV file using `writer`.
453        """
454        row = {}
455        for name in self.fields:
456            value = getattr(obj, name, None)
457            value = self.mangle_value(value, name, obj)
458            row[name] = value
459        writer.writerow(row)
460        return
461
462    def close_outfile(self, filepath, outfile):
463        """Close outfile.
464
465        If filepath is None, the contents of outfile is returned.
466        """
467        outfile.seek(0)
468        if filepath is None:
469            return outfile.read()
470        outfile.close()
471        return
472
473    def export(self, iterable, filepath=None):
474        """Export `iterable` as CSV file.
475
476        If `filepath` is ``None``, a raw string with CSV data should
477        be returned.
478        """
479        raise NotImplementedError
480
481    def export_all(self, site, filepath=None):
482        """Export all appropriate objects in `site` into `filepath` as
483        CSV data.
484
485        If `filepath` is ``None``, a raw string with CSV data should
486        be returned.
487        """
488        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.