source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 12305

Last change on this file since 12305 was 12283, checked in by Henrik Bettermann, 10 years ago

Make selection of processors and exporters customizable.

  • Property svn:keywords set to Id
File size: 21.1 KB
Line 
1## $Id: batching.py 12283 2014-12-21 11:26:41Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
23Here we define the processors for customer-specific objects such as
24customers and their subobjects (documents and contracts).
25"""
26import grok
27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
39    IObjectHistory, IGNORE_MARKER)
40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
42from waeup.ikoba.customers.utils import generate_contract_id
43from waeup.ikoba.customers.interfaces import (
44    ICustomer, ICustomerUpdateByRegNo,
45    ICustomerSampleDocument, ISampleContract)
46from waeup.ikoba.customers.workflow import  (
47    IMPORTABLE_REGISTRATION_STATES, IMPORTABLE_REGISTRATION_TRANSITIONS)
48from waeup.ikoba.utils.batching import BatchProcessor
49
50
class CustomerProcessor(BatchProcessor):
    """A batch processor for ICustomer objects.

    Rows address a customer either by ``customer_id`` or by
    ``reg_number``; `getLocator` defines which of the two wins.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerprocessor'
    grok.name(util_name)

    name = _('Customer Processor')
    iface = ICustomer
    # Interface used for conversion when a row is located by reg_number
    # in update or remove mode (see checkConversion).
    iface_byregnumber = ICustomerUpdateByRegNo

    location_fields = []
    factory_name = 'waeup.Customer'

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of column names this processor knows.

        Consists of all fields of `iface` plus the columns that are
        handled explicitly by this class.
        """
        special_fields = ['customer_id', 'reg_number',
                          'password', 'state', 'transition']
        return sorted(set(
            special_fields + list(getFields(self.iface).keys())))

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether the CSV header is usable for an import.

        Returns ``True`` if the header is fine. Raises `FatalCSVError`
        if

        - both ``state`` and ``transition`` columns are present,
        - neither ``customer_id`` nor ``reg_number`` is present,
        - in ``create`` mode one of `required_fields` is missing,
        - a column name (not starting with ``--``) appears twice.
        """
        if 'state' in headerfields and 'transition' in headerfields:
            raise FatalCSVError(
                "State and transition can't be  imported at the same time!")
        if 'reg_number' not in headerfields and \
                'customer_id' not in headerfields:
            raise FatalCSVError(
                "Need at least columns customer_id or reg_number " +
                "for import!")
        if mode == 'create':
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        # Check for fields to be ignored...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def parentsExist(self, row, site):
        """Tell whether the ``customers`` container exists in `site`."""
        return 'customers' in site.keys()

    def getLocator(self, row):
        """Return the name of the column used to find the customer.

        ``customer_id`` takes precedence over ``reg_number``. A column
        counts only if its value is neither ``None`` nor
        `IGNORE_MARKER`. Returns ``None`` if neither column is usable.
        """
        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
            return 'customer_id'
        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
            return 'reg_number'
        else:
            return None

    # The entry never exists in create mode.
    def entryExists(self, row, site):
        """Tell whether `getEntry` finds a customer for `row`."""
        return self.getEntry(row, site) is not None

    def getParent(self, row, site):
        """Return the ``customers`` container of `site`."""
        return site['customers']

    def getEntry(self, row, site):
        """Return the customer addressed by `row` or ``None``.

        Lookup by ``customer_id`` goes directly to the ``customers``
        container, lookup by ``reg_number`` uses the
        ``customers_catalog`` and returns the first hit.
        """
        if 'customers' not in site.keys():
            return None
        # Determine the locator only once; both branches depend on it.
        locator = self.getLocator(row)
        if locator == 'customer_id':
            if row['customer_id'] in site['customers']:
                return site['customers'][row['customer_id']]
        elif locator == 'reg_number':
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def addEntry(self, obj, row, site):
        """Add customer `obj` to the ``customers`` container.

        Also writes a 'record created' message into the object history.
        """
        parent = self.getParent(row, site)
        parent.addCustomer(obj)
        # Reset _curr_cust_id if customer_id has been imported
        if self.getLocator(row) == 'customer_id':
            parent._curr_cust_id -= 1
        # We have to log this if state is provided. If not,
        # logging is done by the event handler handle_customer_added
        if 'state' in row:
            parent.logger.info('%s - Customer record created' % obj.customer_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Customer record created'))
        return

    def delEntry(self, row, site):
        """Remove the customer addressed by `row`, if there is one.

        The removal is logged to the logger of the customers container.
        """
        customer = self.getEntry(row, site)
        if customer is not None:
            parent = self.getParent(row, site)
            parent.logger.info('%s - Customer removed' % customer.customer_id)
            del parent[customer.customer_id]

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems, ``None`` otherwise. The only requirement checked here:
        a requested transition must be one of the manual transitions
        currently available for `obj`.
        """
        transition = row.get('transition', IGNORE_MARKER)
        if transition not in (IGNORE_MARKER, ''):
            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
            if transition not in allowed_transitions:
                return 'Transition not allowed.'
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        ``password``, ``state`` and ``transition`` are handled here and
        removed from `row`; all remaining values are passed on to the
        base class. The changes are logged and returned as a string of
        ``name=value, `` items.
        """
        items_changed = ''

        # Remove customer_id from row if empty
        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
            row.pop('customer_id')

        # Update password
        # XXX: Take DELETION_MARKER into consideration
        if 'password' in row:
            passwd = row['password']
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): this puts the password value as given in
                # the CSV file (possibly cleartext) into the log line
                # written below. Kept as is, because log consumers may
                # rely on it -- confirm whether it should be masked.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row['state']
            if state not in (IGNORE_MARKER, ''):
                IWorkflowState(obj).setState(state)
                msg = _("State '${a}' set", mapping={'a': state})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        # Fire workflow transition
        if 'transition' in row:
            transition = row['transition']
            if transition not in (IGNORE_MARKER, ''):
                IWorkflowInfo(obj).fireTransition(transition)
                items_changed += ('%s=%s, ' % ('transition', transition))
            row.pop('transition')

        # apply other values...
        items_changed += super(CustomerProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if hasattr(obj, 'customer_id'):
            # Update mode: the customer exists and we can get the customer_id.
            # Create mode: the record contains the customer_id
            parent.logger.info(
                '%s - %s - %s - updated: %s'
                % (self.name, filename, obj.customer_id, items_changed))
        else:
            # Create mode: the customer does not yet exist
            # XXX: It seems that this never happens because customer_id
            # is always set.
            # The object has no customer_id here, so we must not access
            # the attribute; take whatever the row provides instead.
            parent.logger.info(
                '%s - %s - %s - imported: %s'
                % (self.name, filename, row.get('customer_id'),
                   items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header is read from the first line of the file at
        `path`. In ``remove`` mode only the locator columns
        ``customer_id`` and ``reg_number`` are mapped; columns marked
        ``--IGNORE--`` are never mapped.
        """
        result = dict()
        # Only the header line is needed; make sure the file is closed
        # again afterwards.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` of the
        object converter, extended by errors for ``transition`` and
        ``state`` values which are not importable.
        """
        iface = self.iface
        if mode in ['update', 'remove']:
            if self.getLocator(row) == 'reg_number':
                iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        if 'transition' in row:
            if row['transition'] not in IMPORTABLE_REGISTRATION_TRANSITIONS:
                if row['transition'] not in (IGNORE_MARKER, ''):
                    errs.append(('transition', 'not allowed'))
        if 'state' in row:
            if row['state'] not in IMPORTABLE_REGISTRATION_STATES:
                if row['state'] not in (IGNORE_MARKER, ''):
                    errs.append(('state', 'not allowed'))
                else:
                    # State is an attribute of Customer and must not
                    # be changed if empty.
                    conv_dict['state'] = IGNORE_MARKER
        try:
            # Correct cust_id counter. As the IConverter for customers
            # creates customer objects that are not used afterwards, we
            # have to fix the site-wide customer_id counter.
            site = grok.getSite()
            customers = site['customers']
            customers._curr_cust_id -= 1
        except (KeyError, TypeError, AttributeError):
            # Best effort only: without a site or a customers container
            # there is no counter to correct.
            pass
        return errs, inv_errs, conv_dict
279
280
class CustomerProcessorBase(BatchProcessor):
    """A base for customer subitem processors.

    Helps reducing redundancy. Subclasses are expected to provide
    `iface`, `factory_name`, `getParent` and `getEntry`, all of which
    are used by the methods defined here.
    """
    grok.baseclass()

    # additional available fields
    # beside 'customer_id' and 'reg_number'
    additional_fields = []

    #: header fields additionally required
    additional_headers = []

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of column names this processor knows.

        Consists of the locator columns, `additional_fields` and all
        fields of `iface`.
        """
        fields = ['customer_id', 'reg_number'] + self.additional_fields
        return sorted(set(fields + list(getFields(self.iface).keys())))

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the CSV header is usable for an import.

        Returns ``True`` if the header is fine. Raises `FatalCSVError`
        if neither ``customer_id`` nor ``reg_number`` is present, if one
        of `additional_headers` is missing or if a column name (not
        starting with ``--``) appears twice. `mode` is not evaluated
        here.
        """
        if 'reg_number' not in headerfields and \
                'customer_id' not in headerfields:
            raise FatalCSVError(
                "Need at least columns customer_id " +
                "or reg_number for import!")
        for name in self.additional_headers:
            if name not in headerfields:
                raise FatalCSVError(
                    "Need %s for import!" % name)

        # Check for fields to be ignored...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def _getCustomer(self, row, site):
        """Return the customer addressed by `row` or ``None``.

        ``customer_id`` takes precedence over ``reg_number``; empty
        strings and `IGNORE_MARKER` do not count as values. Lookup by
        ``reg_number`` uses the ``customers_catalog`` and returns the
        first hit.
        """
        NON_VALUES = ['', IGNORE_MARKER]
        if 'customers' not in site.keys():
            return None
        if row.get('customer_id', '') not in NON_VALUES:
            if row['customer_id'] in site['customers']:
                return site['customers'][row['customer_id']]
        elif row.get('reg_number', '') not in NON_VALUES:
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def parentsExist(self, row, site):
        """Tell whether `getParent` finds a container for `row`."""
        return self.getParent(row, site) is not None

    def entryExists(self, row, site):
        """Tell whether `getEntry` finds an object for `row`."""
        return self.getEntry(row, site) is not None

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` delivered by
        the object converter for `iface`.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header is read from the first line of the file at
        `path`. In ``remove`` mode only the locator columns listed below
        are mapped; columns marked ``--IGNORE--`` are never mapped.
        """
        locator_fields = ['customer_id', 'reg_number', 'document_id',
                          'code', 'level', 'contract_id']
        result = dict()
        # Only the header line is needed; make sure the file is closed
        # again afterwards.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in locator_fields and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

367
class CustomerDocumentProcessorBase(CustomerProcessorBase):
    """A batch processor for customer documents.

    This is a baseclass. Concrete processors have to set `util_name`,
    `name`, `iface` and `factory_name`.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)

    grok.baseclass()

    util_name = None
    name = None
    iface = None
    factory_name = None

    location_fields = []
    additional_fields = ['class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the CSV header is usable for an import.

        In addition to the checks of the base class, ``document_id`` is
        required in ``update`` and ``remove`` mode. Returns ``True`` or
        raises `FatalCSVError`.
        """
        super(CustomerDocumentProcessorBase, self).checkHeaders(
            headerfields, mode)
        if mode in ('update', 'remove') and 'document_id' not in headerfields:
            raise FatalCSVError(
                "Need document_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the ``documents`` container of the customer addressed
        by `row`, or ``None`` if there is no such customer.
        """
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['documents']

    def getEntry(self, row, site):
        """Return the document addressed by `row` or ``None``.

        The document is searched by ``document_id`` in the
        ``documents_catalog``; the first hit is returned. Without an
        existing customer (parent) no document is returned.
        """
        # Same guard as in the contract processor: no customer, no entry.
        if self.getParent(row, site) is None:
            return None
        document_id = row.get('document_id', None)
        if document_id is None:
            return None
        cat = queryUtility(ICatalog, name='documents_catalog')
        results = list(cat.searchResults(document_id=(document_id, document_id)))
        if results:
            return results[0]
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The update itself is done by the base class. The changes are
        logged to the logger of the customers container and returned as
        delivered by the base class.
        """
        # Remove document_id from row if empty
        if 'document_id' in row and row['document_id'] in (None, IGNORE_MARKER):
            row.pop('document_id')
        items_changed = super(CustomerDocumentProcessorBase, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, obj.document_id,
               items_changed))
        return items_changed

    def addEntry(self, obj, row, site):
        """Add document `obj` to the customer's documents container."""
        parent = self.getParent(row, site)
        parent.addDocument(obj)
        return

    def delEntry(self, row, site):
        """Remove the document addressed by `row`, if there is one.

        The removal is logged to the logger of the customers container.
        """
        document = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if document is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Document removed: %s'
                % (customer.customer_id, document.document_id))
            del parent[document.document_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns ``(errs, inv_errs, conv_dict)`` of the base class. Except
        in ``remove`` mode an error is added if the ``class_name`` of
        the row does not match `factory_name` without its ``waeup.``
        prefix.
        """
        errs, inv_errs, conv_dict = super(
            CustomerDocumentProcessorBase, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            # Cut off the prefix explicitly. str.strip() would remove any
            # of the characters 'w', 'a', 'e', 'u', 'p', '.' from both
            # ends and thus damage class names ending with one of them.
            prefix = 'waeup.'
            expected = self.factory_name
            if expected.startswith(prefix):
                expected = expected[len(prefix):]
            class_name = row.get('class_name', None)
            if class_name != expected:
                errs.append(('class_name','wrong processor'))
        return errs, inv_errs, conv_dict
453
454
class CustomerSampleDocumentProcessor(CustomerDocumentProcessorBase):
    """Concrete document processor for ICustomerSampleDocument objects.

    Only supplies the class attributes which the document processor
    base class leaves unset.
    """
    # Interface of the processed objects and name of their factory.
    iface = ICustomerSampleDocument
    factory_name = 'waeup.CustomerSampleDocument'

    # Title of the processor.
    name = _('Customer Sample Document Processor')

    # Name under which the processor is registered.
    util_name = 'customersampledocumentprocessor'
    grok.name(util_name)
464
465
class ContractProcessorBase(CustomerProcessorBase):
    """A batch processor for contracts.

    This is a baseclass. Concrete processors have to set `util_name`,
    `name`, `iface` and `factory_name`.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)

    grok.baseclass()

    util_name = None
    name = None
    iface = None
    factory_name = None

    location_fields = []
    additional_fields = ['class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the CSV header is usable for an import.

        In addition to the checks of the base class, ``contract_id`` is
        required in ``update`` and ``remove`` mode. Returns ``True`` or
        raises `FatalCSVError`.
        """
        super(ContractProcessorBase, self).checkHeaders(headerfields, mode)
        if mode in ('update', 'remove') and 'contract_id' not in headerfields:
            raise FatalCSVError(
                "Need contract_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the ``contracts`` container of the customer addressed
        by `row`, or ``None`` if there is no such customer.
        """
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['contracts']

    def getEntry(self, row, site):
        """Return the contract addressed by `row` or ``None``.

        The contract is searched by ``contract_id`` in the
        ``contracts_catalog``; the first hit is returned. Without an
        existing customer (parent) no contract is returned.
        """
        contracts = self.getParent(row, site)
        if contracts is None:
            return None
        contract_id = row.get('contract_id', None)
        if contract_id is None:
            return None
        cat = queryUtility(ICatalog, name='contracts_catalog')
        results = list(cat.searchResults(contract_id=(contract_id, contract_id)))
        if results:
            return results[0]
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The update itself is done by the base class. The changes are
        logged to the logger of the customers container and returned as
        delivered by the base class.
        """
        # Remove contract_id from row if empty
        if 'contract_id' in row and row['contract_id'] in (None, IGNORE_MARKER):
            row.pop('contract_id')
        items_changed = super(ContractProcessorBase, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, obj.contract_id,
               items_changed))
        return items_changed

    def addEntry(self, obj, row, site):
        """Add contract `obj` to the customer's contracts container."""
        parent = self.getParent(row, site)
        parent.addContract(obj)
        return

    def delEntry(self, row, site):
        """Remove the contract addressed by `row`, if there is one.

        The removal is logged to the logger of the customers container.
        """
        contract = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if contract is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Contract removed: %s'
                % (customer.customer_id, contract.contract_id))
            del parent[contract.contract_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns ``(errs, inv_errs, conv_dict)`` of the base class. Except
        in ``remove`` mode an error is added if the ``class_name`` of
        the row does not match `factory_name` without its ``waeup.``
        prefix.
        """
        errs, inv_errs, conv_dict = super(
            ContractProcessorBase, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            # Cut off the prefix explicitly. str.strip() would remove any
            # of the characters 'w', 'a', 'e', 'u', 'p', '.' from both
            # ends and thus damage class names ending with one of them.
            prefix = 'waeup.'
            expected = self.factory_name
            if expected.startswith(prefix):
                expected = expected[len(prefix):]
            class_name = row.get('class_name', None)
            if class_name != expected:
                errs.append(('class_name','wrong processor'))
        return errs, inv_errs, conv_dict
554
555
class SampleContractProcessor(ContractProcessorBase):
    """Concrete contract processor for ISampleContract objects.

    Only supplies the class attributes which the contract processor
    base class leaves unset.
    """
    # Interface of the processed objects and name of their factory.
    iface = ISampleContract
    factory_name = 'waeup.SampleContract'

    # Title of the processor.
    name = _('Customer Sample Contract Processor')

    # Name under which the processor is registered.
    util_name = 'samplecontractprocessor'
    grok.name(util_name)
Note: See TracBrowser for help on using the repository browser.