source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 9220

Last change on this file since 9220 was 9219, checked in by Henrik Bettermann, 12 years ago

Minor changes.

  • Property svn:keywords set to Id
File size: 24.0 KB
Line 
1## $Id: batching.py 9219 2012-09-21 16:09:16Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import shutil
28import tempfile
29import time
30from cStringIO import StringIO
31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
34from zope.interface import Interface
35from zope.schema import getFields
36from zope.event import notify
37from waeup.kofa.async import AsyncJob
38from waeup.kofa.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
41    IExportJobContainer, IExportJob)
42
43class BatchProcessor(grok.GlobalUtility):
44    """A processor to add, update, or remove data.
45
46    This is a non-active baseclass.
47    """
48    grok.implements(IBatchProcessor)
49    grok.context(Interface)
50    grok.baseclass()
51
52    # Name used in pages and forms...
53    name = u'Non-registered base processor'
54
55    # Internal name...
56    util_name = 'baseprocessor'
57
58    # Items for this processor need an interface with zope.schema fields.
59    iface = Interface
60
61    # The name must be the same as the util_name attribute in order to
62    # register this utility correctly.
63    grok.name(util_name)
64
65    # Headers needed to locate items...
66    location_fields = ['code', 'faculty_code']
67
68    # A factory with this name must be registered...
69    factory_name = 'waeup.Department'
70
71    @property
72    def required_fields(self):
73        """Required fields that have no default.
74
75        A list of names of field, whose value cannot be set if not
76        given during creation. Therefore these fields must exist in
77        input.
78
79        Fields with a default != missing_value do not belong to this
80        category.
81        """
82        result = []
83        for key, field in getFields(self.iface).items():
84            if key in self.location_fields:
85                continue
86            if field.default is not field.missing_value:
87                continue
88            if field.required:
89                result.append(key)
90        return result
91
92    @property
93    def req(self):
94        result = dict(
95            create = self.location_fields + self.required_fields,
96            update = self.location_fields,
97            remove = self.location_fields,
98        )
99        return result
100
101    @property
102    def available_fields(self):
103        return sorted(list(set(
104                    self.location_fields + getFields(self.iface).keys())))
105
106    def getHeaders(self, mode='create'):
107        return self.available_fields
108
109    def checkHeaders(self, headerfields, mode='create'):
110        req = self.req[mode]
111        # Check for required fields...
112        for field in req:
113            if not field in headerfields:
114                raise FatalCSVError(
115                    "Need at least columns %s for import!" %
116                    ', '.join(["'%s'" % x for x in req]))
117        # Check for double fields. Cannot happen because this error is
118        # already catched in views
119        not_ignored_fields = [x for x in headerfields
120                              if not x.startswith('--')]
121        if len(set(not_ignored_fields)) < len(not_ignored_fields):
122            raise FatalCSVError(
123                "Double headers: each column name may only appear once.")
124        return True
125
126    def applyMapping(self, row, mapping):
127        """Apply mapping to a row of CSV data.
128
129        """
130        result = dict()
131        for key, replacement in mapping.items():
132            if replacement == u'--IGNORE--':
133                # Skip ignored columns in failed and finished data files.
134                continue
135            result[replacement] = row[key]
136        return result
137
138    def getMapping(self, path, headerfields, mode):
139        """Get a mapping from CSV file headerfields to actually used fieldnames.
140
141        """
142        result = dict()
143        reader = csv.reader(open(path, 'rb'))
144        raw_header = reader.next()
145        for num, field in enumerate(headerfields):
146            if field not in self.location_fields and mode == 'remove':
147                # Skip non-location fields when removing.
148                continue
149            if field == u'--IGNORE--':
150                # Skip ignored columns in failed and finished data files.
151                continue
152            result[raw_header[num]] = field
153        return result
154
155    def stringFromErrs(self, errors, inv_errors):
156        result = []
157        for err in errors:
158            fieldname, message = err
159            result.append("%s: %s" % (fieldname, message))
160        for err in inv_errors:
161            result.append("invariant: %s" % err)
162        return '; '.join(result)
163
164    def callFactory(self, *args, **kw):
165        return createObject(self.factory_name)
166
167    def parentsExist(self, row, site):
168        """Tell whether the parent object for data in ``row`` exists.
169        """
170        raise NotImplementedError('method not implemented')
171
172    def entryExists(self, row, site):
173        """Tell whether there already exists an entry for ``row`` data.
174        """
175        raise NotImplementedError('method not implemented')
176
177    def getParent(self, row, site):
178        """Get the parent object for the entry in ``row``.
179        """
180        raise NotImplementedError('method not implemented')
181
182    def getEntry(self, row, site):
183        """Get the parent object for the entry in ``row``.
184        """
185        raise NotImplementedError('method not implemented')
186
187    def addEntry(self, obj, row, site):
188        """Add the entry given given by ``row`` data.
189        """
190        raise NotImplementedError('method not implemented')
191
192    def delEntry(self, row, site):
193        """Delete entry given by ``row`` data.
194        """
195        raise NotImplementedError('method not implemented')
196
197    def checkUpdateRequirements(self, obj, row, site):
198        """Checks requirements the object must fulfill when being updated.
199
200        This method is not used in case of deleting or adding objects.
201
202        Returns error messages as strings in case of requirement
203        problems.
204        """
205        return None
206
207    def updateEntry(self, obj, row, site):
208        """Update obj to the values given in row.
209
210        Returns a string describing the fields changed.
211        """
212        changed = []
213        for key, value in row.items():
214            # Skip fields to be ignored.
215            if value == IGNORE_MARKER:
216                continue
217            # Skip fields not declared in interface and which are
218            # not yet attributes of existing objects. We can thus not
219            # add non-existing attributes here.
220            if not hasattr(obj, key):
221                continue
222            setattr(obj, key, value)
223            log_value = getattr(value, 'code', value)
224            changed.append('%s=%s' % (key, log_value))
225
226        # If any catalog is involved it must be updated.
227        #
228        # XXX: The event is also triggered when creating objects as
229        # updateEntry is called also when creating entries resulting
230        # in objectAdded and additional objectModified events.
231        if len(changed):
232            notify(grok.ObjectModifiedEvent(obj))
233
234        return ', '.join(changed)
235
236    def createLogfile(self, path, fail_path, num, warnings, mode, user,
237                      timedelta, logger=None):
238        """Write to log file.
239        """
240        if logger is None:
241            return
242        status = 'OK'
243        if warnings > 0:
244            status = 'FAILED'
245        logger.info("-" * 20)
246        logger.info("%s: Batch processing finished: %s" % (user, status))
247        logger.info("%s: Source: %s" % (user, path))
248        logger.info("%s: Mode: %s" % (user, mode))
249        logger.info("%s: User: %s" % (user, user))
250        if warnings > 0:
251            logger.info("%s: Failed datasets: %s" % (
252                    user, os.path.basename(fail_path)))
253        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
254                user, timedelta, timedelta/(num or 1)))
255        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
256                user, num, num - warnings, warnings
257                ))
258        logger.info("-" * 20)
259        return
260
261    def writeFailedRow(self, writer, row, warnings):
262        """Write a row with error messages to error CSV.
263
264        If warnings is a list of strings, they will be concatenated.
265        """
266        error_col = warnings
267        if isinstance(warnings, list):
268            error_col = ' / '.join(warnings)
269        row['--ERRORS--'] = error_col
270        writer.writerow(row)
271        return
272
273    def checkConversion(self, row, mode='ignore', ignore_empty=True):
274        """Validates all values in row.
275        """
276        converter = IObjectConverter(self.iface)
277        errs, inv_errs, conv_dict =  converter.fromStringDict(
278            row, self.factory_name, mode=mode)
279        return errs, inv_errs, conv_dict
280
281    def doImport(self, path, headerfields, mode='create', user='Unknown',
282                 logger=None, ignore_empty=True):
283        """Perform actual import.
284        """
285        time_start = time.time()
286        self.checkHeaders(headerfields, mode)
287        mapping = self.getMapping(path, headerfields, mode)
288        reader = csv.DictReader(open(path, 'rb'))
289
290        temp_dir = tempfile.mkdtemp()
291
292        base = os.path.basename(path)
293        (base, ext) = os.path.splitext(base)
294        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
295        failed_headers = mapping.values()
296        failed_headers.append('--ERRORS--')
297        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
298                                       failed_headers)
299        os.chmod(failed_path, 0664)
300        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
301
302        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
303        finished_headers = mapping.values()
304        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
305                                         finished_headers)
306        os.chmod(finished_path, 0664)
307        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
308
309        num =0
310        num_warns = 0
311        site = grok.getSite()
312
313        for raw_row in reader:
314            num += 1
315            string_row = self.applyMapping(raw_row, mapping)
316            if ignore_empty and mode in ('update',):
317                # replace empty strings with ignore-markers
318                for key, val in string_row.items():
319                    if val == '':
320                        string_row[key] = IGNORE_MARKER
321            row = dict(string_row.items()) # create deep copy
322            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
323            if errs or inv_errs:
324                num_warns += 1
325                conv_warnings = self.stringFromErrs(errs, inv_errs)
326                self.writeFailedRow(
327                    failed_writer, string_row, conv_warnings)
328                continue
329            row.update(conv_dict)
330
331            if mode == 'create':
332                if not self.parentsExist(row, site):
333                    num_warns += 1
334                    self.writeFailedRow(
335                        failed_writer, string_row,
336                        "Not all parents do exist yet. Skipping")
337                    continue
338                if self.entryExists(row, site):
339                    num_warns += 1
340                    self.writeFailedRow(
341                        failed_writer, string_row,
342                        "This object already exists. Skipping.")
343                    continue
344                obj = self.callFactory()
345                # Override all values in row, also
346                # student_ids and applicant_ids which have been
347                # generated in the respective __init__ methods before.
348                self.updateEntry(obj, row, site)
349                try:
350                    self.addEntry(obj, row, site)
351                except KeyError, error:
352                    num_warns += 1
353                    self.writeFailedRow(
354                        failed_writer, string_row,
355                        "%s Skipping." % error.message)
356                    continue
357                except DuplicationError, error:
358                    num_warns += 1
359                    self.writeFailedRow(
360                        failed_writer, string_row,
361                        "%s Skipping." % error.msg)
362                    continue
363            elif mode == 'remove':
364                if not self.entryExists(row, site):
365                    num_warns += 1
366                    self.writeFailedRow(
367                        failed_writer, string_row,
368                        "Cannot remove: no such entry")
369                    continue
370                self.delEntry(row, site)
371            elif mode == 'update':
372                obj = self.getEntry(row, site)
373                if obj is None:
374                    num_warns += 1
375                    self.writeFailedRow(
376                        failed_writer, string_row,
377                        "Cannot update: no such entry")
378                    continue
379                update_errors = self.checkUpdateRequirements(obj, row, site)
380                if update_errors is not None:
381                    num_warns += 1
382                    self.writeFailedRow(
383                        failed_writer, string_row, update_errors)
384                    continue
385                self.updateEntry(obj, row, site)
386            finished_writer.writerow(string_row)
387
388        time_end = time.time()
389        timedelta = time_end - time_start
390
391        self.createLogfile(path, failed_path, num, num_warns, mode, user,
392                           timedelta, logger=logger)
393        failed_path = os.path.abspath(failed_path)
394        if num_warns == 0:
395            del failed_writer
396            os.unlink(failed_path)
397            failed_path = None
398        return (num, num_warns,
399                os.path.abspath(finished_path), failed_path)
400
401    def get_csv_skeleton(self):
402        """Export CSV file only with a header of available fields.
403
404        A raw string with CSV data should be returned.
405        """
406        outfile = StringIO()
407        writer = csv.DictWriter(outfile, self.available_fields)
408        writer.writerow(dict(zip(self.available_fields, self.available_fields))) # header
409        outfile.seek(0)
410        return outfile.read()
411
412class ExporterBase(object):
413    """A base for exporters.
414    """
415    grok.implements(ICSVExporter)
416
417    #: Fieldnames considered by this exporter
418    fields = ('code', 'title', 'title_prefix')
419
420    #: The title under which this exporter will be displayed
421    #: (if registered as a utility)
422    title = 'Override this title'
423
424    def mangle_value(self, value, name, context=None):
425        """Hook for mangling values in derived classes
426        """
427        if isinstance(value, bool):
428            value = value and '1' or '0'
429        elif isinstance(value, unicode):
430            # CSV writers like byte streams better than unicode
431            value = value.encode('utf-8')
432        elif isinstance(value, datetime.datetime):
433            value = str(value)
434        elif isinstance(value, datetime.date):
435            # Order is important here: check for date after datetime as
436            # datetimes are also dates.
437            #
438            # Append hash '#' to dates to circumvent unwanted excel automatic
439            value = str('%s#' % value)
440        elif value is None:
441            # None is not really representable in CSV files
442            value = ''
443        return value
444
445    def get_csv_writer(self, filepath=None):
446        """Get a CSV dict writer instance open for writing.
447
448        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
449        :class:`csv.DictWriter` instance and outfile is the real file
450        which is written to. The latter is important when writing to
451        StringIO and can normally be ignored otherwise.
452
453        The returned file will already be filled with the header row.
454
455        Please note that if you give a filepath, the returned outfile
456        is open for writing only and you might have to close it before
457        reopening it for reading.
458        """
459        if filepath is None:
460            outfile = StringIO()
461        else:
462            outfile = open(filepath, 'wb')
463        writer = csv.DictWriter(outfile, self.fields)
464        writer.writerow(dict(zip(self.fields, self.fields))) # header
465        return writer, outfile
466
467    def write_item(self, obj, writer):
468        """Write a row extracted from `obj` into CSV file using `writer`.
469        """
470        row = {}
471        for name in self.fields:
472            value = getattr(obj, name, None)
473            value = self.mangle_value(value, name, obj)
474            row[name] = value
475        writer.writerow(row)
476        return
477
478    def close_outfile(self, filepath, outfile):
479        """Close outfile.
480
481        If filepath is None, the contents of outfile is returned.
482        """
483        outfile.seek(0)
484        if filepath is None:
485            return outfile.read()
486        outfile.close()
487        return
488
489    def export(self, iterable, filepath=None):
490        """Export `iterable` as CSV file.
491
492        If `filepath` is ``None``, a raw string with CSV data should
493        be returned.
494        """
495        raise NotImplementedError
496
497    def export_all(self, site, filepath=None):
498        """Export all appropriate objects in `site` into `filepath` as
499        CSV data.
500
501        If `filepath` is ``None``, a raw string with CSV data should
502        be returned.
503        """
504        raise NotImplementedError
505
506
507def export_job(site, exporter_name):
508    """Export all entries delivered by exporter and store it in a temp file.
509
510    `site` gives the site to search. It will be passed to the exporter
511    and also be set as 'current site' as the function is used in
512    asynchronous jobs which run in their own threads and have no site
513    set initially. Therefore `site` must also be a valid value for use
514    with `zope.component.hooks.setSite()`.
515
516    `exporter_name` is the utility name under which the desired
517    exporter was registered with the ZCA.
518
519    The resulting CSV file will be stored in a new temporary directory
520    (using :func:`tempfile.mkdtemp`). It will be named after the
521    exporter used with `.csv` filename extension.
522
523    Returns the path to the created CSV file.
524
525    .. note:: It is the callers responsibility to clean up the used
526              file and its parent directory.
527    """
528    setSite(site)
529    exporter = getUtility(ICSVExporter, name=exporter_name)
530    output_dir = tempfile.mkdtemp()
531    filename = '%s.csv' % exporter_name
532    output_path = os.path.join(output_dir, filename)
533    exporter.export_all(site, filepath=output_path)
534    return output_path
535
536class AsyncExportJob(AsyncJob):
537    """An IJob that exports data to CSV files.
538
539    `AsyncExportJob` instances are regular `AsyncJob` instances with a
540    different constructor API. Instead of a callable to execute, you
541    must pass a `site` and some `exporter_name` to trigger an export.
542
543    The real work is done when an instance of this class is put into a
544    queue. See :mod:`waeup.kofa.async` to learn more about
545    asynchronous jobs.
546
547    The `exporter_name` must be the name under which an ICSVExporter
548    utility was registered with the ZCA.
549
550    The `site` must be a valid site  or ``None``.
551
552    The result of an `AsyncExportJob` is the path to generated CSV
553    file. The file will reside in a temporary directory that should be
554    removed after being used.
555    """
556    grok.implements(IExportJob)
557
558    def __init__(self, site, exporter_name):
559        super(AsyncExportJob, self).__init__(
560            export_job, site, exporter_name)
561
562class ExportJobContainer(object):
563    """A mix-in that provides functionality for asynchronous export jobs.
564    """
565    grok.implements(IExportJobContainer)
566    running_exports = PersistentList()
567
568    def start_export_job(self, exporter_name, user_id):
569        """Start asynchronous export job.
570
571        `exporter_name` is the name of an exporter utility to be used.
572
573        `user_id` is the ID of the user that triggers the export.
574
575        The job_id is stored along with exporter name and user id in a
576        persistent list.
577
578        Returns the job ID of the job started.
579        """
580        site = grok.getSite()
581        manager = getUtility(IJobManager)
582        job = AsyncExportJob(site, exporter_name)
583        job_id = manager.put(job)
584        # Make sure that the persisted list is stored in ZODB
585        self.running_exports = PersistentList(self.running_exports)
586        self.running_exports.append((job_id, exporter_name, user_id))
587        return job_id
588
589    def get_running_export_jobs(self, user_id=None):
590        """Get export jobs for user with `user_id` as list of tuples.
591
592        Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in
593        that order. The ``<exporter_name>`` is the utility name of the
594        used exporter.
595
596        If `user_id` is ``None``, all running jobs are returned.
597        """
598        entries = []
599        to_delete = []
600        manager = getUtility(IJobManager)
601        for entry in self.running_exports:
602            if user_id is not None and entry[2] != user_id:
603                continue
604            if manager.get(entry[0]) is None:
605                to_delete.append(entry)
606                continue
607            entries.append(entry)
608        if to_delete:
609            self.running_exports = PersistentList(
610                [x for x in self.running_exports if x not in to_delete])
611        return entries
612
613    def get_export_jobs_status(self, user_id=None):
614        """Get running/completed export jobs for `user_id` as list of tuples.
615
616        Each tuple holds ``<raw status>, <status translated>,
617        <exporter title>`` in that order, where ``<status
618        translated>`` and ``<exporter title>`` are translated strings
619        representing the status of the job and the human readable
620        title of the exporter used.
621        """
622        entries = self.get_running_export_jobs(user_id)
623        result = []
624        manager = getUtility(IJobManager)
625        for entry in entries:
626            job = manager.get(entry[0])
627            if job is None:
628                continue
629            status, status_translated = JOB_STATUS_MAP[job.status]
630            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
631            result.append((status, status_translated, exporter_name))
632        return result
633
634    def delete_export_entry(self, entry):
635        """Delete the export denoted by `entry`.
636
637        Removes given entry from the local `running_exports` list and also
638        removes the regarding job via the local job manager.
639
640        `entry` must be a tuple ``(<job id>, <exporter name>, <user
641        id>)`` as created by :meth:`start_export_job` or returned by
642        :meth:`get_running_export_jobs`.
643        """
644        manager = getUtility(IJobManager)
645        job = manager.get(entry[0])
646        if job is not None:
647            # remove created export file
648            if isinstance(job.result, basestring):
649                if os.path.exists(os.path.dirname(job.result)):
650                    shutil.rmtree(os.path.dirname(job.result))
651        manager.remove(entry[0], self)
652        new_entries = [x for x in self.running_exports
653                       if x != entry]
654        self.running_exports = PersistentList(new_entries)
655        return
656
657    def entry_from_job_id(self, job_id):
658        """Get entry tuple for `job_id`.
659
660        Returns ``None`` if no such entry can be found.
661        """
662        for entry in self.running_exports:
663            if entry[0] == job_id:
664                return entry
665        return None
Note: See TracBrowser for help on using the repository browser.