source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 12258

Last change on this file since 12258 was 12258, checked in by Henrik Bettermann, 10 years ago

Change contract_id generation algorithm. Use Universally Unique IDentifiers instead of consecutive numbers.

  • Property svn:keywords set to Id
File size: 20.6 KB
Line 
1## $Id: batching.py 12258 2014-12-18 14:44:30Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
23Here we define the processors for customers specific objects like
24customers and subobjects.
25"""
26import grok
27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
39    IObjectHistory, IGNORE_MARKER)
40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
42from waeup.ikoba.customers.utils import generate_contract_id
43from waeup.ikoba.customers.interfaces import (
44    ICustomer, ICustomerUpdateByRegNo,
45    ICustomerDocument, IContract, ISampleContract)
46from waeup.ikoba.customers.workflow import  (
47    IMPORTABLE_REGISTRATION_STATES, IMPORTABLE_REGISTRATION_TRANSITIONS)
48from waeup.ikoba.utils.batching import BatchProcessor
49
50
class CustomerProcessor(BatchProcessor):
    """A batch processor for ICustomer objects.

    Rows are located either by ``customer_id`` or, when that column is
    missing or ignored, by ``reg_number`` (looked up in the
    ``customers_catalog``). Besides the schema fields of `ICustomer` the
    columns ``password``, ``state`` and ``transition`` are accepted.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerprocessor'
    grok.name(util_name)

    name = _('Customer Processor')
    iface = ICustomer
    iface_byregnumber = ICustomerUpdateByRegNo

    location_fields = []
    factory_name = 'waeup.Customer'

    @property
    def available_fields(self):
        """Sorted list of all column names this processor understands."""
        fields = getFields(self.iface)
        return sorted(list(set(
            ['customer_id', 'reg_number',
            'password', 'state', 'transition'] + fields.keys())))

    def checkHeaders(self, headerfields, mode='create'):
        """Validate the CSV header fields before import.

        Raises `FatalCSVError` if both ``state`` and ``transition`` are
        given, if neither ``customer_id`` nor ``reg_number`` is present,
        if a required field is missing in create mode, or if a
        non-ignored column name appears more than once. Returns True
        otherwise.
        """
        if 'state' in headerfields and 'transition' in headerfields:
            raise FatalCSVError(
                "State and transition can't be  imported at the same time!")
        if not 'reg_number' in headerfields and not 'customer_id' \
            in headerfields:
            raise FatalCSVError(
                "Need at least columns customer_id or reg_number " +
                "for import!")
        if mode == 'create':
            for field in self.required_fields:
                if not field in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        # Columns starting with '--' are ignored and may appear repeatedly.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def parentsExist(self, row, site):
        """True if the site contains a ``customers`` container."""
        return 'customers' in site.keys()

    def getLocator(self, row):
        """Return the name of the column used to locate the customer.

        ``'customer_id'`` takes precedence over ``'reg_number'``; None is
        returned if neither carries a usable value.
        """
        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
            return 'customer_id'
        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
            return 'reg_number'
        else:
            return None

    def entryExists(self, row, site):
        """True if `getEntry` finds a customer for `row`."""
        return self.getEntry(row, site) is not None

    def getParent(self, row, site):
        """Return the site's customers container."""
        return site['customers']

    def getEntry(self, row, site):
        """Return the customer addressed by `row`, or None if not found."""
        if not 'customers' in site.keys():
            return None
        locator = self.getLocator(row)
        if locator == 'customer_id':
            customers = site['customers']
            if row['customer_id'] in customers:
                return customers[row['customer_id']]
        elif locator == 'reg_number':
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            if cat is None:
                # queryUtility returns None when no catalog is registered.
                return None
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def addEntry(self, obj, row, site):
        """Add customer `obj` to the customers container and record history."""
        parent = self.getParent(row, site)
        parent.addCustomer(obj)
        # Reset _curr_cust_id if customer_id has been imported
        # (presumably because no generated id was consumed in that case).
        if self.getLocator(row) == 'customer_id':
            parent._curr_cust_id -= 1
        # We have to log this if state is provided. If not,
        # logging is done by the event handler handle_customer_added
        if 'state' in row:
            parent.logger.info('%s - Customer record created' % obj.customer_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Customer record created'))
        return

    def delEntry(self, row, site):
        """Remove the customer addressed by `row`, if it exists, and log it."""
        customer = self.getEntry(row, site)
        if customer is not None:
            parent = self.getParent(row, site)
            parent.logger.info('%s - Customer removed' % customer.customer_id)
            del parent[customer.customer_id]

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        transition = row.get('transition', IGNORE_MARKER)
        if transition not in (IGNORE_MARKER, ''):
            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
            if transition not in allowed_transitions:
                return 'Transition not allowed.'
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update `obj` with the values given in `row` and log the change.

        ``password``, ``state`` and ``transition`` are handled here and
        removed from `row`; all remaining values are applied by the base
        class. Returns the accumulated ``'key=value, '`` change string.
        """
        items_changed = ''

        # Remove customer_id from row if empty
        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
            row.pop('customer_id')

        # Update password
        # XXX: Take DELETION_MARKER into consideration
        if 'password' in row:
            passwd = row['password']
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): the (possibly plain-text) password ends up
                # in items_changed and therefore in the log below; consider
                # masking it.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row['state']
            if state not in (IGNORE_MARKER, ''):
                IWorkflowState(obj).setState(state)
                msg = _("State '${a}' set", mapping={'a': state})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        if 'transition' in row:
            transition = row['transition']
            if transition not in (IGNORE_MARKER, ''):
                IWorkflowInfo(obj).fireTransition(transition)
                items_changed += ('%s=%s, ' % ('transition', transition))
            row.pop('transition')

        # apply other values...
        items_changed += super(CustomerProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if hasattr(obj, 'customer_id'):
            # Update mode: the customer exists and we can get the customer_id.
            # Create mode: the record contains the customer_id
            parent.logger.info(
                '%s - %s - %s - updated: %s'
                % (self.name, filename, obj.customer_id, items_changed))
        else:
            # Create mode: the customer does not yet have a customer_id,
            # so it must not be referenced here (it would raise
            # AttributeError). XXX: It seems that this never happens
            # because customer_id is always set.
            parent.logger.info(
                '%s - %s - imported: %s'
                % (self.name, filename, items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        In remove mode only the locator columns are mapped; columns named
        ``--IGNORE--`` are always skipped.
        """
        result = dict()
        # Only the raw header line is needed; close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns the ``(errs, inv_errs, conv_dict)`` triple of the object
        converter, extended by errors for non-importable states and
        transitions.
        """
        iface = self.iface
        if mode in ['update', 'remove']:
            if self.getLocator(row) == 'reg_number':
                iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        if 'transition' in row:
            if row['transition'] not in IMPORTABLE_REGISTRATION_TRANSITIONS:
                if row['transition'] not in (IGNORE_MARKER, ''):
                    errs.append(('transition', 'not allowed'))
        if 'state' in row:
            if row['state'] not in IMPORTABLE_REGISTRATION_STATES:
                if row['state'] not in (IGNORE_MARKER, ''):
                    errs.append(('state', 'not allowed'))
                else:
                    # State is an attribute of Customer and must not
                    # be changed if empty.
                    conv_dict['state'] = IGNORE_MARKER
        try:
            # Correct cust_id counter. As the IConverter for customers
            # creates customer objects that are not used afterwards, we
            # have to fix the site-wide customer_id counter.
            site = grok.getSite()
            customers = site['customers']
            customers._curr_cust_id -= 1
        except (KeyError, TypeError, AttributeError):
            # Best effort: no site / no customers container / no counter.
            pass
        return errs, inv_errs, conv_dict
279
280
class CustomerProcessorBase(BatchProcessor):
    """A base for customer subitem processors.

    Helps reducing redundancy. Subclasses locate the owning customer via
    ``customer_id`` or ``reg_number`` and must define `iface`,
    `factory_name`, `getParent` and `getEntry`.
    """
    grok.baseclass()

    # additional available fields
    # beside 'customer_id', 'reg_number' and 'matric_number'
    additional_fields = []

    #: header fields additionally required
    additional_headers = []

    @property
    def available_fields(self):
        """Sorted list of all column names this processor understands."""
        fields = ['customer_id', 'reg_number'] + self.additional_fields
        return sorted(list(set(fields + getFields(
                self.iface).keys())))

    def checkHeaders(self, headerfields, mode='ignore'):
        """Validate the CSV header fields before import.

        Raises `FatalCSVError` if neither ``customer_id`` nor
        ``reg_number`` is present, if one of `additional_headers` is
        missing, or if a non-ignored column name appears more than once.
        Returns True otherwise.
        """
        if not 'reg_number' in headerfields and not 'customer_id' \
            in headerfields:
            raise FatalCSVError(
                "Need at least columns customer_id " +
                "or reg_number for import!")
        for name in self.additional_headers:
            if not name in headerfields:
                raise FatalCSVError(
                    "Need %s for import!" % name)

        # Columns starting with '--' are ignored and may appear repeatedly.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def _getCustomer(self, row, site):
        """Return the customer owning the subitem in `row`, or None.

        ``customer_id`` takes precedence; ``reg_number`` is only used if
        ``customer_id`` is empty or ignored.
        """
        NON_VALUES = ['', IGNORE_MARKER]
        if not 'customers' in site.keys():
            return None
        if row.get('customer_id', '') not in NON_VALUES:
            customers = site['customers']
            if row['customer_id'] in customers:
                return customers[row['customer_id']]
        elif row.get('reg_number', '') not in NON_VALUES:
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            if cat is None:
                # queryUtility returns None when no catalog is registered.
                return None
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def parentsExist(self, row, site):
        """True if `getParent` finds a container for `row`."""
        return self.getParent(row, site) is not None

    def entryExists(self, row, site):
        """True if `getEntry` finds an object for `row`."""
        return self.getEntry(row, site) is not None

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the ``(errs, inv_errs, conv_dict)`` triple of the object
        converter for `iface`.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        In remove mode only locator/key columns are mapped; columns named
        ``--IGNORE--`` are always skipped.
        """
        result = dict()
        # Only the raw header line is needed; close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number', 'document_id',
                             'code', 'level', 'contract_id'
                             ] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result
367
class CustomerDocumentProcessor(CustomerProcessorBase):
    """A batch processor for ICustomerDocument objects.

    Documents live in the ``documents`` container of their customer and
    are addressed by ``document_id`` within that container.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerdocumentprocessor'
    grok.name(util_name)

    name = _('CustomerDocument Processor')
    iface = ICustomerDocument
    factory_name = 'waeup.CustomerSampleDocument'

    location_fields = []
    additional_fields = ['class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Validate headers; update and remove modes need ``document_id``."""
        super(CustomerDocumentProcessor, self).checkHeaders(
            headerfields, mode)
        if mode in ('update', 'remove') and not 'document_id' in headerfields:
            raise FatalCSVError(
                "Need document_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the documents container of the row's customer, or None."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['documents']

    def getEntry(self, row, site):
        """Return the document addressed by `row`, or None.

        The document is looked up by ``document_id`` inside the documents
        container of the row's customer, so a row can never update or
        remove a document belonging to a different customer.
        """
        documents = self.getParent(row, site)
        if documents is None:
            return None
        document_id = row.get('document_id', None)
        if document_id is None:
            return None
        if document_id in documents:
            return documents[document_id]
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Logs the change and returns the change string produced by the
        base class.
        """
        # Remove document_id from row if empty
        if 'document_id' in row and row['document_id'] in (None, IGNORE_MARKER):
            row.pop('document_id')
        items_changed = super(CustomerDocumentProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, obj.document_id,
               items_changed))
        return items_changed

    def addEntry(self, obj, row, site):
        """Add document `obj` to the row's documents container."""
        parent = self.getParent(row, site)
        parent.addDocument(obj)
        return

    def delEntry(self, row, site):
        """Remove the document addressed by `row`, if it exists, and log it."""
        document = self.getEntry(row, site)
        if document is not None:
            parent = self.getParent(row, site)
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Document removed: %s'
                % (customer.customer_id, document.document_id))
            del parent[document.document_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        In addition to the base checks, ``class_name`` must match the
        factory name without its ``waeup.`` prefix.
        """
        errs, inv_errs, conv_dict = super(
            CustomerDocumentProcessor, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            class_name = row.get('class_name', None)
            # Remove the literal prefix; str.strip('waeup.') would strip a
            # *set* of characters from both ends and mangle names ending
            # in e.g. 'e' or 'a'.
            prefix = 'waeup.'
            expected = self.factory_name
            if expected.startswith(prefix):
                expected = expected[len(prefix):]
            if class_name != expected:
                errs.append(('class_name','wrong processor'))
        return errs, inv_errs, conv_dict
450
451
class ContractProcessor(CustomerProcessorBase):
    """A batch processor for IContract objects.

    Contracts live in the ``contracts`` container of their customer and
    are addressed by ``contract_id`` within that container.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'contractprocessor'
    grok.name(util_name)

    name = _('Contract Processor')
    iface = ISampleContract
    factory_name = 'waeup.SampleContract'

    location_fields = []
    additional_fields = ['class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Validate headers; update and remove modes need ``contract_id``."""
        super(ContractProcessor, self).checkHeaders(headerfields, mode)
        if mode in ('update', 'remove') and not 'contract_id' in headerfields:
            raise FatalCSVError(
                "Need contract_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the contracts container of the row's customer, or None."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['contracts']

    def getEntry(self, row, site):
        """Return the contract addressed by `row`, or None.

        The contract is looked up by ``contract_id`` inside the contracts
        container of the row's customer, so a row can never update or
        remove a contract belonging to a different customer.
        """
        contracts = self.getParent(row, site)
        if contracts is None:
            return None
        contract_id = row.get('contract_id', None)
        if contract_id is None:
            return None
        if contract_id in contracts:
            return contracts[contract_id]
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Logs the change and returns the change string produced by the
        base class.
        """
        # Remove contract_id from row if empty
        if 'contract_id' in row and row['contract_id'] in (None, IGNORE_MARKER):
            row.pop('contract_id')
        items_changed = super(ContractProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, obj.contract_id,
               items_changed))
        return items_changed

    def addEntry(self, obj, row, site):
        """Add contract `obj` to the row's contracts container."""
        parent = self.getParent(row, site)
        parent.addContract(obj)
        return

    def delEntry(self, row, site):
        """Remove the contract addressed by `row`, if it exists, and log it."""
        contract = self.getEntry(row, site)
        if contract is not None:
            parent = self.getParent(row, site)
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Contract removed: %s'
                % (customer.customer_id, contract.contract_id))
            del parent[contract.contract_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        In addition to the base checks, ``class_name`` must match the
        factory name without its ``waeup.`` prefix.
        """
        errs, inv_errs, conv_dict = super(
            ContractProcessor, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            class_name = row.get('class_name', None)
            # Remove the literal prefix; str.strip('waeup.') would strip a
            # *set* of characters from both ends and mangle names ending
            # in e.g. 'e' or 'a'.
            prefix = 'waeup.'
            expected = self.factory_name
            if expected.startswith(prefix):
                expected = expected[len(prefix):]
            if class_name != expected:
                errs.append(('class_name','wrong processor'))
        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.