source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 12017

Last change on this file since 12017 was 12005, checked in by Henrik Bettermann, 10 years ago

Implement document_id generator. Ensure that document_ids remain unique during import.

  • Property svn:keywords set to Id
File size: 17.4 KB
Line 
1## $Id: batching.py 12005 2014-11-20 05:40:52Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
23Here we define the processors for customer-specific objects like
24customers and their subobjects.
25"""
26import grok
27import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
39    IObjectHistory, IGNORE_MARKER)
40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
42from waeup.ikoba.documents.utils import generate_document_id
43from waeup.ikoba.customers.interfaces import (
44    ICustomer, ICustomerUpdateByRegNo,
45    ICustomerDocument)
46from waeup.ikoba.customers.workflow import  (
47    IMPORTABLE_STATES, IMPORTABLE_TRANSITIONS)
48from waeup.ikoba.utils.batching import BatchProcessor
49
50
class CustomerProcessor(BatchProcessor):
    """A batch processor for ICustomer objects.

    A customer is located by the ``customer_id`` column or, if that
    column is missing or carries no usable value, by ``reg_number``.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerprocessor'
    grok.name(util_name)

    name = _('Customer Processor')
    iface = ICustomer
    iface_byregnumber = ICustomerUpdateByRegNo

    location_fields = []
    factory_name = 'waeup.Customer'

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of all importable column names.

        Consists of the schema fields of `iface` plus the columns that
        are handled specially by this processor.
        """
        special_fields = ['customer_id', 'reg_number',
                          'password', 'state', 'transition']
        return sorted(set(special_fields + list(getFields(self.iface))))

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether `headerfields` are sufficient for an import.

        Raises `FatalCSVError` if

        - both ``state`` and ``transition`` are given,
        - neither ``customer_id`` nor ``reg_number`` is given,
        - a required field is missing (``create`` mode only) or
        - a column name which is not marked with a leading ``--``
          appears more than once.

        Returns ``True`` otherwise.
        """
        if 'state' in headerfields and 'transition' in headerfields:
            raise FatalCSVError(
                "State and transition can't be  imported at the same time!")
        if ('reg_number' not in headerfields
                and 'customer_id' not in headerfields):
            raise FatalCSVError(
                "Need at least columns customer_id or reg_number " +
                "for import!")
        if mode == 'create':
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        # Check for fields to be ignored...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def parentsExist(self, row, site):
        """Tell whether `site` contains a ``customers`` item."""
        return 'customers' in site.keys()

    def getLocator(self, row):
        """Return the name of the column used to locate the customer.

        ``customer_id`` takes precedence over ``reg_number``. ``None``
        is returned if neither column holds a usable value.
        """
        if row.get('customer_id', None) not in (None, IGNORE_MARKER):
            return 'customer_id'
        elif row.get('reg_number', None) not in (None, IGNORE_MARKER):
            return 'reg_number'
        else:
            return None

    # The entry never exists in create mode.
    def entryExists(self, row, site):
        """Tell whether the customer described by `row` can be found."""
        return self.getEntry(row, site) is not None

    def getParent(self, row, site):
        """Return the ``customers`` item of `site`."""
        return site['customers']

    def getEntry(self, row, site):
        """Return the customer described by `row` or ``None``.

        Lookup by ``customer_id`` goes directly to the customers
        container, lookup by ``reg_number`` queries the
        ``customers_catalog``.
        """
        if 'customers' not in site.keys():
            return None
        locator = self.getLocator(row)
        if locator == 'customer_id':
            if row['customer_id'] in site['customers']:
                return site['customers'][row['customer_id']]
        elif locator == 'reg_number':
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def addEntry(self, obj, row, site):
        """Add customer `obj` to the customers container of `site`."""
        parent = self.getParent(row, site)
        parent.addCustomer(obj)
        # Reset _curr_cust_id if customer_id has been imported
        if self.getLocator(row) == 'customer_id':
            parent._curr_cust_id -= 1
        # We have to log this if state is provided. If not,
        # logging is done by the event handler handle_customer_added
        if 'state' in row:
            parent.logger.info('%s - Customer record created' % obj.customer_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Customer record created'))

    def delEntry(self, row, site):
        """Remove the customer described by `row`, if it exists."""
        customer = self.getEntry(row, site)
        if customer is not None:
            parent = self.getParent(row, site)
            parent.logger.info('%s - Customer removed' % customer.customer_id)
            del parent[customer.customer_id]

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        transition = row.get('transition', IGNORE_MARKER)
        if transition not in (IGNORE_MARKER, ''):
            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
            if transition not in allowed_transitions:
                return 'Transition not allowed.'
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        ``password``, ``state`` and ``transition`` are handled here and
        removed from `row`; an empty ``customer_id`` is removed as
        well. All remaining values are passed on to the base class.

        Returns a string listing the changed items. The same string is
        written to the logger of the customers container.
        """
        items_changed = ''

        # Remove customer_id from row if empty
        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
            row.pop('customer_id')

        # Update password
        # XXX: Take DELETION_MARKER into consideration
        if 'password' in row:
            passwd = row.get('password', IGNORE_MARKER)
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): the value is logged as found in the row,
                # i.e. in clear text unless it already was {SSHA}
                # encoded. Kept to not change the log output; consider
                # masking it.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row.get('state', IGNORE_MARKER)
            if state not in (IGNORE_MARKER, ''):
                value = row['state']
                IWorkflowState(obj).setState(value)
                msg = _("State '${a}' set", mapping={'a': value})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        if 'transition' in row:
            transition = row.get('transition', IGNORE_MARKER)
            if transition not in (IGNORE_MARKER, ''):
                value = row['transition']
                IWorkflowInfo(obj).fireTransition(value)
                items_changed += ('%s=%s, ' % ('transition', transition))
            row.pop('transition')

        # apply other values...
        items_changed += super(CustomerProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if hasattr(obj, 'customer_id'):
            # Update mode: the customer exists and we can get the customer_id.
            # Create mode: the record contains the customer_id
            parent.logger.info(
                '%s - %s - %s - updated: %s'
                % (self.name, filename, obj.customer_id, items_changed))
        else:
            # Create mode: the customer does not yet exist
            # XXX: It seems that this never happens because customer_id
            # is always set.
            # The attribute is known to be missing in this branch, so
            # it must not be accessed directly.
            parent.logger.info(
                '%s - %s - %s - imported: %s'
                % (self.name, filename,
                   getattr(obj, 'customer_id', None), items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header is read from the first row of the file at
        `path`. In ``remove`` mode only the locator columns are mapped.
        """
        result = dict()
        # Only the header row is needed; make sure the file gets closed.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the object converter, with additional errors for not
        importable states and transitions.
        """
        iface = self.iface
        if mode in ['update', 'remove']:
            if self.getLocator(row) == 'reg_number':
                iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        if 'transition' in row:
            if row['transition'] not in IMPORTABLE_TRANSITIONS:
                if row['transition'] not in (IGNORE_MARKER, ''):
                    errs.append(('transition', 'not allowed'))
        if 'state' in row:
            if row['state'] not in IMPORTABLE_STATES:
                if row['state'] not in (IGNORE_MARKER, ''):
                    errs.append(('state', 'not allowed'))
                else:
                    # State is an attribute of Customer and must not
                    # be changed if empty.
                    conv_dict['state'] = IGNORE_MARKER
        try:
            # Correct cust_id counter. As the IConverter for customers
            # creates customer objects that are not used afterwards, we
            # have to fix the site-wide customer_id counter.
            site = grok.getSite()
            customers = site['customers']
            customers._curr_cust_id -= 1
        except (KeyError, TypeError, AttributeError):
            # Best effort only: no site or no customers container.
            pass
        return errs, inv_errs, conv_dict
279
280
class CustomerProcessorBase(BatchProcessor):
    """A base for customer subitem processors.

    Helps reducing redundancy. Derived classes are expected to provide
    `iface`, `factory_name`, `getParent` and `getEntry`, which are used
    but not defined here.
    """
    grok.baseclass()

    # additional available fields
    # beside 'customer_id' and 'reg_number'
    additional_fields = []

    #: header fields additionally required
    additional_headers = []

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of all importable column names.

        Consists of the locator columns, `additional_fields` and the
        schema fields of `iface`.
        """
        fields = ['customer_id', 'reg_number'] + self.additional_fields
        return sorted(set(fields + list(getFields(self.iface))))

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether `headerfields` are sufficient for an import.

        Raises `FatalCSVError` if neither ``customer_id`` nor
        ``reg_number`` is given, if one of `additional_headers` is
        missing or if a column name which is not marked with a leading
        ``--`` appears more than once. `mode` is not evaluated here.

        Returns ``True`` otherwise.
        """
        if ('reg_number' not in headerfields
                and 'customer_id' not in headerfields):
            raise FatalCSVError(
                "Need at least columns customer_id " +
                "or reg_number for import!")
        for name in self.additional_headers:
            if name not in headerfields:
                raise FatalCSVError(
                    "Need %s for import!" % name)

        # Check for fields to be ignored...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def _getCustomer(self, row, site):
        """Return the customer described by `row` or ``None``.

        ``customer_id`` takes precedence over ``reg_number``. Lookup by
        ``reg_number`` queries the ``customers_catalog``.
        """
        NON_VALUES = ['', IGNORE_MARKER]
        if 'customers' not in site.keys():
            return None
        if row.get('customer_id', '') not in NON_VALUES:
            if row['customer_id'] in site['customers']:
                return site['customers'][row['customer_id']]
        elif row.get('reg_number', '') not in NON_VALUES:
            reg_number = row['reg_number']
            cat = queryUtility(ICatalog, name='customers_catalog')
            results = list(
                cat.searchResults(reg_number=(reg_number, reg_number)))
            if results:
                return results[0]
        return None

    def parentsExist(self, row, site):
        """Tell whether the parent container of the entry can be found."""
        return self.getParent(row, site) is not None

    def entryExists(self, row, site):
        """Tell whether the entry described by `row` can be found."""
        return self.getEntry(row, site) is not None

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` delivered by
        the object converter for `iface`.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header is read from the first row of the file at
        `path`. In ``remove`` mode only a fixed set of identifying
        columns is mapped.
        """
        result = dict()
        # Only the header row is needed; make sure the file gets closed.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        remove_mode_fields = [
            'customer_id', 'reg_number', 'document_id', 'code', 'level']
        for num, field in enumerate(headerfields):
            if field not in remove_mode_fields and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result
366
class CustomerDocumentProcessor(CustomerProcessorBase):
    """A batch processor for ICustomerDocument objects.

    Documents live in the ``documents`` item of a customer and are
    identified by their ``document_id``.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'customerdocumentprocessor'
    grok.name(util_name)

    name = _('CustomerDocument Processor')
    iface = ICustomerDocument
    factory_name = 'waeup.CustomerDocument'

    location_fields = []
    additional_fields = ['document_id']
    additional_headers = []

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check whether `headerfields` are sufficient for an import.

        In addition to the checks of the base class a ``document_id``
        column is demanded in ``update`` and ``remove`` mode.

        Raises `FatalCSVError` on failure, returns ``True`` otherwise.
        """
        super(CustomerDocumentProcessor, self).checkHeaders(
            headerfields, mode=mode)
        if mode in ('update', 'remove') and 'document_id' not in headerfields:
            raise FatalCSVError(
                "Need document_id for import in update and remove modes!")
        return True

    def getParent(self, row, site):
        """Return the documents container of the customer or ``None``."""
        customer = self._getCustomer(row, site)
        if customer is None:
            return None
        return customer['documents']

    def getEntry(self, row, site):
        """Return the document described by `row` or ``None``."""
        documents = self.getParent(row, site)
        if documents is None:
            return None
        document_id = row.get('document_id', None)
        if document_id is None:
            return None
        return documents.get(document_id)

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The changes are written to the logger of the customers
        container and returned as string.
        """
        items_changed = super(CustomerDocumentProcessor, self).updateEntry(
            obj, row, site, filename)
        customer = self.getParent(row, site).__parent__
        customer.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, customer.customer_id, items_changed))
        # Hand the changes back to the caller like CustomerProcessor does.
        return items_changed

    def addEntry(self, obj, row, site):
        """Store `obj` in the documents container of the customer.

        The key is the ``document_id`` of `row` without leading or
        trailing ``#`` characters.
        """
        parent = self.getParent(row, site)
        document_id = row['document_id'].strip('#')
        parent[document_id] = obj

    def delEntry(self, row, site):
        """Remove the document described by `row`, if it exists."""
        document = self.getEntry(row, site)
        parent = self.getParent(row, site)
        if document is not None:
            customer = self._getCustomer(row, site)
            customer.__parent__.logger.info('%s - Document removed: %s'
                % (customer.customer_id, document.document_id))
            del parent[document.document_id]

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        If `row` holds no ``document_id`` a new one is generated and
        put into the conversion dict. Otherwise the given id must start
        with ``d`` and, in ``create`` mode, must not yet be listed in
        the ``documents_catalog``.
        """
        errs, inv_errs, conv_dict = super(
            CustomerDocumentProcessor, self).checkConversion(row, mode=mode)

        # We have to check document_id.
        document_id = row.get('document_id', None)
        if not document_id:
            document_id = generate_document_id()
            conv_dict['document_id'] = document_id
            return errs, inv_errs, conv_dict
        # document_id must not exist.
        if mode == 'create':
            cat = queryUtility(ICatalog, name='documents_catalog')
            results = list(
                cat.searchResults(document_id=(document_id, document_id)))
            if results:
                errs.append(('document_id','id exists'))
        if not document_id.startswith('d'):
            errs.append(('document_id','invalid format'))
        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.