source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 12100

Last change on this file since 12100 was 12097, checked in by Henrik Bettermann, 10 years ago

The term 'application' should really not be used in Python-based portal software.

Replace 'application' by 'contract': batch 1

  • Property svn:keywords set to Id
File size: 21.5 KB
Line 
1## $Id: batching.py 12097 2014-11-30 20:49:22Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
Here we define the processors for customer-specific objects: customers
themselves and their subobjects (customer documents and contracts).
25"""
26import grok
27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
39    IObjectHistory, IGNORE_MARKER)
40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
42from waeup.ikoba.documents.utils import generate_document_id
43from waeup.ikoba.customers.interfaces import (
44    ICustomer, ICustomerUpdateByRegNo,
45    ICustomerDocument, IContract)
46from waeup.ikoba.customers.workflow import  (
47    IMPORTABLE_REGISTRATION_STATES, IMPORTABLE_REGISTRATION_TRANSITIONS,
48    IMPORTABLE_CONTRACT_STATES, IMPORTABLE_CONTRACT_TRANSITIONS)
49from waeup.ikoba.utils.batching import BatchProcessor
50
51
class CustomerProcessor(BatchProcessor):
    """A batch processor for ICustomer objects.

    Customers are located either by ``customer_id`` or, if that column
    holds no value, by ``reg_number`` (searched in the
    ``customers_catalog``).
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerprocessor'
    grok.name(util_name)

    name = _('Customer Processor')
    iface = ICustomer
    # Interface used by checkConversion in update and remove mode when
    # the row is located by reg_number.
    iface_byregnumber = ICustomerUpdateByRegNo

    location_fields = []
    factory_name = 'waeup.Customer'

    @property
    def available_fields(self):
        """Sorted list of all column names this processor understands.

        These are the locator and workflow columns plus every schema
        field of `iface`.
        """
        fields = getFields(self.iface)
        return sorted(list(set(
            ['customer_id', 'reg_number',
             'password', 'state', 'transition'] + fields.keys())))

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` allow an import in `mode`.

        Raises FatalCSVError if ``state`` and ``transition`` are both
        given, if neither ``customer_id`` nor ``reg_number`` is given,
        if a required field is missing in create mode, or if a column
        name (not starting with ``--``) appears more than once.
        Returns True otherwise.
        """
        if 'state' in headerfields and 'transition' in headerfields:
            raise FatalCSVError(
                "State and transition can't be  imported at the same time!")
        if ('reg_number' not in headerfields
                and 'customer_id' not in headerfields):
            raise FatalCSVError(
                "Need at least columns customer_id or reg_number " +
                "for import!")
        if mode == 'create':
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        # Check for fields to be ignored...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def parentsExist(self, row, site):
        """Customers can only be imported into a site with a
        'customers' container."""
        return 'customers' in site.keys()

    def getLocator(self, row):
        """Return the name of the column that locates the customer.

        'customer_id' if that column holds a value, else 'reg_number'
        if that one does, else None.
        """
        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
            return 'customer_id'
        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
            return 'reg_number'
        else:
            return None

    def entryExists(self, row, site):
        """Tell whether the customer referenced by `row` can be found."""
        return self.getEntry(row, site) is not None

    def getParent(self, row, site):
        """Return the site's customers container."""
        return site['customers']

    def getEntry(self, row, site):
        """Return the customer referenced by `row` or None if not found."""
        if 'customers' not in site.keys():
            return None
        locator = self.getLocator(row)
        if locator == 'customer_id':
            customers = site['customers']
            if row['customer_id'] in customers:
                return customers[row['customer_id']]
        elif locator == 'reg_number':
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def addEntry(self, obj, row, site):
        """Add customer `obj` to the customers container and record it
        in the object history."""
        parent = self.getParent(row, site)
        parent.addCustomer(obj)
        # Reset _curr_cust_id if customer_id has been imported
        if self.getLocator(row) == 'customer_id':
            parent._curr_cust_id -= 1
        # We have to log this if state is provided. If not,
        # logging is done by the event handler handle_customer_added
        if 'state' in row:
            parent.logger.info('%s - Customer record created' % obj.customer_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Customer record created'))

    def delEntry(self, row, site):
        """Remove (and log removal of) the customer referenced by `row`.

        Does nothing if the customer cannot be found.
        """
        customer = self.getEntry(row, site)
        if customer is not None:
            parent = self.getParent(row, site)
            parent.logger.info('%s - Customer removed' % customer.customer_id)
            del parent[customer.customer_id]

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        transition = row.get('transition', IGNORE_MARKER)
        if transition not in (IGNORE_MARKER, ''):
            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
            if transition not in allowed_transitions:
                return 'Transition not allowed.'
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        ``password``, ``state`` and ``transition`` are handled here and
        popped from `row`; all remaining values are applied by the base
        class. The changes are logged and returned as a string.
        """
        items_changed = ''

        # Remove customer_id from row if empty
        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
            row.pop('customer_id')

        # Update password
        # XXX: Take DELETION_MARKER into consideration
        if 'password' in row:
            passwd = row['password']
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): the password value (possibly plaintext)
                # ends up in items_changed, which is logged below.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row['state']
            if state not in (IGNORE_MARKER, ''):
                IWorkflowState(obj).setState(state)
                msg = _("State '${a}' set", mapping={'a': state})
                IObjectHistory(obj).addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        if 'transition' in row:
            transition = row['transition']
            if transition not in (IGNORE_MARKER, ''):
                IWorkflowInfo(obj).fireTransition(transition)
                items_changed += ('%s=%s, ' % ('transition', transition))
            row.pop('transition')

        # apply other values...
        items_changed += super(CustomerProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if hasattr(obj, 'customer_id'):
            # Update mode: the customer exists and we can get the customer_id.
            # Create mode: the record contains the customer_id
            parent.logger.info(
                '%s - %s - %s - updated: %s'
                % (self.name, filename, obj.customer_id, items_changed))
        else:
            # Create mode: the customer does not yet exist
            # XXX: It seems that this never happens because customer_id
            # is always set. obj has no customer_id here, so identify
            # it by its reg_number (if any) instead of raising
            # AttributeError.
            parent.logger.info(
                '%s - %s - %s - imported: %s'
                % (self.name, filename, getattr(obj, 'reg_number', None),
                   items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Each raw header of the CSV file at `path` is mapped to the field
        name at the same position in `headerfields`. Columns marked
        ``--IGNORE--`` are skipped; in remove mode only the locator
        columns ``customer_id`` and ``reg_number`` are kept.
        """
        # Only the header row is needed; close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns the (errs, inv_errs, conv_dict) triple of the object
        converter, extended by errors for non-importable registration
        states and transitions.
        """
        iface = self.iface
        if mode in ['update', 'remove']:
            if self.getLocator(row) == 'reg_number':
                iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        if 'transition' in row:
            if row['transition'] not in IMPORTABLE_REGISTRATION_TRANSITIONS:
                if row['transition'] not in (IGNORE_MARKER, ''):
                    errs.append(('transition', 'not allowed'))
        if 'state' in row:
            if row['state'] not in IMPORTABLE_REGISTRATION_STATES:
                if row['state'] not in (IGNORE_MARKER, ''):
                    errs.append(('state', 'not allowed'))
                else:
                    # State is an attribute of Customer and must not
                    # be changed if empty.
                    conv_dict['state'] = IGNORE_MARKER
        try:
            # Correct cust_id counter. As the IConverter for customers
            # creates customer objects that are not used afterwards, we
            # have to fix the site-wide customer_id counter.
            site = grok.getSite()
            customers = site['customers']
            customers._curr_cust_id -= 1
        except (KeyError, TypeError, AttributeError):
            pass
        return errs, inv_errs, conv_dict
280
281
class CustomerProcessorBase(BatchProcessor):
    """A base for customer subitem processors.

    Helps reducing redundancy. Subclasses provide `iface`,
    `factory_name`, `getParent` and `getEntry`; the owning customer is
    located by ``customer_id`` or ``reg_number``.
    """
    grok.baseclass()

    # additional available fields
    # beside 'customer_id' and 'reg_number'
    additional_fields = []

    #: header fields additionally required
    additional_headers = []

    #: Columns kept by getMapping in remove mode: the customer locators
    #: and the ids subclasses use to locate their subitems
    #: ('document_id', 'contract_id'). 'code' and 'level' are kept as
    #: well -- presumably relied upon elsewhere; confirm before removing.
    remove_mode_fields = (
        'customer_id', 'reg_number', 'document_id', 'contract_id',
        'code', 'level')

    @property
    def available_fields(self):
        """Sorted list of locator, additional and schema field names."""
        fields = ['customer_id', 'reg_number'] + self.additional_fields
        return sorted(list(set(fields + getFields(
                self.iface).keys())))

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check that `headerfields` allow an import.

        Raises FatalCSVError if neither ``customer_id`` nor
        ``reg_number`` is given, if one of `additional_headers` is
        missing, or if a column name (not starting with ``--``) appears
        more than once. Returns True otherwise. `mode` is not used here.
        """
        if ('reg_number' not in headerfields
                and 'customer_id' not in headerfields):
            raise FatalCSVError(
                "Need at least columns customer_id " +
                "or reg_number for import!")
        for name in self.additional_headers:
            if name not in headerfields:
                raise FatalCSVError(
                    "Need %s for import!" % name)

        # Check for fields to be ignored...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def _getCustomer(self, row, site):
        """Return the customer owning the subitem in `row` or None.

        ``customer_id`` takes precedence; ``reg_number`` is only used if
        ``customer_id`` holds no value.
        """
        NON_VALUES = ('', IGNORE_MARKER)
        if 'customers' not in site.keys():
            return None
        if row.get('customer_id', '') not in NON_VALUES:
            if row['customer_id'] in site['customers']:
                customer = site['customers'][row['customer_id']]
                return customer
        elif row.get('reg_number', '') not in NON_VALUES:
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def parentsExist(self, row, site):
        """Tell whether the container for the subitem can be found."""
        return self.getParent(row, site) is not None

    def entryExists(self, row, site):
        """Tell whether the subitem referenced by `row` can be found."""
        return self.getEntry(row, site) is not None

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the (errs, inv_errs, conv_dict) triple of the object
        converter for `iface`.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Each raw header of the CSV file at `path` is mapped to the field
        name at the same position in `headerfields`. Columns marked
        ``--IGNORE--`` are skipped; in remove mode only the columns in
        `remove_mode_fields` are kept.
        """
        # Only the header row is needed; close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        result = dict()
        for num, field in enumerate(headerfields):
            if mode == 'remove' and field not in self.remove_mode_fields:
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result
367
class CustomerDocumentProcessor(CustomerProcessorBase):
    """A batch processor for ICustomerDocument objects.

    Documents live in the ``documents`` container of their customer
    and are located by ``document_id``.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerdocumentprocessor'
    grok.name(util_name)

    name = _('CustomerDocument Processor')
    iface = ICustomerDocument
    factory_name = 'waeup.CustomerSampleDocument'

    location_fields = []
    additional_fields = ['document_id', 'class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check headers; update and remove mode also need document_id."""
        super(CustomerDocumentProcessor, self).checkHeaders(
            headerfields, mode=mode)
        if mode in ('update', 'remove') and 'document_id' not in headerfields:
            raise FatalCSVError(
                "Need document_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the documents container of the customer or None."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['documents']

    def getEntry(self, row, site):
        """Return the document referenced by `row` or None."""
        documents = self.getParent(row, site)
        if documents is None:
            return None
        document_id = row.get('document_id', None)
        if document_id is None:
            return None
        return documents.get(document_id)

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Logs the change in the customers container's logger and returns
        the changed-items string produced by the base class.
        """
        items_changed = super(CustomerDocumentProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, items_changed))
        return items_changed

    def addEntry(self, obj, row, site):
        """Store `obj` under its document_id (leading/trailing '#'
        removed) in the customer's documents container."""
        parent = self.getParent(row, site)
        document_id = row['document_id'].strip('#')
        parent[document_id] = obj

    def delEntry(self, row, site):
        """Remove (and log removal of) the document referenced by `row`.

        Does nothing if the document cannot be found.
        """
        document = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if document is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Document removed: %s'
                % (customer.customer_id, document.document_id))
            del parent[document.document_id]

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Besides the converter checks, ``class_name`` must match this
        processor (except in remove mode). In create mode a missing
        document_id is generated and an existing one must not be in the
        documents catalog yet; in all other modes document_id must be
        given and start with 'd'.
        """
        errs, inv_errs, conv_dict = super(
            CustomerDocumentProcessor, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            # Remove the 'waeup.' prefix. str.strip('waeup.') would strip
            # any of these characters from both ends instead.
            expected_class_name = self.factory_name
            if expected_class_name.startswith('waeup.'):
                expected_class_name = expected_class_name[len('waeup.'):]
            if row.get('class_name', None) != expected_class_name:
                errs.append(('class_name','wrong processor'))
        # We have to check document_id.
        document_id = row.get('document_id', None)
        if mode == 'create':
            if not document_id:
                document_id = generate_document_id()
                conv_dict['document_id'] = document_id
                return errs, inv_errs, conv_dict
            cat = queryUtility(ICatalog, name='documents_catalog')
            results = list(
                cat.searchResults(document_id=(document_id, document_id)))
            if results:
                # document_id must not exist.
                errs.append(('document_id','id exists'))
        elif not document_id or not document_id.startswith('d'):
            # A missing document_id is reported rather than raising
            # AttributeError on None.
            errs.append(('document_id','invalid format'))
        return errs, inv_errs, conv_dict
463
464
class ContractProcessor(CustomerProcessorBase):
    """A batch processor for IContract objects.

    Contracts live in the ``contracts`` container of their customer
    and are located by ``contract_id``.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'contractprocessor'
    grok.name(util_name)

    name = _('Contract Processor')
    iface = IContract
    factory_name = 'waeup.SampleContract'

    location_fields = []
    additional_fields = ['contract_id', 'class_name']
    additional_headers = ['class_name']

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check headers; update and remove mode also need contract_id."""
        super(ContractProcessor, self).checkHeaders(headerfields, mode=mode)
        if mode in ('update', 'remove') and 'contract_id' not in headerfields:
            raise FatalCSVError(
                "Need contract_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the contracts container of the customer or None."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['contracts']

    def getEntry(self, row, site):
        """Return the contract referenced by `row` or None."""
        contracts = self.getParent(row, site)
        if contracts is None:
            return None
        contract_id = row.get('contract_id', None)
        if contract_id is None:
            return None
        return contracts.get(contract_id)

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Logs the change in the customers container's logger and returns
        the changed-items string produced by the base class.
        """
        items_changed = super(ContractProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, items_changed))
        return items_changed

    def addEntry(self, obj, row, site):
        """Store `obj` under its contract_id (leading/trailing '#'
        removed) in the customer's contracts container."""
        parent = self.getParent(row, site)
        contract_id = row['contract_id'].strip('#')
        parent[contract_id] = obj

    def delEntry(self, row, site):
        """Remove (and log removal of) the contract referenced by `row`.

        Does nothing if the contract cannot be found.
        """
        contract = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if contract is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Contract removed: %s'
                % (customer.customer_id, contract.contract_id))
            del parent[contract.contract_id]

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Besides the converter checks, ``class_name`` must match this
        processor (except in remove mode). In create mode a missing
        contract_id is generated and an existing one must not be in the
        contracts catalog yet; in all other modes contract_id must be
        given and start with 'a'.
        """
        errs, inv_errs, conv_dict = super(
            ContractProcessor, self).checkConversion(row, mode=mode)
        # We need to check if the class_name corresponds with the
        # processor chosen. This is to avoid accidentally wrong imports.
        if mode != 'remove':
            # Remove the 'waeup.' prefix. str.strip('waeup.') would strip
            # any of these characters from both ends instead.
            expected_class_name = self.factory_name
            if expected_class_name.startswith('waeup.'):
                expected_class_name = expected_class_name[len('waeup.'):]
            if row.get('class_name', None) != expected_class_name:
                errs.append(('class_name','wrong processor'))
        # We have to check contract_id.
        contract_id = row.get('contract_id', None)
        if mode == 'create':
            if not contract_id:
                # generate_contract_id is not imported at module level,
                # so import it here instead of failing with NameError.
                # NOTE(review): module path assumed -- confirm it is where
                # generate_contract_id is defined.
                from waeup.ikoba.customers.utils import generate_contract_id
                contract_id = generate_contract_id()
                conv_dict['contract_id'] = contract_id
                return errs, inv_errs, conv_dict
            cat = queryUtility(ICatalog, name='contracts_catalog')
            results = list(
                cat.searchResults(contract_id=(contract_id, contract_id)))
            if results:
                # contract_id must not exist.
                errs.append(('contract_id','id exists'))
        # NOTE(review): the 'a' prefix may be a leftover from the former
        # 'application' naming -- confirm against the generated ids.
        elif not contract_id or not contract_id.startswith('a'):
            # A missing contract_id is reported rather than raising
            # AttributeError on None.
            errs.append(('contract_id','invalid format'))
        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.