source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 9815

Last change on this file since 9815 was 9797, checked in by uli, 12 years ago

Local exports for departments.

  • Property svn:keywords set to Id
File size: 26.5 KB
Line 
1## $Id: batching.py 9797 2012-12-13 15:39:31Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import shutil
28import tempfile
29import time
30from cStringIO import StringIO
31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
34from zope.interface import Interface, implementer
35from zope.schema import getFields
36from zope.event import notify
37from waeup.kofa.async import AsyncJob
38from waeup.kofa.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
41    IExportJobContainer, IExportJob, IExportContainerFinder)
42
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Derived classes are expected to
    override the class attributes below and to implement the hooks
    that raise :exc:`NotImplementedError` here (``parentsExist``,
    ``entryExists``, ``getParent``, ``getEntry``, ``addEntry`` and
    ``delEntry``).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the list of required header names.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted list of location fields and all `iface` field names.
        """
        # Build the union from sets so that we do not depend on
        # ``dict.keys()`` returning a list.
        return sorted(set(self.location_fields) | set(getFields(self.iface)))

    def getHeaders(self, mode='create'):
        """Get the header names usable for `mode`.

        The base implementation returns `available_fields` for any
        mode.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check `headerfields` against the requirements of `mode`.

        Raises :exc:`FatalCSVError` if a required column is missing or
        if a non-ignored column name (one not starting with ``--``)
        appears more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        `mapping` maps raw CSV column names to the field names to use.
        Returns a new dict keyed by the replacement names; columns
        mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the CSV file at `path` is read to obtain the
        raw column names; these are paired by position with
        `headerfields`. Ignored columns and, in ``remove`` mode, all
        non-location columns are left out.
        """
        # Only the header line is needed; close the file right after.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Render conversion and invariant errors as a single string.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages. All
        parts are joined by ``'; '``.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named `factory_name`.

        Positional and keyword arguments are accepted but not passed
        on by this base implementation.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        :meth:`doImport` treats a ``None`` result as 'no such entry'.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation has no requirements and
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Returns a string describing the fields changed. `site` and
        `filename` are not used by this base implementation.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            # Log the ``code`` of a value if it has one.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of an import run to `logger`.

        Nothing happens if `logger` is ``None``. `fail_path` and
        `user` are accepted but not part of the logged message.
        """
        if logger is None:
            return
        logger.info(
            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
            "%0.3f s (%0.4f s/item)" % (
            path, mode, num, num - warnings, warnings,
            # ``num or 1`` avoids a division by zero for empty files.
            timedelta, timedelta/(num or 1)))
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that `row` is modified: the messages are stored under
        the ``--ERRORS--`` key before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered
        by the :class:`IObjectConverter` for `iface`. `ignore_empty`
        is accepted but not used by this base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, mode, site, base):
        """Create, remove or update the entry described by `row`.

        Returns ``None`` on success or the warning to be written to
        the failed-rows file otherwise.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site, base)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
            except DuplicationError as error:
                return "%s Skipping." % error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry"
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry"
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site, base)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at `path`, processes each row according to
        `mode` (``create``, ``update`` or ``remove``) and writes two
        CSV files into a new temporary directory: ``<name>.finished<ext>``
        with all rows processed successfully and ``<name>.pending<ext>``
        with all failed rows plus an ``--ERRORS--`` column.

        If `ignore_empty` is true, empty values are replaced by
        `IGNORE_MARKER` in ``update`` mode.

        Returns a tuple ``(<num rows>, <num failed rows>,
        <finished path>, <pending path>)``. The pending file is
        removed and its path given as ``None`` if no row failed. The
        temporary directory is not removed here.

        Raises :exc:`FatalCSVError` if `headerfields` do not fit
        `mode`.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        num = 0
        num_warns = 0
        # All files opened below are closed in the ``finally`` clause,
        # so that the result files are completely written to disk
        # before their paths are returned.
        open_files = []
        try:
            infile = open(path, 'rb')
            open_files.append(infile)
            reader = csv.DictReader(infile)

            temp_dir = tempfile.mkdtemp()
            base, ext = os.path.splitext(os.path.basename(path))
            finished_headers = list(mapping.values())
            failed_headers = finished_headers + ['--ERRORS--']

            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_file = open(failed_path, 'wb')
            open_files.append(failed_file)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            os.chmod(failed_path, 0o664)
            failed_writer.writerow(dict((x, x) for x in failed_headers))

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_file = open(finished_path, 'wb')
            open_files.append(finished_file)
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            os.chmod(finished_path, 0o664)
            finished_writer.writerow(dict((x, x) for x in finished_headers))

            site = grok.getSite()

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode == 'update':
                    # replace empty strings with ignore-markers
                    for key, val in string_row.items():
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: `row` receives the converted values
                # while `string_row` keeps the strings for CSV output.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    num_warns += 1
                    conv_warnings = self.stringFromErrs(errs, inv_errs)
                    self.writeFailedRow(
                        failed_writer, string_row, conv_warnings)
                    continue
                row.update(conv_dict)

                warning = self._processRow(row, mode, site, base)
                if warning is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, warning)
                    continue
                finished_writer.writerow(string_row)

            timedelta = time.time() - time_start
        finally:
            for handle in open_files:
                handle.close()

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not keep a file holding a header only.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        fieldnames = self.available_fields
        outfile = StringIO()
        writer = csv.DictWriter(outfile, fieldnames)
        writer.writerow(dict(zip(fieldnames, fieldnames)))  # header
        return outfile.getvalue()
404
class ExporterBase(object):
    """Common base for CSV exporters.

    Provides helpers to open a CSV writer, to turn attribute values
    into CSV-friendly values and to write single objects. The actual
    export methods must be implemented by derived classes.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something suitable for a CSV file.

        Derived classes can override this hook; `name` and `context`
        are not used by the base implementation.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            return str(value)
        if isinstance(value, datetime.date):
            # This check must come after the datetime check, because
            # datetimes are dates as well.
            #
            # A hash '#' is appended to dates to circumvent unwanted
            # automatic conversion by Excel.
            return str('%s#' % value)
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        The result is a tuple ``(<writer>, <outfile>)``: a
        :class:`csv.DictWriter` and the file object it writes to.
        With `filepath` being ``None`` the file object is a StringIO
        instance, otherwise the file at `filepath` opened for binary
        writing.

        The header row is already written when this method returns.

        A file opened from `filepath` is open for writing only; it
        might have to be closed before it is reopened for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        header = dict(zip(self.fields, self.fields))
        writer = csv.DictWriter(outfile, self.fields)
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row with the `fields` of `obj` using `writer`.

        Attributes missing on `obj` are treated as ``None``.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        With `filepath` being ``None`` the complete contents of
        `outfile` are returned. Otherwise `outfile` is closed and
        nothing is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def get_filtered(self, site, **kw):
        """Get datasets to export filtered by keyword arguments.

        Must return an iterable; to be implemented by derived classes.
        """
        raise NotImplementedError

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        With `filepath` being ``None`` a raw string with CSV data
        should be returned. To be implemented by derived classes.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        With `filepath` being ``None`` a raw string with CSV data
        should be returned. To be implemented by derived classes.
        """
        raise NotImplementedError

    def export_filtered(self, site, filepath=None, **kw):
        """Export the items selected by the keyword arguments `kw`.

        The items are determined by :meth:`get_filtered` and passed
        to :meth:`export`, whose result is returned.
        """
        return self.export(self.get_filtered(site, **kw), filepath=filepath)
514
def export_job(site, exporter_name, **kw):
    """Run the exporter named `exporter_name` and store the CSV output
    in a temporary file.

    `site` is the site to search. It is handed over to the exporter
    and additionally set as 'current site': this function is used in
    asynchronous jobs, which run in their own threads and initially
    have no site set. `site` must therefore be a valid value for
    `zope.component.hooks.setSite()`.

    `exporter_name` is the name under which the desired exporter was
    registered as utility with the ZCA.

    If keyword arguments are given, they are passed to the
    ``export_filtered`` method of the exporter; without any keyword
    arguments ``export_all`` is used.

    The CSV file is created in a new temporary directory (see
    :func:`tempfile.mkdtemp`) and named after the exporter with
    `.csv` as filename extension.

    Returns the path to the created CSV file.

    .. note:: It is the callers responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_path = os.path.join(
        tempfile.mkdtemp(), '%s.csv' % exporter_name)
    if not kw:
        exporter.export_all(site, filepath=output_path)
    else:
        exporter.export_filtered(site, filepath=output_path, **kw)
    return output_path
546
class AsyncExportJob(AsyncJob):
    """An IJob that exports data to CSV files.

    An `AsyncExportJob` is a regular `AsyncJob` with a different
    constructor API: instead of a callable to execute, a `site` and
    an `exporter_name` are passed in. The job then executes
    :func:`export_job` with these values and any additional
    arguments given.

    The real work is done when an instance of this class is put into
    a queue. See :mod:`waeup.kofa.async` to learn more about
    asynchronous jobs.

    `exporter_name` must be the name under which an ICSVExporter
    utility was registered with the ZCA.

    `site` must be a valid site or ``None``.

    The result of an `AsyncExportJob` is the path to the generated
    CSV file. The file resides in a temporary directory that should
    be removed after being used.
    """
    grok.implements(IExportJob)

    def __init__(self, site, exporter_name, *args, **kwargs):
        # `export_job` is the callable run by the job; everything else
        # is passed through as its arguments.
        super(AsyncExportJob, self).__init__(
            export_job,
            site,
            exporter_name,
            *args,
            **kwargs
        )
572
class ExportJobContainer(object):
    """Mix-in adding support for asynchronous export jobs.

    Jobs are tracked in `running_exports`, a persistent list of
    tuples ``(<job id>, <exporter name>, <user id>)``.
    """
    grok.implements(IExportJobContainer)
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Start an asynchronous export job.

        `exporter_name` is the name of the exporter utility to use,
        `user_id` the ID of the user triggering the export.

        Any further positional and keyword arguments are passed
        as-is to the :class:`AsyncExportJob` created.

        Job id, exporter name and user id are stored together in the
        persistent `running_exports` list.

        Returns the job ID of the job started.
        """
        current_site = grok.getSite()
        job_manager = getUtility(IJobManager)
        new_job_id = job_manager.put(
            AsyncExportJob(current_site, exporter_name, *args, **kwargs))
        # Store a copy as attribute, to make sure that the persisted
        # list is stored in ZODB
        self.running_exports = PersistentList(self.running_exports)
        self.running_exports.append((new_job_id, exporter_name, user_id))
        return new_job_id

    def get_running_export_jobs(self, user_id=None):
        """Get the export jobs of user `user_id` as list of tuples.

        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
        that order, where ``<exporter_name>`` is the utility name of
        the exporter used.

        With `user_id` being ``None`` the jobs of all users are
        returned.

        Considered entries whose job is unknown to the job manager
        are removed from `running_exports`.
        """
        job_manager = getUtility(IJobManager)
        alive, stale = [], []
        for record in self.running_exports:
            wanted = user_id is None or record[2] == user_id
            if not wanted:
                continue
            if job_manager.get(record[0]) is None:
                stale.append(record)
            else:
                alive.append(record)
        if stale:
            self.running_exports = PersistentList(
                [x for x in self.running_exports if x not in stale])
        return alive

    def get_export_jobs_status(self, user_id=None):
        """Get status infos about the export jobs of user `user_id`.

        Returns a list of tuples ``<raw status>, <status translated>,
        <exporter title>`` in that order. The status values are
        looked up in `JOB_STATUS_MAP`, the title is the ``title`` of
        the exporter utility used.
        """
        job_manager = getUtility(IJobManager)
        infos = []
        for record in self.get_running_export_jobs(user_id):
            job = job_manager.get(record[0])
            if job is None:
                continue
            raw_status, translated_status = JOB_STATUS_MAP[job.status]
            exporter_title = getUtility(ICSVExporter, name=record[1]).title
            infos.append((raw_status, translated_status, exporter_title))
        return infos

    def delete_export_entry(self, entry):
        """Delete the export denoted by `entry`.

        The directory containing the export file (if the job result
        is a path and the directory exists) is removed, the job is
        removed via the job manager, and `entry` is dropped from the
        local `running_exports` list.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.
        """
        job_manager = getUtility(IJobManager)
        job = job_manager.get(entry[0])
        if job is not None and isinstance(job.result, basestring):
            # remove created export file together with its directory
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        job_manager.remove(entry[0], self)
        self.running_exports = PersistentList(
            [x for x in self.running_exports if x != entry])
        return

    def entry_from_job_id(self, job_id):
        """Get the entry tuple stored for `job_id`.

        Returns ``None`` if there is no such entry.
        """
        matching = (x for x in self.running_exports if x[0] == job_id)
        return next(matching, None)
681
class VirtualExportJobContainer(ExportJobContainer):
    """A virtual export job container.

    Can be used as a mixin just like a real ExportJobContainer and
    currently provides exactly the same functionality. The only
    difference is the place where data is kept: a virtual container
    stores nothing itself but reads and writes `running_exports` of
    the container delivered by the registered IExportContainerFinder
    utility.

    A registered IExportContainerFinder utility is therefore
    required.
    """
    grok.implements(IExportJobContainer)

    @property
    def _site_container(self):
        # The finder utility is a callable returning the container.
        finder = getUtility(IExportContainerFinder)
        return finder()

    # ExportJobContainers keep exactly one attribute in ZODB:
    # `running_exports`. Redirecting this single attribute to the
    # container found by the finder is enough to get the complete
    # behaviour of a regular exports container, while no data at all
    # is stored with the virtual container itself.
    @property
    def running_exports(self):
        """Exports stored in the site-wide exports container.
        """
        container = self._site_container
        return container.running_exports

    @running_exports.setter
    def running_exports(self, value):
        container = self._site_container
        container.running_exports = value

    @running_exports.deleter
    def running_exports(self):
        container = self._site_container
        del container.running_exports
723
724
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """Utility to look up the local (site-wide) export container.
    """

    def __call__(self):
        """Get the local export container.

        This is the ``datacenter`` item of the current site. ``None``
        is returned if there is no current site or the site has no
        such item.
        """
        current_site = grok.getSite()
        if current_site is not None:
            return current_site.get('datacenter', None)
        return None
Note: See TracBrowser for help on using the repository browser.