source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 12253

Last change on this file since 12253 was 12250, checked in by Henrik Bettermann, 10 years ago

Some repairs of document and contract batch processors (work in progress!).

  • Property svn:keywords set to Id
File size: 22.7 KB
Line 
1## $Id: batching.py 12250 2014-12-16 09:02:13Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
23Here we define the processors for customer-specific objects like
24customers and their subobjects (documents and contracts).
25"""
26import grok
27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
39    IObjectHistory, IGNORE_MARKER)
40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
42from waeup.ikoba.documents.utils import generate_document_id
43from waeup.ikoba.customers.utils import generate_contract_id
44from waeup.ikoba.customers.interfaces import (
45    ICustomer, ICustomerUpdateByRegNo,
46    ICustomerDocument, IContract, ISampleContract)
47from waeup.ikoba.customers.workflow import  (
48    IMPORTABLE_REGISTRATION_STATES, IMPORTABLE_REGISTRATION_TRANSITIONS)
49from waeup.ikoba.utils.batching import BatchProcessor
50
51
class CustomerProcessor(BatchProcessor):
    """A batch processor for ICustomer objects.

    Rows are matched to customers by ``customer_id`` or, when that
    column is empty, by ``reg_number`` (see `getLocator`).
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerprocessor'
    grok.name(util_name)

    name = _('Customer Processor')
    iface = ICustomer
    # Interface used for validation in update and remove modes when the
    # customer is located by reg_number (see checkConversion).
    iface_byregnumber = ICustomerUpdateByRegNo

    location_fields = []
    factory_name = 'waeup.Customer'

    @property
    def available_fields(self):
        """Sorted list of all importable column names.

        These are the schema fields of `iface` plus the special columns
        customer_id, reg_number, password, state and transition.
        """
        fields = getFields(self.iface)
        return sorted(list(set(
            ['customer_id', 'reg_number',
             'password', 'state', 'transition'] + list(fields.keys()))))

    def checkHeaders(self, headerfields, mode='create'):
        """Check the CSV header fields.

        Raises FatalCSVError if state and transition are both present,
        if neither customer_id nor reg_number is present, if a required
        field is missing in create mode, or if a non-ignored column name
        appears more than once. Returns True otherwise.
        """
        if 'state' in headerfields and 'transition' in headerfields:
            raise FatalCSVError(
                "State and transition can't be  imported at the same time!")
        if 'reg_number' not in headerfields and \
                'customer_id' not in headerfields:
            raise FatalCSVError(
                "Need at least columns customer_id or reg_number " +
                "for import!")
        if mode == 'create':
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        # Columns starting with '--' are ignored and may appear repeatedly.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def parentsExist(self, row, site):
        """The parent exists if the site has a 'customers' container."""
        return 'customers' in site.keys()

    def getLocator(self, row):
        """Return the name of the column used to find the customer.

        'customer_id' wins if it holds a value, otherwise 'reg_number'
        is used if it holds a value. Returns None if neither does.
        """
        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
            return 'customer_id'
        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
            return 'reg_number'
        else:
            return None

    def entryExists(self, row, site):
        """Tell whether `getEntry` finds a customer for `row`."""
        return self.getEntry(row, site) is not None

    def getParent(self, row, site):
        """Return the site's customers container."""
        return site['customers']

    def getEntry(self, row, site):
        """Return the customer addressed by `row` or None.

        Customers are looked up by customer_id in the customers
        container, or by reg_number via the 'customers_catalog'.
        """
        if 'customers' not in site.keys():
            return None
        if self.getLocator(row) == 'customer_id':
            if row['customer_id'] in site['customers']:
                customer = site['customers'][row['customer_id']]
                return customer
        elif self.getLocator(row) == 'reg_number':
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def addEntry(self, obj, row, site):
        """Add customer `obj` to the customers container.

        Also adds a 'Customer record created' history message.
        """
        parent = self.getParent(row, site)
        parent.addCustomer(obj)
        # Reset _curr_cust_id if customer_id has been imported
        if self.getLocator(row) == 'customer_id':
            parent._curr_cust_id -= 1
        # We have to log this if state is provided. If not,
        # logging is done by the event handler handle_customer_added
        if 'state' in row:
            parent.logger.info('%s - Customer record created' % obj.customer_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Customer record created'))
        return

    def delEntry(self, row, site):
        """Remove the customer addressed by `row`, if it exists, and log it."""
        customer = self.getEntry(row, site)
        if customer is not None:
            parent = self.getParent(row, site)
            parent.logger.info('%s - Customer removed' % customer.customer_id)
            del parent[customer.customer_id]

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems, None otherwise. Currently only a given transition is
        checked against the manual transitions allowed for `obj`.
        """
        transition = row.get('transition', IGNORE_MARKER)
        if transition not in (IGNORE_MARKER, ''):
            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
            if transition not in allowed_transitions:
                return 'Transition not allowed.'
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        password, state and transition are handled here and removed
        from `row`. The remaining values are applied by the base class.
        The changes are logged and returned as a string.
        """
        items_changed = ''

        # Remove customer_id from row if empty
        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
            row.pop('customer_id')

        # Update password
        # XXX: Take DELETION_MARKER into consideration
        if 'password' in row:
            passwd = row.get('password', IGNORE_MARKER)
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # Record that the password changed, but never write the
                # password itself (plain or hashed) to the log.
                items_changed += ('%s=%s, ' % ('password', '****'))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row.get('state', IGNORE_MARKER)
            if state not in (IGNORE_MARKER, ''):
                value = row['state']
                IWorkflowState(obj).setState(value)
                msg = _("State '${a}' set", mapping={'a': value})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        if 'transition' in row:
            transition = row.get('transition', IGNORE_MARKER)
            if transition not in (IGNORE_MARKER, ''):
                value = row['transition']
                IWorkflowInfo(obj).fireTransition(value)
                items_changed += ('%s=%s, ' % ('transition', transition))
            row.pop('transition')

        # apply other values...
        items_changed += super(CustomerProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if hasattr(obj, 'customer_id'):
            # Update mode: the customer exists and we can get the customer_id.
            # Create mode: the record contains the customer_id
            parent.logger.info(
                '%s - %s - %s - updated: %s'
                % (self.name, filename, obj.customer_id, items_changed))
        else:
            # Create mode: the customer does not yet exist
            # XXX: It seems that this never happens because customer_id
            # is always set. obj has no customer_id here, so take the id
            # from the row (None if absent) instead of raising.
            parent.logger.info(
                '%s - %s - %s - imported: %s'
                % (self.name, filename, row.get('customer_id', None),
                   items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Keys are the raw header names from the first line of the CSV
        file at `path`. Values are the matching `headerfields` entries.
        '--IGNORE--' columns are skipped. In remove mode only the
        customer_id and reg_number columns are kept.
        """
        result = dict()
        # Only the header line is needed; close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns the (errs, inv_errs, conv_dict) triple from the object
        converter. Errors are added for transitions or states that are
        not importable.
        """
        iface = self.iface
        if mode in ['update', 'remove']:
            if self.getLocator(row) == 'reg_number':
                iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        if 'transition' in row:
            if row['transition'] not in IMPORTABLE_REGISTRATION_TRANSITIONS:
                if row['transition'] not in (IGNORE_MARKER, ''):
                    errs.append(('transition', 'not allowed'))
        if 'state' in row:
            if row['state'] not in IMPORTABLE_REGISTRATION_STATES:
                if row['state'] not in (IGNORE_MARKER, ''):
                    errs.append(('state', 'not allowed'))
                else:
                    # State is an attribute of Customer and must not
                    # be changed if empty.
                    conv_dict['state'] = IGNORE_MARKER
        try:
            # Correct cust_id counter. As the IConverter for customers
            # creates customer objects that are not used afterwards, we
            # have to fix the site-wide customer_id counter.
            site = grok.getSite()
            customers = site['customers']
            customers._curr_cust_id -= 1
        except (KeyError, TypeError, AttributeError):
            # Best effort: no site or no customers container.
            pass
        return errs, inv_errs, conv_dict
280
281
class CustomerProcessorBase(BatchProcessor):
    """A base for customer subitem processors.

    Helps reducing redundancy. This class uses `iface`, `factory_name`,
    `getParent` and `getEntry` but does not define them itself; the
    concrete processors have to provide them.
    """
    grok.baseclass()

    # Additional available fields beside 'customer_id' and 'reg_number'.
    additional_fields = []

    #: header fields additionally required
    additional_headers = []

    @property
    def available_fields(self):
        """Sorted list of importable columns.

        These are customer_id, reg_number, `additional_fields` and the
        schema fields of `iface`.
        """
        fields = ['customer_id', 'reg_number'] + self.additional_fields
        return sorted(list(set(fields + list(getFields(
                self.iface).keys()))))

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check the CSV header fields.

        Raises FatalCSVError if neither customer_id nor reg_number is
        present, if one of `additional_headers` is missing, or if a
        non-ignored column name appears more than once. Returns True
        otherwise. `mode` is not used here.
        """
        if 'reg_number' not in headerfields and \
                'customer_id' not in headerfields:
            raise FatalCSVError(
                "Need at least columns customer_id " +
                "or reg_number for import!")
        for name in self.additional_headers:
            if name not in headerfields:
                raise FatalCSVError(
                    "Need %s for import!" % name)

        # Columns starting with '--' are ignored and may appear repeatedly.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def _getCustomer(self, row, site):
        """Return the customer owning the subitem in `row`, or None.

        A non-empty customer_id takes precedence. reg_number (looked up
        in the 'customers_catalog') is only used when customer_id is
        empty or missing.
        """
        NON_VALUES = ['', IGNORE_MARKER]
        if 'customers' not in site.keys():
            return None
        if row.get('customer_id', '') not in NON_VALUES:
            if row['customer_id'] in site['customers']:
                customer = site['customers'][row['customer_id']]
                return customer
        elif row.get('reg_number', '') not in NON_VALUES:
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def parentsExist(self, row, site):
        """Tell whether `getParent` finds a container for `row`."""
        return self.getParent(row, site) is not None

    def entryExists(self, row, site):
        """Tell whether `getEntry` finds an entry for `row`."""
        return self.getEntry(row, site) is not None

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the (errs, inv_errs, conv_dict) triple produced by the
        object converter for `iface`.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Keys are the raw header names from the first line of the CSV
        file at `path`. Values are the matching `headerfields` entries.
        '--IGNORE--' columns are skipped. In remove mode only locator
        columns are kept.
        """
        result = dict()
        # Only the header line is needed; close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number', 'document_id',
                             'code', 'level', 'contract_id'
                             ] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result
368
class CustomerDocumentProcessor(CustomerProcessorBase):
    """A batch processor for ICustomerDocument objects.

    Documents are stored in the 'documents' container of a customer
    and are addressed by their document_id.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerdocumentprocessor'
    grok.name(util_name)

    name = _('CustomerDocument Processor')
    iface = ICustomerDocument
    factory_name = 'waeup.CustomerSampleDocument'

    location_fields = []
    additional_fields = ['class_name']
    additional_headers = ['class_name']

    def _getClassName(self):
        """Return `factory_name` without its leading 'waeup.' prefix.

        This is the value the class_name column must contain. (Note:
        str.strip('waeup.') would remove a set of characters from both
        ends, not the prefix.)
        """
        prefix = 'waeup.'
        if self.factory_name.startswith(prefix):
            return self.factory_name[len(prefix):]
        return self.factory_name

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check the CSV header fields.

        In addition to the base class checks, document_id is required
        in update and remove modes.
        """
        super(CustomerDocumentProcessor, self).checkHeaders(
            headerfields, mode=mode)
        if mode in ('update', 'remove') and 'document_id' not in headerfields:
            raise FatalCSVError(
                "Need document_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the documents container of the customer, or None."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['documents']

    def getEntry(self, row, site):
        """Return the document addressed by row['document_id'], or None."""
        documents = self.getParent(row, site)
        if documents is None:
            return None
        document_id = row.get('document_id', None)
        if document_id is None:
            return None
        entry = documents.get(document_id)
        return entry

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The change is logged by the customers container's logger. Returns
        the changed-items string produced by the base class, like
        CustomerProcessor.updateEntry does.
        """
        items_changed = super(CustomerDocumentProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, items_changed))
        return items_changed

    def addEntry(self, obj, row, site):
        """Store document `obj` in the customer's documents container.

        The key is row['document_id'] with '#' characters stripped from
        both ends.
        """
        parent = self.getParent(row, site)
        document_id = row['document_id'].strip('#')
        parent[document_id] = obj
        # Reset _curr_doc_id if document_id has been imported
        site = grok.getSite()
        if row.get('document_id', None) not in (None, IGNORE_MARKER):
            site._curr_doc_id -= 1
        return

    def delEntry(self, row, site):
        """Remove the document addressed by `row`, if it exists, and log it."""
        document = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if document is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Document removed: %s'
                % (customer.customer_id, document.document_id))
            del parent[document.document_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Besides the base class conversion this checks:
        - class_name matches this processor (not in remove mode),
        - in create mode: a missing document_id is generated, and a given
          one must not exist yet in the 'documents_catalog',
        - in other modes: document_id must be given and start with 'd'.
        """
        errs, inv_errs, conv_dict = super(
            CustomerDocumentProcessor, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            class_name = row.get('class_name', None)
            if class_name != self._getClassName():
                errs.append(('class_name','wrong processor'))
        try:
            # Correct doc_id counter. As the IConverter for documents
            # creates document objects that are not used afterwards, we
            # have to fix the site-wide doc_id counter.
            site = grok.getSite()
            site._curr_doc_id -= 1
        except (KeyError, TypeError, AttributeError):
            # Best effort: no site or no counter available.
            pass
        # We have to check document_id.
        document_id = row.get('document_id', None)
        if mode == 'create':
            if not document_id:
                document_id = generate_document_id()
                conv_dict['document_id'] = document_id
                return errs, inv_errs, conv_dict
            cat = queryUtility(ICatalog, name='documents_catalog')
            results = list(
                cat.searchResults(document_id=(document_id, document_id)))
            if results:
                # document_id must not exist.
                errs.append(('document_id','id exists'))
        elif not document_id or not document_id.startswith('d'):
            # A missing document_id is reported instead of raising
            # AttributeError on None.
            errs.append(('document_id','invalid format'))
        return errs, inv_errs, conv_dict
476
477
class ContractProcessor(CustomerProcessorBase):
    """A batch processor for IContract objects.

    Contracts are stored in the 'contracts' container of a customer
    and are addressed by their contract_id.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'contractprocessor'
    grok.name(util_name)

    name = _('Contract Processor')
    iface = ISampleContract
    factory_name = 'waeup.SampleContract'

    location_fields = []
    additional_fields = ['class_name']
    additional_headers = ['class_name']

    def _getClassName(self):
        """Return `factory_name` without its leading 'waeup.' prefix.

        This is the value the class_name column must contain. (Note:
        str.strip('waeup.') would remove a set of characters from both
        ends, not the prefix.)
        """
        prefix = 'waeup.'
        if self.factory_name.startswith(prefix):
            return self.factory_name[len(prefix):]
        return self.factory_name

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check the CSV header fields.

        In addition to the base class checks, contract_id is required
        in update and remove modes.
        """
        super(ContractProcessor, self).checkHeaders(headerfields, mode=mode)
        if mode in ('update', 'remove') and 'contract_id' not in headerfields:
            raise FatalCSVError(
                "Need contract_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the contracts container of the customer, or None."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['contracts']

    def getEntry(self, row, site):
        """Return the contract addressed by row['contract_id'], or None."""
        contracts = self.getParent(row, site)
        if contracts is None:
            return None
        contract_id = row.get('contract_id', None)
        if contract_id is None:
            return None
        entry = contracts.get(contract_id)
        return entry

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The change is logged by the customers container's logger. Returns
        the changed-items string produced by the base class, like
        CustomerProcessor.updateEntry does.
        """
        items_changed = super(ContractProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, items_changed))
        return items_changed

    def addEntry(self, obj, row, site):
        """Store contract `obj` in the customer's contracts container.

        The key is row['contract_id'] with '#' characters stripped from
        both ends.
        """
        parent = self.getParent(row, site)
        contract_id = row['contract_id'].strip('#')
        parent[contract_id] = obj
        # Reset _curr_con_id if contract_id has been imported
        site = grok.getSite()
        if row.get('contract_id', None) not in (None, IGNORE_MARKER):
            site._curr_con_id -= 1
        return

    def delEntry(self, row, site):
        """Remove the contract addressed by `row`, if it exists, and log it."""
        contract = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if contract is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Contract removed: %s'
                % (customer.customer_id, contract.contract_id))
            del parent[contract.contract_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Besides the base class conversion this checks:
        - class_name matches this processor (not in remove mode),
        - in create mode: a missing contract_id is generated, and a given
          one must not exist yet in the 'contracts_catalog',
        - in other modes: contract_id must be given and start with 'c'.
        """
        errs, inv_errs, conv_dict = super(
            ContractProcessor, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            class_name = row.get('class_name', None)
            if class_name != self._getClassName():
                errs.append(('class_name','wrong processor'))
        try:
            # Correct con_id counter. As the IConverter for contracts
            # creates contract objects that are not used afterwards, we
            # have to fix the site-wide con_id counter.
            site = grok.getSite()
            site._curr_con_id -= 1
        except (KeyError, TypeError, AttributeError):
            # Best effort: no site or no counter available.
            pass
        # We have to check contract_id.
        contract_id = row.get('contract_id', None)
        if mode == 'create':
            if not contract_id:
                contract_id = generate_contract_id()
                conv_dict['contract_id'] = contract_id
                return errs, inv_errs, conv_dict
            cat = queryUtility(ICatalog, name='contracts_catalog')
            results = list(
                cat.searchResults(contract_id=(contract_id, contract_id)))
            if results:
                # contract_id must not exist.
                errs.append(('contract_id','id exists'))
        elif not contract_id or not contract_id.startswith('c'):
            # A missing contract_id is reported instead of raising
            # AttributeError on None.
            errs.append(('contract_id','invalid format'))
        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.