source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 16536

Last change on this file since 16536 was 12633, checked in by Henrik Bettermann, 10 years ago

Copy over valid_from and valid_to from products when creating contract objects.

Fix ContractProcessorBase. Really allow importing tc_dict, valid_from, title and valid_to.

  • Property svn:keywords set to Id
File size: 23.1 KB
Line 
1## $Id: batching.py 12633 2015-02-27 17:39:46Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
Here we define the processors for customer-specific objects like
customers and subobjects.
25"""
26import grok
27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.formlib.interfaces import ConversionError
34from zope.component import queryUtility, getUtility, createObject
35from zope.event import notify
36from zope.catalog.interfaces import ICatalog
37from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
38from waeup.ikoba.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
40    IObjectHistory, IGNORE_MARKER)
41from waeup.ikoba.interfaces import IIkobaUtils, DELETION_MARKER, IGNORE_MARKER
42from waeup.ikoba.interfaces import MessageFactory as _
43from waeup.ikoba.customers.utils import generate_contract_id
44from waeup.ikoba.customers.interfaces import (
45    ICustomer, ICustomerUpdateByRegNo,
46    ICustomerSampleDocument, ISampleContractProcess)
47from waeup.ikoba.customers.workflow import  (
48    IMPORTABLE_REGISTRATION_STATES, IMPORTABLE_REGISTRATION_TRANSITIONS)
49from waeup.ikoba.utils.batching import BatchProcessor
50
# strptime() format used by date_to_field() to parse date strings found
# in import files (year-month-day, e.g. '2015-02-27').
DATE_FORMAT = '%Y-%m-%d'
52
53
def date_to_field(input):
    """Convert a date string from an import file into a date object.

    In import files the hash symbol can be attached to a date string to
    avoid annoying automatic date transformation by Excel or Calc. Hash
    symbols at either end of `input` are removed before parsing.

    `input` must match `DATE_FORMAT` once the hash symbols are removed.

    Returns the parsed value as a `datetime.date`.

    Raises `ConversionError` if the string cannot be parsed.
    """
    stripped = input.strip('#')
    try:
        value = datetime.strptime(stripped, DATE_FORMAT)
    except (ValueError, IndexError) as v:
        # The 'as' form is valid on Python 2.6+ and on Python 3, whereas
        # the old comma form is a syntax error on Python 3.
        raise ConversionError("Invalid datetime data", v)
    return value.date()
64
65
class CustomerProcessor(BatchProcessor):
    """A batch processor for ICustomer objects.

    Rows are matched to customers by ``customer_id`` or, if that column
    is missing or marked as ignored, by ``reg_number``.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerprocessor'
    grok.name(util_name)

    name = _('Customer Processor')
    iface = ICustomer
    iface_byregnumber = ICustomerUpdateByRegNo

    location_fields = []
    factory_name = 'waeup.Customer'

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of column names that can be imported.

        Combines the schema fields of `iface` with the locator, password
        and workflow columns this processor handles itself.
        """
        fields = getFields(self.iface)
        # list(...) keeps the concatenation working where keys() is a view.
        return sorted(list(set(
            ['customer_id', 'reg_number',
             'password', 'state', 'transition'] + list(fields.keys()))))

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether the given header fields allow an import.

        Raises `FatalCSVError` if both ``state`` and ``transition`` are
        present, if neither ``customer_id`` nor ``reg_number`` is present,
        if (in create mode) one of `required_fields` is missing, or if a
        non-ignored column name appears more than once.

        Returns True otherwise.
        """
        if 'state' in headerfields and 'transition' in headerfields:
            raise FatalCSVError(
                "State and transition can't be  imported at the same time!")
        if ('reg_number' not in headerfields
                and 'customer_id' not in headerfields):
            raise FatalCSVError(
                "Need at least columns customer_id or reg_number " +
                "for import!")
        if mode == 'create':
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        # Check for fields to be ignored: columns starting with '--' are
        # excluded from the duplicate check.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def parentsExist(self, row, site):
        """Tell whether the site contains a ``customers`` container."""
        return 'customers' in site.keys()

    def getLocator(self, row):
        """Return the name of the column used to locate the customer.

        ``customer_id`` takes precedence over ``reg_number``. Returns
        None if neither column carries a usable value.
        """
        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
            return 'customer_id'
        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
            return 'reg_number'
        else:
            return None

    # The entry never exists in create mode.
    def entryExists(self, row, site):
        """Tell whether a customer matching `row` can be found."""
        return self.getEntry(row, site) is not None

    def getParent(self, row, site):
        """Return the ``customers`` container of the site."""
        return site['customers']

    def getEntry(self, row, site):
        """Return the customer addressed by `row` or None.

        Looks the customer up in the container when located by
        ``customer_id`` and in the ``customers_catalog`` when located by
        ``reg_number``.
        """
        if 'customers' not in site.keys():
            return None
        locator = self.getLocator(row)
        if locator == 'customer_id':
            if row['customer_id'] in site['customers']:
                customer = site['customers'][row['customer_id']]
                return customer
        elif locator == 'reg_number':
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def addEntry(self, obj, row, site):
        """Add customer `obj` to the customers container."""
        parent = self.getParent(row, site)
        parent.addCustomer(obj)
        # Reset _curr_cust_id if customer_id has been imported
        if self.getLocator(row) == 'customer_id':
            parent._curr_cust_id -= 1
        # We have to log this if state is provided. If not,
        # logging is done by the event handler handle_customer_added
        if 'state' in row:
            parent.logger.info('%s - Customer record created' % obj.customer_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Customer record created'))
        return

    def delEntry(self, row, site):
        """Remove the customer addressed by `row`, if it exists."""
        customer = self.getEntry(row, site)
        if customer is not None:
            parent = self.getParent(row, site)
            parent.logger.info('%s - Customer removed' % customer.customer_id)
            del parent[customer.customer_id]

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        transition = row.get('transition', IGNORE_MARKER)
        if transition not in (IGNORE_MARKER, ''):
            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
            if transition not in allowed_transitions:
                return 'Transition not allowed.'
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Password, state and transition are handled here and removed from
        `row`; all remaining values are applied by the base class.

        Returns a string listing the changed items, which is also
        written to the logger of the customers container.
        """
        items_changed = ''

        # Remove customer_id from row if empty
        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
            row.pop('customer_id')

        # Update password
        # XXX: Take DELETION_MARKER into consideration
        if 'password' in row:
            passwd = row.get('password', IGNORE_MARKER)
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): the password value, possibly in plain
                # text, ends up in the log line written below. Confirm
                # whether this is intended.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row.get('state', IGNORE_MARKER)
            if state not in (IGNORE_MARKER, ''):
                value = row['state']
                IWorkflowState(obj).setState(value)
                msg = _("State '${a}' set", mapping={'a': value})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        if 'transition' in row:
            transition = row.get('transition', IGNORE_MARKER)
            if transition not in (IGNORE_MARKER, ''):
                value = row['transition']
                IWorkflowInfo(obj).fireTransition(value)
                items_changed += ('%s=%s, ' % ('transition', transition))
            row.pop('transition')

        # apply other values...
        items_changed += super(CustomerProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if hasattr(obj, 'customer_id'):
            # Update mode: the customer exists and we can get the customer_id.
            # Create mode: the record contains the customer_id
            parent.logger.info(
                '%s - %s - %s - updated: %s'
                % (self.name, filename, obj.customer_id, items_changed))
        else:
            # Create mode: the customer does not yet exist
            # XXX: It seems that this never happens because customer_id
            # is always set.
            # obj has no customer_id in this branch, so the id is taken
            # from the row instead of raising an AttributeError.
            parent.logger.info(
                '%s - %s - %s - imported: %s'
                % (self.name, filename, row.get('customer_id', None),
                   items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header is read from the first line of the file at `path`
        and paired by position with `headerfields`. In remove mode only
        the locator columns are mapped; ignored columns are never mapped.
        """
        result = dict()
        # Close the file as soon as the header line has been read.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the object converter, extended by errors for states and
        transitions that are not importable.
        """
        iface = self.iface
        if mode in ['update', 'remove']:
            if self.getLocator(row) == 'reg_number':
                iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        if 'transition' in row:
            if row['transition'] not in IMPORTABLE_REGISTRATION_TRANSITIONS:
                if row['transition'] not in (IGNORE_MARKER, ''):
                    errs.append(('transition', 'not allowed'))
        if 'state' in row:
            if row['state'] not in IMPORTABLE_REGISTRATION_STATES:
                if row['state'] not in (IGNORE_MARKER, ''):
                    errs.append(('state', 'not allowed'))
                else:
                    # State is an attribute of Customer and must not
                    # be changed if empty.
                    conv_dict['state'] = IGNORE_MARKER
        try:
            # Correct cust_id counter. As the IConverter for customers
            # creates customer objects that are not used afterwards, we
            # have to fix the site-wide customer_id counter.
            site = grok.getSite()
            customers = site['customers']
            customers._curr_cust_id -= 1
        except (KeyError, TypeError, AttributeError):
            # Best effort only: without a site or customers container
            # there is no counter to correct.
            pass
        return errs, inv_errs, conv_dict
294
295
class CustomerProcessorBase(BatchProcessor):
    """A base for customer subitem processors.

    Helps reducing redundancy. Subclasses are expected to provide
    `iface`, `factory_name`, `getParent` and `getEntry`.
    """
    grok.baseclass()

    # additional available fields
    # beside 'customer_id' and 'reg_number'
    additional_fields = []

    #: header fields additionally required
    additional_headers = []

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of column names that can be imported.

        Combines the locator columns and `additional_fields` with the
        schema fields of `iface`.
        """
        fields = ['customer_id', 'reg_number'] + self.additional_fields
        # list(...) keeps the concatenation working where keys() is a view.
        return sorted(list(set(
            fields + list(getFields(self.iface).keys()))))

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the given header fields allow an import.

        Raises `FatalCSVError` if neither ``customer_id`` nor
        ``reg_number`` is present, if one of `additional_headers` is
        missing, or if a non-ignored column name appears more than once.
        `mode` is accepted but not evaluated here.

        Returns True otherwise.
        """
        if ('reg_number' not in headerfields
                and 'customer_id' not in headerfields):
            raise FatalCSVError(
                "Need at least columns customer_id " +
                "or reg_number for import!")
        for name in self.additional_headers:
            if name not in headerfields:
                raise FatalCSVError(
                    "Need %s for import!" % name)

        # Check for fields to be ignored: columns starting with '--' are
        # excluded from the duplicate check.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def _getCustomer(self, row, site):
        """Return the customer addressed by `row` or None.

        A usable ``customer_id`` takes precedence: if it is given but
        unknown, ``reg_number`` is not consulted.
        """
        NON_VALUES = ['', IGNORE_MARKER]
        if 'customers' not in site.keys():
            return None
        if row.get('customer_id', '') not in NON_VALUES:
            if row['customer_id'] in site['customers']:
                customer = site['customers'][row['customer_id']]
                return customer
        elif row.get('reg_number', '') not in NON_VALUES:
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def parentsExist(self, row, site):
        """Tell whether the parent container for `row` can be found."""
        return self.getParent(row, site) is not None

    def entryExists(self, row, site):
        """Tell whether the object addressed by `row` can be found."""
        return self.getEntry(row, site) is not None

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` delivered by
        the object converter for `iface`.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header is read from the first line of the file at `path`
        and paired by position with `headerfields`. In remove mode only
        locator columns are mapped; ignored columns are never mapped.
        """
        result = dict()
        # Close the file as soon as the header line has been read.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number', 'document_id',
                             'code', 'level', 'contract_id'
                             ] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result
382
class CustomerDocumentProcessorBase(CustomerProcessorBase):
    """A batch processor for customer documents.

    This is a baseclass. Concrete processors set `util_name`, `name`,
    `iface` and `factory_name`.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)

    grok.baseclass()

    util_name = None
    name = None
    iface = None
    factory_name = None

    location_fields = []
    additional_fields = ['class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the given header fields allow an import.

        In addition to the checks of the base class, ``document_id`` is
        required in update and remove modes.

        Raises `FatalCSVError` on failure, returns True otherwise.
        """
        super(CustomerDocumentProcessorBase, self).checkHeaders(
            headerfields, mode=mode)
        if mode in ('update', 'remove') and 'document_id' not in headerfields:
            raise FatalCSVError(
                "Need document_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the ``documents`` container of the customer or None."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['documents']

    def getEntry(self, row, site):
        """Return the document with the ``document_id`` of `row` or None.

        The document is searched in the ``documents_catalog``.
        """
        document_id = row.get('document_id', None)
        if document_id is None:
            return None
        cat = queryUtility(ICatalog, name='documents_catalog')
        results = list(cat.searchResults(document_id=(document_id, document_id)))
        if results:
            return results[0]
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Returns the string of changed items delivered by the base class,
        which is also written to the logger of the customers container.
        """
        # Remove document_id from row if empty
        if 'document_id' in row and row['document_id'] in (None, IGNORE_MARKER):
            row.pop('document_id')
        items_changed = super(CustomerDocumentProcessorBase, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, obj.document_id,
               items_changed))
        # Hand the changes back like CustomerProcessor.updateEntry does.
        return items_changed

    def addEntry(self, obj, row, site):
        """Add document `obj` to the documents container of the customer."""
        parent = self.getParent(row, site)
        parent.addDocument(obj)
        return

    def delEntry(self, row, site):
        """Remove the document addressed by `row`, if it exists."""
        document = self.getEntry(row, site)
        if document is not None:
            parent = self.getParent(row, site)
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Document removed: %s'
                % (customer.customer_id, document.document_id))
            del parent[document.document_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Except in remove mode, ``class_name`` must equal `factory_name`
        without its leading ``waeup.`` prefix.

        Returns the tuple ``(errs, inv_errs, conv_dict)``.
        """
        errs, inv_errs, conv_dict = super(
            CustomerDocumentProcessorBase, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            # Cut off the prefix only. str.strip('waeup.') would remove
            # any of these characters from both ends and thus mangle
            # class names ending in e.g. 'e' or 'p'.
            prefix = 'waeup.'
            expected = self.factory_name
            if expected.startswith(prefix):
                expected = expected[len(prefix):]
            if row.get('class_name', None) != expected:
                errs.append(('class_name', 'wrong processor'))
        return errs, inv_errs, conv_dict
468
469
class CustomerSampleDocumentProcessor(CustomerDocumentProcessorBase):
    """Batch processor handling ICustomerSampleDocument objects.

    Only configures the document base processor; all import logic is
    inherited.
    """
    # Schema interface and factory of the objects to be imported.
    iface = ICustomerSampleDocument
    factory_name = 'waeup.CustomerSampleDocument'

    # Title of this processor.
    name = _('Customer Sample Document Processor')

    # Name under which the processor is registered.
    util_name = 'customersampledocumentprocessor'
    grok.name(util_name)
479
480
class ContractProcessorBase(CustomerProcessorBase):
    """A batch processor for contracts.

    This is a baseclass. Concrete processors set `util_name`, `name`,
    `iface` and `factory_name`.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)

    grok.baseclass()

    util_name = None
    name = None
    iface = None
    factory_name = None

    location_fields = []
    additional_fields = ['class_name', 'title',
                         'tc_dict', 'valid_from', 'valid_to']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether the given header fields allow an import.

        In addition to the checks of the base class, ``contract_id`` is
        required in update and remove modes.

        Raises `FatalCSVError` on failure, returns True otherwise.
        """
        super(ContractProcessorBase, self).checkHeaders(
            headerfields, mode=mode)
        if mode in ('update', 'remove') and 'contract_id' not in headerfields:
            raise FatalCSVError(
                "Need contract_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the ``contracts`` container of the customer or None."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['contracts']

    def getEntry(self, row, site):
        """Return the contract with the ``contract_id`` of `row` or None.

        None is also returned if the customer cannot be found. The
        contract itself is searched in the ``contracts_catalog``.
        """
        contracts = self.getParent(row, site)
        if contracts is None:
            return None
        contract_id = row.get('contract_id', None)
        if contract_id is None:
            return None
        cat = queryUtility(ICatalog, name='contracts_catalog')
        results = list(cat.searchResults(contract_id=(contract_id, contract_id)))
        if results:
            return results[0]
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Returns the string of changed items delivered by the base class,
        which is also written to the logger of the customers container.
        """
        # Remove contract_id from row if empty
        if 'contract_id' in row and row['contract_id'] in (None, IGNORE_MARKER):
            row.pop('contract_id')
        items_changed = super(ContractProcessorBase, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, obj.contract_id,
               items_changed))
        # Hand the changes back like CustomerProcessor.updateEntry does.
        return items_changed

    def addEntry(self, obj, row, site):
        """Add contract `obj` to the contracts container of the customer."""
        parent = self.getParent(row, site)
        parent.addContract(obj)
        return

    def delEntry(self, row, site):
        """Remove the contract addressed by `row`, if it exists."""
        contract = self.getEntry(row, site)
        if contract is not None:
            parent = self.getParent(row, site)
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Contract removed: %s'
                % (customer.customer_id, contract.contract_id))
            del parent[contract.contract_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Except in remove mode, ``class_name`` must equal `factory_name`
        without its leading ``waeup.`` prefix. ``tc_dict``,
        ``valid_from`` and ``valid_to`` are checked here as well.

        Returns the tuple ``(errs, inv_errs, conv_dict)``.
        """
        errs, inv_errs, conv_dict = super(
            ContractProcessorBase, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            # Cut off the prefix only. str.strip('waeup.') would remove
            # any of these characters from both ends and thus mangle
            # class names ending in e.g. 'e' or 'p'.
            prefix = 'waeup.'
            expected = self.factory_name
            if expected.startswith(prefix):
                expected = expected[len(prefix):]
            if row.get('class_name', None) != expected:
                errs.append(('class_name', 'wrong processor'))
        # We need to check tc_dict, valid_to and valid_from because they
        # are not schema fields and are thus not checked by the
        # DefaultObjectConverter.
        tc_dict = row.get('tc_dict', None)
        if tc_dict in (IGNORE_MARKER, None, {}, ''):
            pass
        elif tc_dict == DELETION_MARKER:
            conv_dict['tc_dict'] = None
        else:
            try:
                # NOTE(review): eval() executes arbitrary expressions
                # taken from the import file, and exceptions other than
                # SyntaxError propagate to the caller. Consider
                # ast.literal_eval if only literals must be supported.
                tc_value = eval(tc_dict)
            except SyntaxError:
                errs.append(('tc_dict', 'syntax error'))
            else:
                # Only dictionaries are accepted as converted value.
                if isinstance(tc_value, dict):
                    conv_dict['tc_dict'] = tc_value
                else:
                    errs.append(('tc_dict', 'not a dictionary'))
        # The date columns are only validated here; the parsed value is
        # not stored in conv_dict by this method.
        for fieldname in ('valid_from', 'valid_to'):
            value = row.get(fieldname, None)
            if value in (IGNORE_MARKER, None, ''):
                continue
            try:
                date_to_field(value)
            except ConversionError:
                errs.append((fieldname, 'invalid date format'))
        return errs, inv_errs, conv_dict
608
609
class SampleContractProcessor(ContractProcessorBase):
    """Batch processor handling ISampleContract objects.

    Only configures the contract base processor; all import logic is
    inherited.
    """
    # Schema interface and factory of the objects to be imported.
    iface = ISampleContractProcess
    factory_name = 'waeup.SampleContract'

    # Title of this processor.
    name = _('Customer Sample Contract Processor')

    # Name under which the processor is registered.
    util_name = 'samplecontractprocessor'
    grok.name(util_name)
Note: See TracBrowser for help on using the repository browser.