source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 12128

Last change on this file since 12128 was 12125, checked in by Henrik Bettermann, 10 years ago

Add batch processor for contracts.

  • Property svn:keywords set to Id
File size: 21.7 KB
RevLine 
[12004]1## $Id: batching.py 12125 2014-12-03 12:18:21Z henrik $
[11958]2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
Here we define the processors for customer-specific objects like
customers and their subobjects.
25"""
26import grok
[11985]27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
[11958]28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
[11964]39    IObjectHistory, IGNORE_MARKER)
[11958]40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
[12005]42from waeup.ikoba.documents.utils import generate_document_id
[12125]43from waeup.ikoba.customers.utils import generate_contract_id
[11958]44from waeup.ikoba.customers.interfaces import (
[11995]45    ICustomer, ICustomerUpdateByRegNo,
[12097]46    ICustomerDocument, IContract)
[11958]47from waeup.ikoba.customers.workflow import  (
[12089]48    IMPORTABLE_REGISTRATION_STATES, IMPORTABLE_REGISTRATION_TRANSITIONS,
[12097]49    IMPORTABLE_CONTRACT_STATES, IMPORTABLE_CONTRACT_TRANSITIONS)
[11958]50from waeup.ikoba.utils.batching import BatchProcessor
51
[11985]52
class CustomerProcessor(BatchProcessor):
    """A batch processor for ICustomer objects.

    Rows are matched to customers via the ``customer_id`` column or,
    if that column is missing or carries the ignore marker, via the
    ``reg_number`` column (see `getLocator`).
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerprocessor'
    grok.name(util_name)

    name = _('Customer Processor')
    iface = ICustomer
    iface_byregnumber = ICustomerUpdateByRegNo

    location_fields = []
    factory_name = 'waeup.Customer'

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of column names handled here.

        Combines the schema fields of `iface` with the extra columns
        this processor treats itself (ids, password, state, transition).
        """
        fields = getFields(self.iface)
        return sorted(set(
            ['customer_id', 'reg_number',
             'password', 'state', 'transition'] + list(fields.keys())))

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether the CSV header can be processed in `mode`.

        Raises `FatalCSVError` if

        - both ``state`` and ``transition`` columns are present,
        - neither ``customer_id`` nor ``reg_number`` is present,
        - in create mode, one of `required_fields` is missing,
        - a column not starting with ``--`` appears more than once.

        Returns ``True`` otherwise.
        """
        if 'state' in headerfields and 'transition' in headerfields:
            raise FatalCSVError(
                "State and transition can't be  imported at the same time!")
        if ('reg_number' not in headerfields
                and 'customer_id' not in headerfields):
            raise FatalCSVError(
                "Need at least columns customer_id or reg_number " +
                "for import!")
        if mode == 'create':
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        # Columns starting with '--' are ignored columns; only the
        # remaining ones have to be unique.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def parentsExist(self, row, site):
        """Tell whether `site` contains a ``customers`` container."""
        return 'customers' in site.keys()

    def getLocator(self, row):
        """Return the name of the column used to find the customer.

        ``customer_id`` takes precedence over ``reg_number``. A column
        counts only if its value is neither ``None`` nor the ignore
        marker. Returns ``None`` if no column qualifies.
        """
        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
            return 'customer_id'
        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
            return 'reg_number'
        else:
            return None

    # The entry never exists in create mode.
    def entryExists(self, row, site):
        """Tell whether the customer described by `row` can be found."""
        return self.getEntry(row, site) is not None

    def getParent(self, row, site):
        """Return the ``customers`` container of `site`."""
        return site['customers']

    def getEntry(self, row, site):
        """Return the customer described by `row` or ``None``.

        Depending on `getLocator` the customer is looked up by id in
        the ``customers`` container or by registration number in the
        ``customers_catalog`` (first hit wins).
        """
        if 'customers' not in site.keys():
            return None
        locator = self.getLocator(row)
        if locator == 'customer_id':
            if row['customer_id'] in site['customers']:
                return site['customers'][row['customer_id']]
        elif locator == 'reg_number':
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def addEntry(self, obj, row, site):
        """Add customer `obj` to the customers container of `site`."""
        parent = self.getParent(row, site)
        parent.addCustomer(obj)
        # Reset _curr_cust_id if customer_id has been imported
        if self.getLocator(row) == 'customer_id':
            parent._curr_cust_id -= 1
        # We have to log this if state is provided. If not,
        # logging is done by the event handler handle_customer_added
        if 'state' in row:
            parent.logger.info('%s - Customer record created' % obj.customer_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Customer record created'))
        return

    def delEntry(self, row, site):
        """Remove the customer described by `row`, if it exists."""
        customer = self.getEntry(row, site)
        if customer is not None:
            parent = self.getParent(row, site)
            parent.logger.info('%s - Customer removed' % customer.customer_id)
            del parent[customer.customer_id]

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        transition = row.get('transition', IGNORE_MARKER)
        if transition not in (IGNORE_MARKER, ''):
            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
            if transition not in allowed_transitions:
                return 'Transition not allowed.'
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        ``password``, ``state`` and ``transition`` are handled here and
        removed from `row`; all remaining values are applied by the
        base class. The changes are written to the logger of the
        customers container.

        Returns a string listing the changed items.
        """
        items_changed = ''

        # Remove customer_id from row if empty
        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
            row.pop('customer_id')

        # Update password
        # XXX: Take DELETION_MARKER into consideration
        if 'password' in row:
            passwd = row.get('password', IGNORE_MARKER)
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): the password value as given in the row
                # becomes part of items_changed and is therefore written
                # to the log below -- confirm that this is intended.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row.get('state', IGNORE_MARKER)
            if state not in (IGNORE_MARKER, ''):
                IWorkflowState(obj).setState(state)
                msg = _("State '${a}' set", mapping={'a': state})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        if 'transition' in row:
            transition = row.get('transition', IGNORE_MARKER)
            if transition not in (IGNORE_MARKER, ''):
                IWorkflowInfo(obj).fireTransition(transition)
                items_changed += ('%s=%s, ' % ('transition', transition))
            row.pop('transition')

        # apply other values...
        items_changed += super(CustomerProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if hasattr(obj, 'customer_id'):
            # Update mode: the customer exists and we can get the customer_id.
            # Create mode: the record contains the customer_id
            parent.logger.info(
                '%s - %s - %s - updated: %s'
                % (self.name, filename, obj.customer_id, items_changed))
        else:
            # Create mode: the customer does not yet exist
            # XXX: It seems that this never happens because customer_id
            # is always set.
            # The object has no customer_id in this branch, so it must
            # not be read unguarded (that would raise AttributeError).
            parent.logger.info(
                '%s - %s - %s - imported: %s'
                % (self.name, filename,
                   getattr(obj, 'customer_id', None), items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header names are read from the first line of the file
        at `path` and mapped by position to `headerfields`. In remove
        mode only the locator columns are kept; ``--IGNORE--`` columns
        are always skipped.
        """
        result = dict()
        # Only the header line is needed; make sure the file is closed
        # again afterwards.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the object converter, extended by errors for states and
        transitions that are not importable.
        """
        iface = self.iface
        if mode in ['update', 'remove']:
            if self.getLocator(row) == 'reg_number':
                iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        if 'transition' in row:
            if row['transition'] not in IMPORTABLE_REGISTRATION_TRANSITIONS:
                if row['transition'] not in (IGNORE_MARKER, ''):
                    errs.append(('transition', 'not allowed'))
        if 'state' in row:
            if row['state'] not in IMPORTABLE_REGISTRATION_STATES:
                if row['state'] not in (IGNORE_MARKER, ''):
                    errs.append(('state', 'not allowed'))
                else:
                    # State is an attribute of Customer and must not
                    # be changed if empty.
                    conv_dict['state'] = IGNORE_MARKER
        try:
            # Correct cust_id counter. As the IConverter for customers
            # creates customer objects that are not used afterwards, we
            # have to fix the site-wide customer_id counter.
            site = grok.getSite()
            customers = site['customers']
            customers._curr_cust_id -= 1
        except (KeyError, TypeError, AttributeError):
            # Best effort only: without a site, a customers container
            # or a counter there is nothing to correct.
            pass
        return errs, inv_errs, conv_dict
281
282
class CustomerProcessorBase(BatchProcessor):
    """A base for customer subitem processors.

    Helps reducing redundancy.

    `parentsExist` and `entryExists` rely on ``getParent`` and
    ``getEntry``, which are not defined in this class; the concrete
    processors derived from it provide them.
    """
    grok.baseclass()

    # additional available fields
    # beside 'customer_id' and 'reg_number'
    additional_fields = []

    #: header fields additionally required
    additional_headers = []

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of column names handled here.

        Consists of the customer locator columns, `additional_fields`
        and the schema fields of `iface`.
        """
        fields = ['customer_id', 'reg_number'] + self.additional_fields
        return sorted(set(fields + list(getFields(self.iface).keys())))

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the CSV header can be processed.

        Raises `FatalCSVError` if neither ``customer_id`` nor
        ``reg_number`` is present, if one of `additional_headers` is
        missing or if a column not starting with ``--`` appears more
        than once. `mode` is not evaluated here.

        Returns ``True`` otherwise.
        """
        if ('reg_number' not in headerfields
                and 'customer_id' not in headerfields):
            raise FatalCSVError(
                "Need at least columns customer_id " +
                "or reg_number for import!")
        for name in self.additional_headers:
            if name not in headerfields:
                raise FatalCSVError(
                    "Need %s for import!" % name)

        # Columns starting with '--' are ignored columns; only the
        # remaining ones have to be unique.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def _getCustomer(self, row, site):
        """Return the customer referred to by `row` or ``None``.

        A usable ``customer_id`` takes precedence over ``reg_number``;
        empty strings and the ignore marker do not count as values.
        Registration numbers are looked up in ``customers_catalog``
        (first hit wins).
        """
        NON_VALUES = ['', IGNORE_MARKER]
        if 'customers' not in site.keys():
            return None
        if row.get('customer_id', '') not in NON_VALUES:
            if row['customer_id'] in site['customers']:
                return site['customers'][row['customer_id']]
        elif row.get('reg_number', '') not in NON_VALUES:
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def parentsExist(self, row, site):
        """Tell whether the container for the entry can be found."""
        return self.getParent(row, site) is not None

    def entryExists(self, row, site):
        """Tell whether the entry described by `row` can be found."""
        return self.getEntry(row, site) is not None

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` delivered by
        the object converter for `iface`.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header names are read from the first line of the file
        at `path` and mapped by position to `headerfields`. In remove
        mode only identifying columns are kept; ``--IGNORE--`` columns
        are always skipped.
        """
        result = dict()
        # Only the header line is needed; make sure the file is closed
        # again afterwards.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number', 'document_id',
                             'code', 'level', 'contract_id'
                             ] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result
[11995]369
class CustomerDocumentProcessor(CustomerProcessorBase):
    """A batch processor for ICustomerDocument objects.

    Documents live in the ``documents`` container of the customer a
    row refers to and are identified by their ``document_id``.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerdocumentprocessor'
    grok.name(util_name)

    name = _('CustomerDocument Processor')
    iface = ICustomerDocument
    factory_name = 'waeup.CustomerSampleDocument'

    location_fields = []
    additional_fields = ['document_id', 'class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the CSV header can be processed in `mode`.

        In addition to the checks of the base class a ``document_id``
        column is required in update and remove modes.

        Raises `FatalCSVError` on problems, returns ``True`` otherwise.
        """
        super(CustomerDocumentProcessor, self).checkHeaders(headerfields)
        if mode in ('update', 'remove') and 'document_id' not in headerfields:
            raise FatalCSVError(
                "Need document_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the documents container of the customer or ``None``."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['documents']

    def getEntry(self, row, site):
        """Return the document described by `row` or ``None``."""
        documents = self.getParent(row, site)
        if documents is None:
            return None
        document_id = row.get('document_id', None)
        if document_id is None:
            return None
        return documents.get(document_id)

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The changes are written to the logger of the customers
        container. Returns ``None``.
        """
        items_changed = super(CustomerDocumentProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, items_changed))
        return

    def addEntry(self, obj, row, site):
        """Store `obj` in the documents container of the customer.

        The key is the ``document_id`` of `row` with leading and
        trailing ``#`` characters removed.
        """
        parent = self.getParent(row, site)
        document_id = row['document_id'].strip('#')
        parent[document_id] = obj
        return

    def delEntry(self, row, site):
        """Remove the document described by `row`, if it exists."""
        document = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if document is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Document removed: %s'
                % (customer.customer_id, document.document_id))
            del parent[document.document_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Beyond the converter checks of the base class this makes sure
        that ``class_name`` fits this processor (not in remove mode)
        and that ``document_id`` is acceptable: in create mode a
        missing id is generated and a given id must not exist in
        ``documents_catalog``; in all other modes the id must start
        with ``d``.

        Returns the tuple ``(errs, inv_errs, conv_dict)``.
        """
        errs, inv_errs, conv_dict = super(
            CustomerDocumentProcessor, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            class_name = row.get('class_name', None)
            # Remove the 'waeup.' prefix only. str.strip() would treat
            # its argument as a set of characters and could also eat
            # leading or trailing letters of the class name itself.
            expected = self.factory_name
            if expected.startswith('waeup.'):
                expected = expected[len('waeup.'):]
            if class_name != expected:
                errs.append(('class_name','wrong processor'))
        # We have to check document_id.
        document_id = row.get('document_id', None)
        if mode == 'create':
            if not document_id:
                document_id = generate_document_id()
                conv_dict['document_id'] = document_id
                return errs, inv_errs, conv_dict
            cat = queryUtility(ICatalog, name='documents_catalog')
            results = list(
                cat.searchResults(document_id=(document_id, document_id)))
            if results:
                # document_id must not exist.
                errs.append(('document_id','id exists'))
        else:
            # A missing id is reported as a conversion error instead of
            # failing with an AttributeError on None.
            if not document_id or not document_id.startswith('d'):
                errs.append(('document_id','invalid format'))
        return errs, inv_errs, conv_dict
[12094]465
466
class ContractProcessor(CustomerProcessorBase):
    """A batch processor for IContract objects.

    Contracts live in the ``contracts`` container of the customer a
    row refers to and are identified by their ``contract_id``.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'contractprocessor'
    grok.name(util_name)

    name = _('Contract Processor')
    iface = IContract
    factory_name = 'waeup.SampleContract'

    location_fields = []
    additional_fields = ['contract_id', 'class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the CSV header can be processed in `mode`.

        In addition to the checks of the base class a ``contract_id``
        column is required in update and remove modes.

        Raises `FatalCSVError` on problems, returns ``True`` otherwise.
        """
        super(ContractProcessor, self).checkHeaders(headerfields)
        if mode in ('update', 'remove') and 'contract_id' not in headerfields:
            raise FatalCSVError(
                "Need contract_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the contracts container of the customer or ``None``."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['contracts']

    def getEntry(self, row, site):
        """Return the contract described by `row` or ``None``."""
        contracts = self.getParent(row, site)
        if contracts is None:
            return None
        contract_id = row.get('contract_id', None)
        if contract_id is None:
            return None
        return contracts.get(contract_id)

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The changes are written to the logger of the customers
        container. Returns ``None``.
        """
        items_changed = super(ContractProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, items_changed))
        return

    def addEntry(self, obj, row, site):
        """Store `obj` in the contracts container of the customer.

        The key is the ``contract_id`` of `row` with leading and
        trailing ``#`` characters removed.
        """
        parent = self.getParent(row, site)
        contract_id = row['contract_id'].strip('#')
        parent[contract_id] = obj
        return

    def delEntry(self, row, site):
        """Remove the contract described by `row`, if it exists."""
        contract = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if contract is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Contract removed: %s'
                % (customer.customer_id, contract.contract_id))
            del parent[contract.contract_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Beyond the converter checks of the base class this makes sure
        that ``class_name`` fits this processor (not in remove mode)
        and that ``contract_id`` is acceptable: in create mode a
        missing id is generated and a given id must not exist in
        ``contracts_catalog``; in all other modes the id must start
        with ``c``.

        Returns the tuple ``(errs, inv_errs, conv_dict)``.
        """
        errs, inv_errs, conv_dict = super(
            ContractProcessor, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            class_name = row.get('class_name', None)
            # Remove the 'waeup.' prefix only. str.strip() would treat
            # its argument as a set of characters and could also eat
            # leading or trailing letters of the class name itself.
            expected = self.factory_name
            if expected.startswith('waeup.'):
                expected = expected[len('waeup.'):]
            if class_name != expected:
                errs.append(('class_name','wrong processor'))
        # We have to check contract_id.
        contract_id = row.get('contract_id', None)
        if mode == 'create':
            if not contract_id:
                contract_id = generate_contract_id()
                conv_dict['contract_id'] = contract_id
                return errs, inv_errs, conv_dict
            cat = queryUtility(ICatalog, name='contracts_catalog')
            results = list(
                cat.searchResults(contract_id=(contract_id, contract_id)))
            if results:
                # contract_id must not exist.
                errs.append(('contract_id','id exists'))
        else:
            # A missing id is reported as a conversion error instead of
            # failing with an AttributeError on None.
            if not contract_id or not contract_id.startswith('c'):
                errs.append(('contract_id','invalid format'))
        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.