source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8786

Last change on this file since 8786 was 8573, checked in by Henrik Bettermann, 12 years ago

Perform chmod after uploading files into the datacenter, creating media files, or creating pending files in the datacenter.

  • Property svn:keywords set to Id
File size: 17.4 KB
Line 
1## $Id: batching.py 8573 2012-05-31 14:16:51Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import tempfile
28import time
29from cStringIO import StringIO
30from zope.component import createObject
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.event import notify
34from waeup.kofa.interfaces import (
35    IBatchProcessor, FatalCSVError, IObjectConverter,
36    ICSVExporter, IGNORE_MARKER, DuplicationError)
37
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected
    to override the class attributes below and to implement the
    methods that raise :exc:`NotImplementedError` here
    (``parentsExist``, ``entryExists``, ``getParent``, ``getEntry``,
    ``addEntry`` and ``delEntry``).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here; they are
        handled separately (see ``req``).
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map each import mode to the list of column names it needs.

        ``create`` needs the location fields plus all required fields,
        ``update`` and ``remove`` need the location fields only.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of location and interface fields."""
        # ``list(...)`` keeps the concatenation valid even where
        # ``keys()`` does not return a real list.
        schema_fields = list(getFields(self.iface).keys())
        return sorted(set(self.location_fields + schema_fields))

    def getHeaders(self, mode='create'):
        """Return the headers usable for `mode`.

        The base implementation ignores `mode` and returns
        ``available_fields``.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check `headerfields` for required and duplicate columns.

        Raises :exc:`FatalCSVError` if a column required for `mode` is
        missing or if a non-ignored column name appears more than
        once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        `mapping` maps raw CSV column names to the field names
        actually used. Returns a new dict keyed by the field names;
        columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Only the first (header) row of the file at `path` is read. The
        raw header at position ``n`` is mapped to ``headerfields[n]``.
        Columns marked ``--IGNORE--`` are left out, and so are all
        non-location fields when `mode` is ``'remove'``.
        """
        # Close the file deterministically instead of leaking the handle.
        with open(path, 'rb') as infile:
            raw_header = next(csv.reader(infile))
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion errors into a single ``'; '``-separated string.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages.
        """
        result = []
        for fieldname, message in errors:
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed
        on by this base implementation.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        ``doImport`` treats a ``None`` result as "no such entry".
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems, ``None`` otherwise.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Log the ``code`` of a value if it has one, else the value.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to `logger`.

        Does nothing if `logger` is ``None``. `num` is the number of
        processed rows, `warnings` the number of failed rows and
        `timedelta` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty imports.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that `row` is modified in place: the error text is stored
        under the ``--ERRORS--`` key before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` delivered by
        the :class:`IObjectConverter` for ``iface``. `ignore_empty` is
        accepted but not used by this base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, mode, site):
        """Create, remove or update the entry described by `row`.

        Returns ``None`` on success or the warning to be written to
        the failed-rows file if the row could not be processed.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                # Same value the deprecated ``error.message`` attribute
                # delivered: the sole argument, or '' otherwise.
                message = ''
                if len(error.args) == 1:
                    message = error.args[0]
                return "%s Skipping." % message
            except DuplicationError as error:
                return "%s Skipping." % error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def _importRows(self, reader, failed_writer, finished_writer,
                    mapping, mode, site, ignore_empty):
        """Process all rows of `reader` and sort them into result files.

        Returns the tuple ``(num, num_warns)``: the number of rows
        read and the number of rows written to `failed_writer`.
        """
        num = 0
        num_warns = 0
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty and mode in ('update',):
                # replace empty strings with ignore-markers
                for key, val in list(string_row.items()):
                    if val == '':
                        string_row[key] = IGNORE_MARKER
            # Shallow copy: the converted values go into ``row`` while
            # ``string_row`` keeps the strings for the result files.
            row = dict(string_row)
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                warning = self.stringFromErrs(errs, inv_errs)
            else:
                row.update(conv_dict)
                warning = self._processRow(row, mode, site)
            if warning is not None:
                num_warns += 1
                self.writeFailedRow(failed_writer, string_row, warning)
                continue
            finished_writer.writerow(string_row)
        return num, num_warns

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at `path`, interprets its columns as
        `headerfields` and creates, updates or removes entries
        according to `mode`. Successfully processed rows are written
        to a ``.finished`` file, all other rows (together with an
        ``--ERRORS--`` column) to a ``.pending`` file; both files are
        created in a fresh temporary directory.

        If `ignore_empty` is true, empty strings are replaced by
        ignore-markers in ``update`` mode.

        Raises :exc:`FatalCSVError` if the headers are not acceptable
        (see ``checkHeaders``).

        Returns the tuple ``(num, num_warns, finished_path,
        failed_path)`` with absolute paths. ``failed_path`` is
        ``None`` (and the pending file removed) if no row failed.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()

        (base, ext) = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = list(mapping.values())
        failed_headers = finished_headers + ['--ERRORS--']

        site = grok.getSite()

        # All three files are closed (and thereby flushed) before the
        # result paths are handed back to the caller.
        with open(path, 'rb') as infile:
            reader = csv.DictReader(infile)
            with open(failed_path, 'wb') as failed_file:
                os.chmod(failed_path, 0o664)
                failed_writer = csv.DictWriter(failed_file, failed_headers)
                failed_writer.writerow(
                    dict((x, x) for x in failed_headers))
                with open(finished_path, 'wb') as finished_file:
                    finished_writer = csv.DictWriter(
                        finished_file, finished_headers)
                    finished_writer.writerow(
                        dict((x, x) for x in finished_headers))
                    num, num_warns = self._importRows(
                        reader, failed_writer, finished_writer,
                        mapping, mode, site, ignore_empty)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not leave an empty pending file behind.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
394
class ExporterBase(object):
    """Common base class for CSV exporters.

    Derived classes are expected to set ``fields`` and ``title`` and
    to implement ``export`` and ``export_all``.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something writable to CSV.

        This is a hook derived classes may override. `name` and
        `context` are not used by this base implementation.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            # Must be tested before ``date``: every datetime is also
            # a date.
            return str(value)
        if isinstance(value, datetime.date):
            # A hash '#' is appended to dates; per the original note
            # this circumvents unwanted automatic handling by Excel.
            return str('%s#' % value)
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer with the header row already written.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and ``<outfile>`` the
        underlying file object: a StringIO if `filepath` is ``None``,
        else the file at `filepath`.

        A file opened from `filepath` is open for writing only, so it
        might have to be closed before it can be reopened for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header_row = dict((fieldname, fieldname) for fieldname in self.fields)
        writer.writerow(header_row)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row built from the attributes of `obj`.

        For every name in ``fields`` the attribute of `obj` (``None``
        if missing) is passed through ``mangle_value``.
        """
        writer.writerow(dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields))
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        If `filepath` is ``None`` the complete contents of `outfile`
        is returned. Otherwise `outfile` is closed and nothing is
        returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned. Must be implemented by derived classes.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned. Must be implemented by derived classes.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.