source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 12000

Last change on this file since 12000 was 11997, checked in by Henrik Bettermann, 10 years ago

propset svn:keywords "Id"

  • Property svn:keywords set to Id
File size: 17.0 KB
RevLine 
[11958]1## $Id: batching.py 11997 2014-11-19 17:02:48Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
23Here we define the processors for customer-specific objects such as
24customers and their subobjects.
25"""
26import grok
[11985]27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
[11958]28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
[11964]39    IObjectHistory, IGNORE_MARKER)
[11958]40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
42from waeup.ikoba.customers.interfaces import (
[11995]43    ICustomer, ICustomerUpdateByRegNo,
44    ICustomerDocument)
[11958]45from waeup.ikoba.customers.workflow import  (
46    IMPORTABLE_STATES, IMPORTABLE_TRANSITIONS)
47from waeup.ikoba.utils.batching import BatchProcessor
48
[11985]49
class CustomerProcessor(BatchProcessor):
    """A batch processor for ICustomer objects.

    A customer is located by its ``customer_id`` or, if that column is
    missing or ignored, by its ``reg_number`` (see `getLocator`).
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerprocessor'
    grok.name(util_name)

    name = _('Customer Processor')
    iface = ICustomer
    iface_byregnumber = ICustomerUpdateByRegNo

    location_fields = []
    factory_name = 'waeup.Customer'

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of all column names accepted.

        These are the fields of `iface` plus the locator, password and
        workflow columns.
        """
        fields = getFields(self.iface)
        return sorted(list(set(
            ['customer_id', 'reg_number',
             'password', 'state', 'transition'] + list(fields.keys()))))

    def checkHeaders(self, headerfields, mode='create'):
        """Check the CSV header row.

        Raises `FatalCSVError` if ``state`` and ``transition`` are both
        present, if neither locator column is present, if (in create
        mode) a required field is missing, or if a column name that
        does not start with ``--`` appears more than once.

        Returns ``True`` otherwise.
        """
        if 'state' in headerfields and 'transition' in headerfields:
            raise FatalCSVError(
                "State and transition can't be  imported at the same time!")
        if 'reg_number' not in headerfields and \
                'customer_id' not in headerfields:
            raise FatalCSVError(
                "Need at least columns customer_id or reg_number " +
                "for import!")
        if mode == 'create':
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        # Check for fields to be ignored...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def parentsExist(self, row, site):
        """Tell whether `site` contains a ``customers`` entry."""
        return 'customers' in site.keys()

    def getLocator(self, row):
        """Return the name of the column that identifies the customer.

        ``customer_id`` takes precedence over ``reg_number``. A column
        counts as unusable if it is missing, ``None`` or the ignore
        marker; ``None`` is returned if both are unusable.
        """
        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
            return 'customer_id'
        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
            return 'reg_number'
        else:
            return None

    # The entry never exists in create mode.
    def entryExists(self, row, site):
        """Tell whether `getEntry` finds a customer for `row`."""
        return self.getEntry(row, site) is not None

    def getParent(self, row, site):
        """Return the customers container of `site`."""
        return site['customers']

    def getEntry(self, row, site):
        """Return the customer addressed by `row` or ``None``.

        Depending on `getLocator` the customer is looked up directly in
        the customers container or searched by ``reg_number`` in the
        ``customers_catalog``. If the catalog search yields several
        results the first one is returned.
        """
        if 'customers' not in site.keys():
            return None
        locator = self.getLocator(row)
        if locator == 'customer_id':
            if row['customer_id'] in site['customers']:
                return site['customers'][row['customer_id']]
        elif locator == 'reg_number':
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            if cat is None:
                # queryUtility returns None if no such catalog is
                # registered; without a catalog nothing can be found.
                return None
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def addEntry(self, obj, row, site):
        """Add customer `obj` to the customers container of `site`."""
        parent = self.getParent(row, site)
        parent.addCustomer(obj)
        # Reset _curr_cust_id if customer_id has been imported
        # (presumably addCustomer advanced the counter although the id
        # came from the import file -- TODO confirm).
        if self.getLocator(row) == 'customer_id':
            parent._curr_cust_id -= 1
        # We have to log this if state is provided. If not,
        # logging is done by the event handler handle_customer_added
        if 'state' in row:
            parent.logger.info('%s - Customer record created' % obj.customer_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Customer record created'))
        return

    def delEntry(self, row, site):
        """Remove the customer addressed by `row`, if it exists."""
        customer = self.getEntry(row, site)
        if customer is not None:
            parent = self.getParent(row, site)
            parent.logger.info('%s - Customer removed' % customer.customer_id)
            del parent[customer.customer_id]

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        transition = row.get('transition', IGNORE_MARKER)
        if transition not in (IGNORE_MARKER, ''):
            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
            if transition not in allowed_transitions:
                return 'Transition not allowed.'
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        ``password``, ``state`` and ``transition`` are handled here and
        removed from `row`; all remaining values are applied by the
        base class. Note that `row` is modified in place.

        Returns a string describing the changed items, which is also
        written to the customers container's logger.
        """
        items_changed = ''

        # Remove customer_id from row if empty
        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
            row.pop('customer_id')

        # Update password
        # XXX: Take DELETION_MARKER into consideration
        if 'password' in row:
            passwd = row.get('password', IGNORE_MARKER)
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): this puts the password as given in the
                # file (possibly clear text) into the log message below.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row.get('state', IGNORE_MARKER)
            if state not in (IGNORE_MARKER, ''):
                IWorkflowState(obj).setState(state)
                msg = _("State '${a}' set", mapping={'a': state})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        if 'transition' in row:
            transition = row.get('transition', IGNORE_MARKER)
            if transition not in (IGNORE_MARKER, ''):
                IWorkflowInfo(obj).fireTransition(transition)
                items_changed += ('%s=%s, ' % ('transition', transition))
            row.pop('transition')

        # apply other values...
        items_changed += super(CustomerProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if hasattr(obj, 'customer_id'):
            # Update mode: the customer exists and we can get the customer_id.
            # Create mode: the record contains the customer_id
            parent.logger.info(
                '%s - %s - %s - updated: %s'
                % (self.name, filename, obj.customer_id, items_changed))
        else:
            # Create mode: the customer does not yet exist
            # XXX: It seems that this never happens because customer_id
            # is always set.
            # obj has no customer_id in this branch, so the attribute
            # must not be accessed directly.
            parent.logger.info(
                '%s - %s - %s - imported: %s'
                % (self.name, filename,
                   getattr(obj, 'customer_id', None), items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The keys are taken from the first row of the CSV file at
        `path`, the values from `headerfields` at the same position.
        In ``remove`` mode only the locator columns are mapped.
        """
        result = dict()
        # Only the header row is needed; close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` delivered by
        the object converter, extended by errors for states and
        transitions that are not importable.
        """
        iface = self.iface
        if mode in ['update', 'remove']:
            if self.getLocator(row) == 'reg_number':
                iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        if 'transition' in row:
            if row['transition'] not in IMPORTABLE_TRANSITIONS:
                if row['transition'] not in (IGNORE_MARKER, ''):
                    errs.append(('transition', 'not allowed'))
        if 'state' in row:
            if row['state'] not in IMPORTABLE_STATES:
                if row['state'] not in (IGNORE_MARKER, ''):
                    errs.append(('state', 'not allowed'))
                else:
                    # State is an attribute of Customer and must not
                    # be changed if empty.
                    conv_dict['state'] = IGNORE_MARKER
        try:
            # Correct cust_id counter. As the IConverter for customers
            # creates customer objects that are not used afterwards, we
            # have to fix the site-wide customer_id counter.
            site = grok.getSite()
            customers = site['customers']
            customers._curr_cust_id -= 1
        except (KeyError, TypeError, AttributeError):
            # No site or no customers container: nothing to correct.
            pass
        return errs, inv_errs, conv_dict
278
279
class CustomerProcessorBase(BatchProcessor):
    """A base for customer subitem processors.

    Helps reducing redundancy. Subclasses are expected to provide
    `iface`, `factory_name`, `getParent` and `getEntry`, which are used
    but not defined here.
    """
    grok.baseclass()

    # additional available fields
    # beside 'customer_id' and 'reg_number'
    additional_fields = []

    #: header fields additionally required
    additional_headers = []

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of all column names accepted.

        These are the locator columns, `additional_fields` and the
        fields of `iface`.
        """
        fields = ['customer_id', 'reg_number'] + self.additional_fields
        return sorted(list(set(fields + list(getFields(
            self.iface).keys()))))

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check the CSV header row.

        Raises `FatalCSVError` if neither locator column is present, if
        one of `additional_headers` is missing, or if a column name
        that does not start with ``--`` appears more than once. `mode`
        is not evaluated here.

        Returns ``True`` otherwise.
        """
        if 'reg_number' not in headerfields and \
                'customer_id' not in headerfields:
            raise FatalCSVError(
                "Need at least columns customer_id " +
                "or reg_number for import!")
        for name in self.additional_headers:
            if name not in headerfields:
                raise FatalCSVError(
                    "Need %s for import!" % name)

        # Check for fields to be ignored...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def _getCustomer(self, row, site):
        """Return the customer addressed by `row` or ``None``.

        A usable ``customer_id`` takes precedence over ``reg_number``;
        empty strings and the ignore marker count as unusable. Lookup
        by ``reg_number`` goes through the ``customers_catalog`` and
        returns the first result.
        """
        NON_VALUES = ['', IGNORE_MARKER]
        if 'customers' not in site.keys():
            return None
        if row.get('customer_id', '') not in NON_VALUES:
            if row['customer_id'] in site['customers']:
                return site['customers'][row['customer_id']]
        elif row.get('reg_number', '') not in NON_VALUES:
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            if cat is None:
                # queryUtility returns None if no such catalog is
                # registered; without a catalog nothing can be found.
                return None
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def parentsExist(self, row, site):
        """Tell whether `getParent` finds a container for `row`."""
        return self.getParent(row, site) is not None

    def entryExists(self, row, site):
        """Tell whether `getEntry` finds an object for `row`."""
        return self.getEntry(row, site) is not None

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` delivered by
        the object converter for `iface`.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The keys are taken from the first row of the CSV file at
        `path`, the values from `headerfields` at the same position.
        In ``remove`` mode only a fixed set of locator columns is
        mapped.
        """
        result = dict()
        # Only the header row is needed; close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number', 'id', 'code', 'level'
                             ] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result
[11995]365
class CustomerDocumentProcessor(CustomerProcessorBase):
    """Batch processor handling ICustomerDocument objects.

    Documents live in the ``documents`` container of the customer a
    row refers to and are addressed by the ``id`` column.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerdocumentprocessor'
    grok.name(util_name)

    name = _('CustomerDocument Processor')
    iface = ICustomerDocument
    factory_name = 'waeup.CustomerDocument'

    location_fields = []
    additional_fields = ['id']
    additional_headers = []

    def checkHeaders(self, headerfields, mode='ignore'):
        """Run the base class header checks and demand an ``id`` column
        in update and remove mode.

        Raises `FatalCSVError` on failure, returns ``True`` otherwise.
        """
        super(CustomerDocumentProcessor, self).checkHeaders(headerfields)
        id_required = mode in ('update', 'remove')
        if id_required and 'id' not in headerfields:
            raise FatalCSVError(
                "Need id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the documents container of the customer addressed by
        `row`, or ``None`` if there is no such customer.
        """
        owner = self._getCustomer(row, site)
        if owner is not None:
            return owner['documents']
        return None

    def getEntry(self, row, site):
        """Return the document addressed by `row`, or ``None``."""
        container = self.getParent(row, site)
        if container is None:
            return None
        doc_id = row.get('id', None)
        if doc_id is None:
            return None
        return container.get(doc_id)

    def updateEntry(self, obj, row, site, filename):
        """Apply the values of `row` to `obj` and log the changes.

        The changes are written to the logger of the customers
        container. Nothing is returned.
        """
        changes = super(CustomerDocumentProcessor, self).updateEntry(
            obj, row, site, filename)
        owner = self.getParent(row, site).__parent__
        owner.__parent__.logger.info(
            '%s - %s - %s - updated: %s' % (
                self.name, filename, owner.customer_id, changes))
        return None

    def addEntry(self, obj, row, site):
        """Store `obj` in the documents container under the row's id
        with any leading or trailing ``#`` characters stripped.
        """
        container = self.getParent(row, site)
        container[row['id'].strip('#')] = obj
        return None

    def delEntry(self, row, site):
        """Delete the document addressed by `row`, if there is one, and
        log the removal.
        """
        document = self.getEntry(row, site)
        container = self.getParent(row, site)
        if document is None:
            return None
        owner = self._getCustomer(row, site)
        owner.__parent__.logger.info(
            '%s - Document removed: %s' % (owner.customer_id, document.id))
        del container[document.id]
        return None

    def checkConversion(self, row, mode='ignore'):
        """Validate all values in `row`, including the document id.

        A missing or empty id is replaced in the converted values by a
        generated one (``d`` followed by digits derived from the
        current time). A given id must start with ``d`` and be 14
        characters long, otherwise an error is recorded.
        """
        errs, inv_errs, conv_dict = super(
            CustomerDocumentProcessor, self).checkConversion(row, mode=mode)

        doc_id = row.get('id', None)
        if doc_id:
            # An id was supplied: only its format has to be verified.
            if not (doc_id.startswith('d') and len(doc_id) == 14):
                errs.append(('id', 'invalid format'))
        else:
            # No id supplied: derive one from the current time.
            digits = "%d" % int(time() * 10000)
            conv_dict['id'] = "d%s" % digits[1:]
        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.