source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 12236

Last change on this file since 12236 was 12212, checked in by Henrik Bettermann, 10 years ago

Remove unused lists and tuples.

  • Property svn:keywords set to Id
File size: 21.6 KB
Line 
1## $Id: batching.py 12212 2014-12-13 09:36:29Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
23Here we define the processors for customers specific objects like
24customers and subobjects.
25"""
26import grok
27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
39    IObjectHistory, IGNORE_MARKER)
40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
42from waeup.ikoba.documents.utils import generate_document_id
43from waeup.ikoba.customers.utils import generate_contract_id
44from waeup.ikoba.customers.interfaces import (
45    ICustomer, ICustomerUpdateByRegNo,
46    ICustomerDocument, IContract, ISampleContract)
47from waeup.ikoba.customers.workflow import  (
48    IMPORTABLE_REGISTRATION_STATES, IMPORTABLE_REGISTRATION_TRANSITIONS)
49from waeup.ikoba.utils.batching import BatchProcessor
50
51
52class CustomerProcessor(BatchProcessor):
53    """A batch processor for ICustomer objects.
54    """
55    grok.implements(IBatchProcessor)
56    grok.provides(IBatchProcessor)
57    grok.context(Interface)
58    util_name = 'customerprocessor'
59    grok.name(util_name)
60
61    name = _('Customer Processor')
62    iface = ICustomer
63    iface_byregnumber = ICustomerUpdateByRegNo
64
65    location_fields = []
66    factory_name = 'waeup.Customer'
67
68    @property
69    def available_fields(self):
70        fields = getFields(self.iface)
71        return sorted(list(set(
72            ['customer_id', 'reg_number',
73            'password', 'state', 'transition'] + fields.keys())))
74
75    def checkHeaders(self, headerfields, mode='create'):
76        if 'state' in headerfields and 'transition' in headerfields:
77            raise FatalCSVError(
78                "State and transition can't be  imported at the same time!")
79        if not 'reg_number' in headerfields and not 'customer_id' \
80            in headerfields:
81            raise FatalCSVError(
82                "Need at least columns customer_id or reg_number " +
83                "for import!")
84        if mode == 'create':
85            for field in self.required_fields:
86                if not field in headerfields:
87                    raise FatalCSVError(
88                        "Need at least columns %s for import!" %
89                        ', '.join(["'%s'" % x for x in self.required_fields]))
90        # Check for fields to be ignored...
91        not_ignored_fields = [x for x in headerfields
92                              if not x.startswith('--')]
93        if len(set(not_ignored_fields)) < len(not_ignored_fields):
94            raise FatalCSVError(
95                "Double headers: each column name may only appear once.")
96        return True
97
98    def parentsExist(self, row, site):
99        return 'customers' in site.keys()
100
101    def getLocator(self, row):
102        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
103            return 'customer_id'
104        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
105            return 'reg_number'
106        else:
107            return None
108
109    # The entry never exists in create mode.
110    def entryExists(self, row, site):
111        return self.getEntry(row, site) is not None
112
113    def getParent(self, row, site):
114        return site['customers']
115
116    def getEntry(self, row, site):
117        if not 'customers' in site.keys():
118            return None
119        if self.getLocator(row) == 'customer_id':
120            if row['customer_id'] in site['customers']:
121                customer = site['customers'][row['customer_id']]
122                return customer
123        elif self.getLocator(row) == 'reg_number':
124            reg_number = row['reg_number']
125            cat = queryUtility(ICatalog, name='customers_catalog')
126            results = list(
127                cat.searchResults(reg_number=(reg_number, reg_number)))
128            if results:
129                return results[0]
130        return None
131
132    def addEntry(self, obj, row, site):
133        parent = self.getParent(row, site)
134        parent.addCustomer(obj)
135        # Reset _curr_cust_id if customer_id has been imported
136        if self.getLocator(row) == 'customer_id':
137            parent._curr_cust_id -= 1
138        # We have to log this if state is provided. If not,
139        # logging is done by the event handler handle_customer_added
140        if 'state' in row:
141            parent.logger.info('%s - Customer record created' % obj.customer_id)
142        history = IObjectHistory(obj)
143        history.addMessage(_('Customer record created'))
144        return
145
146    def delEntry(self, row, site):
147        customer = self.getEntry(row, site)
148        if customer is not None:
149            parent = self.getParent(row, site)
150            parent.logger.info('%s - Customer removed' % customer.customer_id)
151            del parent[customer.customer_id]
152        pass
153
154    def checkUpdateRequirements(self, obj, row, site):
155        """Checks requirements the object must fulfill when being updated.
156
157        This method is not used in case of deleting or adding objects.
158
159        Returns error messages as strings in case of requirement
160        problems.
161        """
162        transition = row.get('transition', IGNORE_MARKER)
163        if transition not in (IGNORE_MARKER, ''):
164            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
165            if transition not in allowed_transitions:
166                return 'Transition not allowed.'
167        return None
168
169    def updateEntry(self, obj, row, site, filename):
170        """Update obj to the values given in row.
171        """
172        items_changed = ''
173
174        # Remove customer_id from row if empty
175        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
176            row.pop('customer_id')
177
178        # Update password
179        # XXX: Take DELETION_MARKER into consideration
180        if 'password' in row:
181            passwd = row.get('password', IGNORE_MARKER)
182            if passwd not in ('', IGNORE_MARKER):
183                if passwd.startswith('{SSHA}'):
184                    # already encrypted password
185                    obj.password = passwd
186                else:
187                    # not yet encrypted password
188                    IUserAccount(obj).setPassword(passwd)
189                items_changed += ('%s=%s, ' % ('password', passwd))
190            row.pop('password')
191
192        # Update registration state
193        if 'state' in row:
194            state = row.get('state', IGNORE_MARKER)
195            if state not in (IGNORE_MARKER, ''):
196                value = row['state']
197                IWorkflowState(obj).setState(value)
198                msg = _("State '${a}' set", mapping={'a': value})
199                history = IObjectHistory(obj)
200                history.addMessage(msg)
201                items_changed += ('%s=%s, ' % ('state', state))
202            row.pop('state')
203
204        if 'transition' in row:
205            transition = row.get('transition', IGNORE_MARKER)
206            if transition not in (IGNORE_MARKER, ''):
207                value = row['transition']
208                IWorkflowInfo(obj).fireTransition(value)
209                items_changed += ('%s=%s, ' % ('transition', transition))
210            row.pop('transition')
211
212        # apply other values...
213        items_changed += super(CustomerProcessor, self).updateEntry(
214            obj, row, site, filename)
215
216        # Log actions...
217        parent = self.getParent(row, site)
218        if hasattr(obj, 'customer_id'):
219            # Update mode: the customer exists and we can get the customer_id.
220            # Create mode: the record contains the customer_id
221            parent.logger.info(
222                '%s - %s - %s - updated: %s'
223                % (self.name, filename, obj.customer_id, items_changed))
224        else:
225            # Create mode: the customer does not yet exist
226            # XXX: It seems that this never happens because customer_id
227            # is always set.
228            parent.logger.info(
229                '%s - %s - %s - imported: %s'
230                % (self.name, filename, obj.customer_id, items_changed))
231        return items_changed
232
233    def getMapping(self, path, headerfields, mode):
234        """Get a mapping from CSV file headerfields to actually used fieldnames.
235        """
236        result = dict()
237        reader = csv.reader(open(path, 'rb'))
238        raw_header = reader.next()
239        for num, field in enumerate(headerfields):
240            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
241                continue
242            if field == u'--IGNORE--':
243                # Skip ignored columns in failed and finished data files.
244                continue
245            result[raw_header[num]] = field
246        return result
247
248    def checkConversion(self, row, mode='create'):
249        """Validates all values in row.
250        """
251        iface = self.iface
252        if mode in ['update', 'remove']:
253            if self.getLocator(row) == 'reg_number':
254                iface = self.iface_byregnumber
255        converter = IObjectConverter(iface)
256        errs, inv_errs, conv_dict = converter.fromStringDict(
257            row, self.factory_name, mode=mode)
258        if 'transition' in row:
259            if row['transition'] not in IMPORTABLE_REGISTRATION_TRANSITIONS:
260                if row['transition'] not in (IGNORE_MARKER, ''):
261                    errs.append(('transition', 'not allowed'))
262        if 'state' in row:
263            if row['state'] not in IMPORTABLE_REGISTRATION_STATES:
264                if row['state'] not in (IGNORE_MARKER, ''):
265                    errs.append(('state', 'not allowed'))
266                else:
267                    # State is an attribute of Customer and must not
268                    # be changed if empty.
269                    conv_dict['state'] = IGNORE_MARKER
270        try:
271            # Correct cust_id counter. As the IConverter for customers
272            # creates customer objects that are not used afterwards, we
273            # have to fix the site-wide customer_id counter.
274            site = grok.getSite()
275            customers = site['customers']
276            customers._curr_cust_id -= 1
277        except (KeyError, TypeError, AttributeError):
278                pass
279        return errs, inv_errs, conv_dict
280
281
282class CustomerProcessorBase(BatchProcessor):
283    """A base for customer subitem processors.
284
285    Helps reducing redundancy.
286    """
287    grok.baseclass()
288
289    # additional available fields
290    # beside 'customer_id', 'reg_number' and 'matric_number'
291    additional_fields = []
292
293    #: header fields additionally required
294    additional_headers = []
295
296    @property
297    def available_fields(self):
298        fields = ['customer_id', 'reg_number'] + self.additional_fields
299        return sorted(list(set(fields + getFields(
300                self.iface).keys())))
301
302    def checkHeaders(self, headerfields, mode='ignore'):
303        if not 'reg_number' in headerfields and not 'customer_id' \
304            in headerfields:
305            raise FatalCSVError(
306                "Need at least columns customer_id " +
307                "or reg_number for import!")
308        for name in self.additional_headers:
309            if not name in headerfields:
310                raise FatalCSVError(
311                    "Need %s for import!" % name)
312
313        # Check for fields to be ignored...
314        not_ignored_fields = [x for x in headerfields
315                              if not x.startswith('--')]
316        if len(set(not_ignored_fields)) < len(not_ignored_fields):
317            raise FatalCSVError(
318                "Double headers: each column name may only appear once.")
319        return True
320
321    def _getCustomer(self, row, site):
322        NON_VALUES = ['', IGNORE_MARKER]
323        if not 'customers' in site.keys():
324            return None
325        if row.get('customer_id', '') not in NON_VALUES:
326            if row['customer_id'] in site['customers']:
327                customer = site['customers'][row['customer_id']]
328                return customer
329        elif row.get('reg_number', '') not in NON_VALUES:
330            reg_number = row['reg_number']
331            cat = queryUtility(ICatalog, name='customers_catalog')
332            results = list(
333                cat.searchResults(reg_number=(reg_number, reg_number)))
334            if results:
335                return results[0]
336        return None
337
338    def parentsExist(self, row, site):
339        return self.getParent(row, site) is not None
340
341    def entryExists(self, row, site):
342        return self.getEntry(row, site) is not None
343
344    def checkConversion(self, row, mode='ignore'):
345        """Validates all values in row.
346        """
347        converter = IObjectConverter(self.iface)
348        errs, inv_errs, conv_dict = converter.fromStringDict(
349            row, self.factory_name, mode=mode)
350        return errs, inv_errs, conv_dict
351
352    def getMapping(self, path, headerfields, mode):
353        """Get a mapping from CSV file headerfields to actually used fieldnames.
354        """
355        result = dict()
356        reader = csv.reader(open(path, 'rb'))
357        raw_header = reader.next()
358        for num, field in enumerate(headerfields):
359            if field not in ['customer_id', 'reg_number', 'document_id',
360                             'code', 'level', 'contract_id'
361                             ] and mode == 'remove':
362                continue
363            if field == u'--IGNORE--':
364                # Skip ignored columns in failed and finished data files.
365                continue
366            result[raw_header[num]] = field
367        return result
368
369class CustomerDocumentProcessor(CustomerProcessorBase):
370    """A batch processor for ICustomerDocument objects.
371    """
372    grok.implements(IBatchProcessor)
373    grok.provides(IBatchProcessor)
374    grok.context(Interface)
375    util_name = 'customerdocumentprocessor'
376    grok.name(util_name)
377
378    name = _('CustomerDocument Processor')
379    iface = ICustomerDocument
380    factory_name = 'waeup.CustomerSampleDocument'
381
382    location_fields = []
383    additional_fields = ['document_id', 'class_name']
384    additional_headers = ['class_name']
385
386    def checkHeaders(self, headerfields, mode='ignore'):
387        super(CustomerDocumentProcessor, self).checkHeaders(headerfields)
388        if mode in ('update', 'remove') and not 'document_id' in headerfields:
389            raise FatalCSVError(
390                "Need document_id for import in update and remove modes!")
391        return True
392
393    def getParent(self, row, site):
394        customer = self._getCustomer(row, site)
395        if customer is None:
396            return None
397        return customer['documents']
398
399    def getEntry(self, row, site):
400        documents = self.getParent(row, site)
401        if documents is None:
402            return None
403        document_id = row.get('document_id', None)
404        if document_id is None:
405            return None
406        entry = documents.get(document_id)
407        return entry
408
409    def updateEntry(self, obj, row, site, filename):
410        """Update obj to the values given in row.
411        """
412        items_changed = super(CustomerDocumentProcessor, self).updateEntry(
413            obj, row, site, filename)
414        customer = self.getParent(row, site).__parent__
415        customer.__parent__.logger.info(
416            '%s - %s - %s - updated: %s'
417            % (self.name, filename, customer.customer_id, items_changed))
418        return
419
420    def addEntry(self, obj, row, site):
421        parent = self.getParent(row, site)
422        document_id = row['document_id'].strip('#')
423        parent[document_id] = obj
424        return
425
426    def delEntry(self, row, site):
427        document = self.getEntry(row, site)
428        parent = self.getParent(row, site)
429        if document is not None:
430            customer = self._getCustomer(row, site)
431            customer.__parent__.logger.info('%s - Document removed: %s'
432                % (customer.customer_id, document.document_id))
433            del parent[document.document_id]
434        return
435
436    def checkConversion(self, row, mode='ignore'):
437        """Validates all values in row.
438        """
439        errs, inv_errs, conv_dict = super(
440            CustomerDocumentProcessor, self).checkConversion(row, mode=mode)
441        # We need to check if the class_name corresponds with the
442        # processor chosen. This is to avoid accidentally wrong imports.
443        if mode != 'remove':
444            class_name = row.get('class_name', None)
445            if class_name != self.factory_name.strip('waeup.'):
446                errs.append(('class_name','wrong processor'))
447        # We have to check document_id.
448        document_id = row.get('document_id', None)
449        if mode == 'create':
450            if not document_id:
451                document_id = generate_document_id()
452                conv_dict['document_id'] = document_id
453                return errs, inv_errs, conv_dict
454            cat = queryUtility(ICatalog, name='documents_catalog')
455            results = list(
456                cat.searchResults(document_id=(document_id, document_id)))
457            if results:
458                # document_id must not exist.
459                errs.append(('document_id','id exists'))
460        else:
461            if not document_id.startswith('d'):
462                errs.append(('document_id','invalid format'))
463        return errs, inv_errs, conv_dict
464
465
466class ContractProcessor(CustomerProcessorBase):
467    """A batch processor for IContract objects.
468    """
469    grok.implements(IBatchProcessor)
470    grok.provides(IBatchProcessor)
471    grok.context(Interface)
472    util_name = 'contractprocessor'
473    grok.name(util_name)
474
475    name = _('Contract Processor')
476    iface = ISampleContract
477    factory_name = 'waeup.SampleContract'
478
479    location_fields = []
480    additional_fields = ['contract_id', 'class_name']
481    additional_headers = ['class_name']
482
483    def checkHeaders(self, headerfields, mode='ignore'):
484        super(ContractProcessor, self).checkHeaders(headerfields)
485        if mode in ('update', 'remove') and not 'contract_id' in headerfields:
486            raise FatalCSVError(
487                "Need contract_id for import in update and remove modes!")
488        return True
489
490    def getParent(self, row, site):
491        customer = self._getCustomer(row, site)
492        if customer is None:
493            return None
494        return customer['contracts']
495
496    def getEntry(self, row, site):
497        contracts = self.getParent(row, site)
498        if contracts is None:
499            return None
500        contract_id = row.get('contract_id', None)
501        if contract_id is None:
502            return None
503        entry = contracts.get(contract_id)
504        return entry
505
506    def updateEntry(self, obj, row, site, filename):
507        """Update obj to the values given in row.
508        """
509        items_changed = super(ContractProcessor, self).updateEntry(
510            obj, row, site, filename)
511        customer = self.getParent(row, site).__parent__
512        customer.__parent__.logger.info(
513            '%s - %s - %s - updated: %s'
514            % (self.name, filename, customer.customer_id, items_changed))
515        return
516
517    def addEntry(self, obj, row, site):
518        parent = self.getParent(row, site)
519        contract_id = row['contract_id'].strip('#')
520        parent[contract_id] = obj
521        return
522
523    def delEntry(self, row, site):
524        contract = self.getEntry(row, site)
525        parent = self.getParent(row, site)
526        if contract is not None:
527            customer = self._getCustomer(row, site)
528            customer.__parent__.logger.info('%s - Contract removed: %s'
529                % (customer.customer_id, contract.contract_id))
530            del parent[contract.contract_id]
531        return
532
533    def checkConversion(self, row, mode='ignore'):
534        """Validates all values in row.
535        """
536        errs, inv_errs, conv_dict = super(
537            ContractProcessor, self).checkConversion(row, mode=mode)
538        # We need to check if the class_name corresponds with the
539        # processor chosen. This is to avoid accidentally wrong imports.
540        if mode != 'remove':
541            class_name = row.get('class_name', None)
542            if class_name != self.factory_name.strip('waeup.'):
543                errs.append(('class_name','wrong processor'))
544        # We have to check contract_id.
545        contract_id = row.get('contract_id', None)
546        if mode == 'create':
547            if not contract_id:
548                contract_id = generate_contract_id()
549                conv_dict['contract_id'] = contract_id
550                return errs, inv_errs, conv_dict
551            cat = queryUtility(ICatalog, name='contracts_catalog')
552            results = list(
553                cat.searchResults(contract_id=(contract_id, contract_id)))
554            if results:
555                # contract_id must not exist.
556                errs.append(('contract_id','id exists'))
557        else:
558            if not contract_id.startswith('c'):
559                errs.append(('contract_id','invalid format'))
560        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.