source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/batching.py @ 9089

Last change on this file since 9089 was 9089, checked in by uli, 12 years ago

Add async functionality for exporters:

  • an exporter function usable with decoupled threads
  • a mix-in class for components storing export jobs
  • specialized async-aware job implementation
Property svn:keywords set to Id
File size: 22.4 KB
1## $Id: batching.py 9089 2012-08-08 13:19:41Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import shutil
28import tempfile
29import time
30from cStringIO import StringIO
31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
34from zope.interface import Interface
35from zope.schema import getFields
36from zope.event import notify
37from waeup.kofa.async import AsyncJob
38from waeup.kofa.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP)
41
42class BatchProcessor(grok.GlobalUtility):
43    """A processor to add, update, or remove data.
44
45    This is a non-active base class.
46    """
47    grok.implements(IBatchProcessor)
48    grok.context(Interface)
49    grok.baseclass()
50
51    # Name used in pages and forms...
52    name = u'Non-registered base processor'
53
54    # Internal name...
55    util_name = 'baseprocessor'
56
57    # Items for this processor need an interface with zope.schema fields.
58    iface = Interface
59
60    # The name must be the same as the util_name attribute in order to
61    # register this utility correctly.
62    grok.name(util_name)
63
64    # Headers needed to locate items...
65    location_fields = ['code', 'faculty_code']
66
67    # A factory with this name must be registered...
68    factory_name = 'waeup.Department'
69
70    @property
71    def required_fields(self):
72        """Required fields that have no default.
73
74        A list of names of fields whose values cannot be set if they
75        are not given during creation. Therefore these fields must
76        exist in the input.
77
78        Fields with a default != missing_value do not belong to this
79        category.
80        """
81        result = []
82        for key, field in getFields(self.iface).items():
83            if key in self.location_fields:
84                continue
85            if field.default is not field.missing_value:
86                continue
87            if field.required:
88                result.append(key)
89        return result
90
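    # Illustrative sketch (not part of the original file): for an iface
    # declaring
    #
    #   title = TextLine(required=True)                # no default
    #   code = TextLine(required=True, default=u'NA')  # has a default
    #   remark = TextLine(required=False)
    #
    # required_fields would contain only ['title'], provided none of
    # these names appears in location_fields.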
91    @property
92    def req(self):
93        result = dict(
94            create = self.location_fields + self.required_fields,
95            update = self.location_fields,
96            remove = self.location_fields,
97        )
98        return result
99
100    @property
101    def available_fields(self):
102        return sorted(list(set(
103                    self.location_fields + getFields(self.iface).keys())))
104
105    def getHeaders(self, mode='create'):
106        return self.available_fields
107
108    def checkHeaders(self, headerfields, mode='create'):
109        req = self.req[mode]
110        # Check for required fields...
111        for field in req:
112            if field not in headerfields:
113                raise FatalCSVError(
114                    "Need at least columns %s for import!" %
115                    ', '.join(["'%s'" % x for x in req]))
116        # Check for double fields. This cannot happen in practice, as
117        # the error is already caught in the views.
118        not_ignored_fields = [x for x in headerfields
119                              if not x.startswith('--')]
120        if len(set(not_ignored_fields)) < len(not_ignored_fields):
121            raise FatalCSVError(
122                "Double headers: each column name may only appear once.")
123        return True
124
125    def applyMapping(self, row, mapping):
126        """Apply mapping to a row of CSV data.
127
128        """
129        result = dict()
130        for key, replacement in mapping.items():
131            if replacement == u'--IGNORE--':
132                # Skip ignored columns in failed and finished data files.
133                continue
134            result[replacement] = row[key]
135        return result
136
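    # Illustrative sketch (not part of the original file): applyMapping
    # renames raw CSV columns to schema field names, e.g.
    #
    #   >>> processor.applyMapping(
    #   ...     {'Code': 'FAC1', 'Title': 'Arts'},
    #   ...     {'Code': 'code', 'Title': 'title'})
    #   {'code': 'FAC1', 'title': 'Arts'}
    #
    # Columns mapped to u'--IGNORE--' are dropped from the result.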
137    def getMapping(self, path, headerfields, mode):
138        """Get a mapping from CSV file headerfields to actually used fieldnames.
139
140        """
141        result = dict()
142        reader = csv.reader(open(path, 'rb'))
143        raw_header = reader.next()
144        for num, field in enumerate(headerfields):
145            if field not in self.location_fields and mode == 'remove':
146                # Skip non-location fields when removing.
147                continue
148            if field == u'--IGNORE--':
149                # Skip ignored columns in failed and finished data files.
150                continue
151            result[raw_header[num]] = field
152        return result
153
154    def stringFromErrs(self, errors, inv_errors):
155        result = []
156        for err in errors:
157            fieldname, message = err
158            result.append("%s: %s" % (fieldname, message))
159        for err in inv_errors:
160            result.append("invariant: %s" % err)
161        return '; '.join(result)
162
163    def callFactory(self, *args, **kw):
164        return createObject(self.factory_name)
165
166    def parentsExist(self, row, site):
167        """Tell whether the parent object for data in ``row`` exists.
168        """
169        raise NotImplementedError('method not implemented')
170
171    def entryExists(self, row, site):
172        """Tell whether there already exists an entry for ``row`` data.
173        """
174        raise NotImplementedError('method not implemented')
175
176    def getParent(self, row, site):
177        """Get the parent object for the entry in ``row``.
178        """
179        raise NotImplementedError('method not implemented')
180
181    def getEntry(self, row, site):
182        """Get the object for the entry in ``row``.
183        """
184        raise NotImplementedError('method not implemented')
185
186    def addEntry(self, obj, row, site):
187        """Add the entry given by ``row`` data.
188        """
189        raise NotImplementedError('method not implemented')
190
191    def delEntry(self, row, site):
192        """Delete entry given by ``row`` data.
193        """
194        raise NotImplementedError('method not implemented')
195
196    def checkUpdateRequirements(self, obj, row, site):
197        """Checks requirements the object must fulfill when being updated.
198
199        This method is not used in case of deleting or adding objects.
200
201        Returns error messages as strings in case of requirement
202        problems.
203        """
204        return None
205
206    def updateEntry(self, obj, row, site):
207        """Update obj to the values given in row.
208
209        Returns a string describing the fields changed.
210        """
211        changed = []
212        for key, value in row.items():
213            # Skip fields to be ignored.
214            if value == IGNORE_MARKER:
215                continue
216            # Skip fields that are not declared in the interface and
217            # are not yet attributes of the existing object. We thus
218            # cannot add non-existing attributes here.
219            if not hasattr(obj, key):
220                continue
221            setattr(obj, key, value)
222            log_value = getattr(value, 'code', value)
223            changed.append('%s=%s' % (key, log_value))
224
225        # If any catalog is involved it must be updated.
226        #
227        # XXX: The event is also triggered when creating objects, as
228        # updateEntry is also called when creating entries, resulting
229        # in an objectAdded plus an additional objectModified event.
230        if len(changed):
231            notify(grok.ObjectModifiedEvent(obj))
232
233        return ', '.join(changed)
234
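    # Illustrative sketch (not part of the original file): a call like
    #
    #   processor.updateEntry(dept, {'title': u'Biology',
    #                                'code': IGNORE_MARKER}, site)
    #
    # sets only ``title`` on ``dept`` and returns the log string
    # 'title=Biology'; the ignored column is left untouched.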
235    def createLogfile(self, path, fail_path, num, warnings, mode, user,
236                      timedelta, logger=None):
237        """Write to log file.
238        """
239        if logger is None:
240            return
241        status = 'OK'
242        if warnings > 0:
243            status = 'FAILED'
244        logger.info("-" * 20)
245        logger.info("%s: Batch processing finished: %s" % (user, status))
246        logger.info("%s: Source: %s" % (user, path))
247        logger.info("%s: Mode: %s" % (user, mode))
248        logger.info("%s: User: %s" % (user, user))
249        if warnings > 0:
250            logger.info("%s: Failed datasets: %s" % (
251                    user, os.path.basename(fail_path)))
252        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
253                user, timedelta, timedelta/(num or 1)))
254        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
255                user, num, num - warnings, warnings
256                ))
257        logger.info("-" * 20)
258        return
259
260    def writeFailedRow(self, writer, row, warnings):
261        """Write a row with error messages to error CSV.
262
263        If warnings is a list of strings, they will be concatenated.
264        """
265        error_col = warnings
266        if isinstance(warnings, list):
267            error_col = ' / '.join(warnings)
268        row['--ERRORS--'] = error_col
269        writer.writerow(row)
270        return
271
272    def checkConversion(self, row, mode='ignore', ignore_empty=True):
273        """Validates all values in row.
274        """
275        converter = IObjectConverter(self.iface)
276        errs, inv_errs, conv_dict =  converter.fromStringDict(
277            row, self.factory_name, mode=mode)
278        return errs, inv_errs, conv_dict
279
280    def doImport(self, path, headerfields, mode='create', user='Unknown',
281                 logger=None, ignore_empty=True):
282        """Perform actual import.
283        """
284        time_start = time.time()
285        self.checkHeaders(headerfields, mode)
286        mapping = self.getMapping(path, headerfields, mode)
287        reader = csv.DictReader(open(path, 'rb'))
288
289        temp_dir = tempfile.mkdtemp()
290
291        base = os.path.basename(path)
292        (base, ext) = os.path.splitext(base)
293        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
294        failed_headers = mapping.values()
295        failed_headers.append('--ERRORS--')
296        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
297                                       failed_headers)
298        os.chmod(failed_path, 0664)
299        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
300
301        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
302        finished_headers = mapping.values()
303        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
304                                         finished_headers)
305        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
306
307        num = 0
308        num_warns = 0
309        site = grok.getSite()
310
311        for raw_row in reader:
312            num += 1
313            string_row = self.applyMapping(raw_row, mapping)
314            if ignore_empty and mode in ('update',):
315                # replace empty strings with ignore-markers
316                for key, val in string_row.items():
317                    if val == '':
318                        string_row[key] = IGNORE_MARKER
319            row = dict(string_row.items()) # create a shallow copy
320            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
321            if errs or inv_errs:
322                num_warns += 1
323                conv_warnings = self.stringFromErrs(errs, inv_errs)
324                self.writeFailedRow(
325                    failed_writer, string_row, conv_warnings)
326                continue
327            row.update(conv_dict)
328
329            if mode == 'create':
330                if not self.parentsExist(row, site):
331                    num_warns += 1
332                    self.writeFailedRow(
333                        failed_writer, string_row,
334                        "Not all parents exist yet. Skipping.")
335                    continue
336                if self.entryExists(row, site):
337                    num_warns += 1
338                    self.writeFailedRow(
339                        failed_writer, string_row,
340                        "This object already exists. Skipping.")
341                    continue
342                obj = self.callFactory()
343                # Override all values in row, including
344                # student_ids and applicant_ids which have already
345                # been generated in the respective __init__ methods.
346                self.updateEntry(obj, row, site)
347                try:
348                    self.addEntry(obj, row, site)
349                except KeyError, error:
350                    num_warns += 1
351                    self.writeFailedRow(
352                        failed_writer, string_row,
353                        "%s Skipping." % error.message)
354                    continue
355                except DuplicationError, error:
356                    num_warns += 1
357                    self.writeFailedRow(
358                        failed_writer, string_row,
359                        "%s Skipping." % error.msg)
360                    continue
361            elif mode == 'remove':
362                if not self.entryExists(row, site):
363                    num_warns += 1
364                    self.writeFailedRow(
365                        failed_writer, string_row,
366                        "Cannot remove: no such entry.")
367                    continue
368                self.delEntry(row, site)
369            elif mode == 'update':
370                obj = self.getEntry(row, site)
371                if obj is None:
372                    num_warns += 1
373                    self.writeFailedRow(
374                        failed_writer, string_row,
375                        "Cannot update: no such entry.")
376                    continue
377                update_errors = self.checkUpdateRequirements(obj, row, site)
378                if update_errors is not None:
379                    num_warns += 1
380                    self.writeFailedRow(
381                        failed_writer, string_row, update_errors)
382                    continue
383                self.updateEntry(obj, row, site)
384            finished_writer.writerow(string_row)
385
386        time_end = time.time()
387        timedelta = time_end - time_start
388
389        self.createLogfile(path, failed_path, num, num_warns, mode, user,
390                           timedelta, logger=logger)
391        failed_path = os.path.abspath(failed_path)
392        if num_warns == 0:
393            del failed_writer
394            os.unlink(failed_path)
395            failed_path = None
396        return (num, num_warns,
397                os.path.abspath(finished_path), failed_path)
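
    # Illustrative usage sketch (assumed; processor name and CSV file
    # are made up, not part of the original file):
    #
    #   proc = getUtility(IBatchProcessor, name='departmentprocessor')
    #   num, num_warns, finished, failed = proc.doImport(
    #       'departments.csv', ['code', 'faculty_code', 'title'],
    #       mode='create', user='admin')
    #
    # ``finished`` and ``failed`` are paths to temporary CSV files with
    # the successfully processed and the rejected rows; ``failed`` is
    # None if every row was imported.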
398
399class ExporterBase(object):
400    """A base for exporters.
401    """
402    grok.implements(ICSVExporter)
403
404    #: Fieldnames considered by this exporter
405    fields = ('code', 'title', 'title_prefix')
406
407    #: The title under which this exporter will be displayed
408    #: (if registered as a utility)
409    title = 'Override this title'
410
411    def mangle_value(self, value, name, context=None):
412        """Hook for mangling values in derived classes
413        """
414        if isinstance(value, bool):
415            value = value and '1' or '0'
416        elif isinstance(value, unicode):
417            # CSV writers like byte streams better than unicode
418            value = value.encode('utf-8')
419        elif isinstance(value, datetime.datetime):
420            value = str(value)
421        elif isinstance(value, datetime.date):
422            # Order is important here: check for date after datetime as
423            # datetimes are also dates.
424            #
425            # Append hash '#' to dates to circumvent unwanted automatic date conversion in Excel.
426            value = str('%s#' % value)
427        elif value is None:
428            # None is not really representable in CSV files
429            value = ''
430        return value
431
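    # Illustrative sketch (not part of the original file) of the
    # conversions performed above:
    #
    #   True                      -> '1'        (False -> '0')
    #   u'Fakult\xe4t'            -> 'Fakult\xc3\xa4t'  (UTF-8 bytes)
    #   datetime.date(2012, 8, 8) -> '2012-08-08#'
    #   None                      -> ''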
432    def get_csv_writer(self, filepath=None):
433        """Get a CSV dict writer instance open for writing.
434
435        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
436        :class:`csv.DictWriter` instance and outfile is the real file
437        which is written to. The latter is important when writing to
438        StringIO and can normally be ignored otherwise.
439
440        The returned file will already be filled with the header row.
441
442        Please note that if you give a filepath, the returned outfile
443        is open for writing only and you might have to close it before
444        reopening it for reading.
445        """
446        if filepath is None:
447            outfile = StringIO()
448        else:
449            outfile = open(filepath, 'wb')
450        writer = csv.DictWriter(outfile, self.fields)
451        writer.writerow(dict(zip(self.fields, self.fields))) # header
452        return writer, outfile
453
454    def write_item(self, obj, writer):
455        """Write a row extracted from `obj` into CSV file using `writer`.
456        """
457        row = {}
458        for name in self.fields:
459            value = getattr(obj, name, None)
460            value = self.mangle_value(value, name, obj)
461            row[name] = value
462        writer.writerow(row)
463        return
464
465    def close_outfile(self, filepath, outfile):
466        """Close outfile.
467
468        If filepath is None, the contents of outfile are returned.
469        """
470        outfile.seek(0)
471        if filepath is None:
472            return outfile.read()
473        outfile.close()
474        return
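
    # Illustrative usage sketch (assumed, not part of the original
    # file): export a single object to an in-memory CSV string.
    #
    #   writer, outfile = exporter.get_csv_writer()      # filepath=None
    #   exporter.write_item(obj, writer)
    #   csv_data = exporter.close_outfile(None, outfile)  # header + 1 row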
475
476    def export(self, iterable, filepath=None):
477        """Export `iterable` as CSV file.
478
479        If `filepath` is ``None``, a raw string with CSV data should
480        be returned.
481        """
482        raise NotImplementedError
483
484    def export_all(self, site, filepath=None):
485        """Export all appropriate objects in `site` into `filepath` as
486        CSV data.
487
488        If `filepath` is ``None``, a raw string with CSV data should
489        be returned.
490        """
491        raise NotImplementedError
492
493
494def export_job(site, exporter_name):
495    """Export all entries delivered by the exporter and store them in a temp file.
496
497    `site` gives the site to search. It will be passed to the exporter
498    and also be set as 'current site' as the function is used in
499    asynchronous jobs which run in their own threads and have no site
500    set initially. Therefore `site` must also be a valid value for use
501    with `zope.component.hooks.setSite()`.
502
503    `exporter_name` is the utility name under which the desired
504    exporter was registered with the ZCA.
505
506    The resulting CSV file will be stored in a new temporary directory
507    (using :func:`tempfile.mkdtemp`).
508
509    Returns the path to the created CSV file.
510
511    .. note:: It is the caller's responsibility to clean up the used
512              file and its parent directory.
513    """
514    setSite(site)
515    exporter = getUtility(ICSVExporter, name=exporter_name)
516    output_dir = tempfile.mkdtemp()
517    output_path = os.path.join(output_dir, 'export.csv')
518    exporter.export_all(site, filepath=output_path)
519    return output_path
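
# Illustrative sketch (assumed, not part of the original file): in an
# asynchronous worker the function above would be used roughly like
#
#   path = export_job(site, 'faculties')     # exporter utility name assumed
#   # ... deliver the file to the requester ...
#   shutil.rmtree(os.path.dirname(path))     # caller cleans up the temp dir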
520
521class AsyncExportJob(AsyncJob):
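    """An :class:`AsyncJob` that runs :func:`export_job` for the given
    site and exporter name.
    """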
522
523    def __init__(self, site, exporter_name):
524        super(AsyncExportJob, self).__init__(
525            export_job, site, exporter_name)
526
527class ExportJobContainer(object):
528    """A mix-in that provides functionality for asynchronous export jobs.
529    """
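    # Class-level default; the methods below re-assign a fresh
    # per-instance PersistentList whenever the list changes, so the
    # updated data is stored with the instance in the ZODB.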
530    running_exports = PersistentList()
531
532    def start_export_job(self, exporter_name, user_id):
533        """Start asynchronous export job.
534
535        `exporter_name` is the name of an exporter utility to be used.
536
537        `user_id` is the ID of the user that triggers the export.
538
539        The job_id is stored along with exporter name and user id in a
540        persistent list.
541
542        Returns the job ID of the job started.
543        """
544        site = grok.getSite()
545        manager = getUtility(IJobManager)
546        job = AsyncExportJob(site, exporter_name)
547        job_id = manager.put(job)
548        self.running_exports.append((job_id, exporter_name, user_id))
549        # Make sure that the persisted list is stored in ZODB
550        self.running_exports = PersistentList(self.running_exports)
551        return job_id
552
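    # Illustrative usage sketch (assumed; exporter name, user id and
    # title are made up, not part of the original file):
    #
    #   job_id = container.start_export_job('faculties', 'admin')
    #   container.get_export_jobs_status('admin')
    #   # -> [('running', u'running', u'Faculties Exporter')] or similar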
553    def get_running_export_jobs(self, user_id=None):
554        """Get export jobs for user with `user_id` as list of tuples.
555
556        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
557        that order. The ``<exporter_name>`` is the utility name of the
558        used exporter.
559
560        If `user_id` is ``None``, all running jobs are returned.
561        """
562        entries = []
563        to_delete = []
564        manager = getUtility(IJobManager)
565        for entry in self.running_exports:
566            if user_id is not None and entry[2] != user_id:
567                continue
568            if manager.get(entry[0]) is None:
569                to_delete.append(entry)
570                continue
571            entries.append(entry)
572        if to_delete:
573            self.running_exports = PersistentList(
574                [x for x in self.running_exports if x not in to_delete])
575        return entries
576
577    def get_export_jobs_status(self, user_id=None):
578        """Get running/completed export jobs for `user_id` as list of tuples.
579
580        Each tuple holds ``<raw status>, <status translated>,
581        <exporter title>`` in that order, where ``<status
582        translated>`` and ``<exporter title>`` are translated strings
583        representing the status of the job and the human readable
584        title of the exporter used.
585        """
586        entries = self.get_running_export_jobs(user_id)
587        result = []
588        manager = getUtility(IJobManager)
589        for entry in entries:
590            job = manager.get(entry[0])
591            if job is None:
592                continue
593            status, status_translated = JOB_STATUS_MAP[job.status]
594            exporter_title = getUtility(ICSVExporter, name=entry[1]).title
595            result.append((status, status_translated, exporter_title))
596        return result
597
598    def delete_export_entry(self, entry):
599        """Delete the export denoted by `entry`.
600
601        Removes `entry` from the local `running_exports` list and also
602        removes the corresponding job via the local job manager.
603
604        `entry` is a tuple ``(<job id>, <exporter name>, <user id>)``
605        as created by :meth:`start_export_job` or returned by
606        :meth:`get_running_export_jobs`.
607        """
608        manager = getUtility(IJobManager)
609        job = manager.get(entry[0])
610        if job is not None:
611            # remove created export file
612            if isinstance(job.result, basestring):
613                if os.path.exists(os.path.dirname(job.result)):
614                    shutil.rmtree(os.path.dirname(job.result))
615        manager.remove(entry[0], self)
616        new_entries = [x for x in self.running_exports
617                       if x != entry]
618        self.running_exports = PersistentList(new_entries)
619        return