source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py @ 11984

Last change on this file since 11984 was 11964, checked in by Henrik Bettermann, 10 years ago

Adjust customer workflow.

File size: 14.1 KB
RevLine 
[11958]1## $Id: batching.py 11891 2014-10-28 20:02:45Z henrik $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing components for customer objects.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22
23Here we define the processors for customers specific objects like
24customers and subobjects.
25"""
26import grok
27import unicodecsv as csv # XXX: csv ops should move to dedicated module.
28from time import time
29from datetime import datetime
30from zope.i18n import translate
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.component import queryUtility, getUtility, createObject
34from zope.event import notify
35from zope.catalog.interfaces import ICatalog
36from hurry.workflow.interfaces import IWorkflowState, IWorkflowInfo
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IObjectConverter, IUserAccount,
[11964]39    IObjectHistory, IGNORE_MARKER)
[11958]40from waeup.ikoba.interfaces import IIkobaUtils
41from waeup.ikoba.interfaces import MessageFactory as _
42from waeup.ikoba.customers.interfaces import (
43    ICustomer, ICustomerUpdateByRegNo)
44from waeup.ikoba.customers.workflow import  (
45    IMPORTABLE_STATES, IMPORTABLE_TRANSITIONS)
46from waeup.ikoba.utils.batching import BatchProcessor
47
48class CustomerProcessor(BatchProcessor):
49    """A batch processor for ICustomer objects.
50    """
51    grok.implements(IBatchProcessor)
52    grok.provides(IBatchProcessor)
53    grok.context(Interface)
54    util_name = 'customerprocessor'
55    grok.name(util_name)
56
57    name = _('Customer Processor')
58    iface = ICustomer
59    iface_byregnumber = ICustomerUpdateByRegNo
60
61    location_fields = []
62    factory_name = 'waeup.Customer'
63
64    @property
65    def available_fields(self):
66        fields = getFields(self.iface)
67        return sorted(list(set(
68            ['customer_id','reg_number',
69            'password', 'state', 'transition'] + fields.keys())))
70
71    def checkHeaders(self, headerfields, mode='create'):
72        if 'state' in headerfields and 'transition' in headerfields:
73            raise FatalCSVError(
74                "State and transition can't be  imported at the same time!")
75        if not 'reg_number' in headerfields and not 'customer_id' \
76            in headerfields:
77            raise FatalCSVError(
78                "Need at least columns customer_id or reg_number " +
79                "for import!")
80        if mode == 'create':
81            for field in self.required_fields:
82                if not field in headerfields:
83                    raise FatalCSVError(
84                        "Need at least columns %s for import!" %
85                        ', '.join(["'%s'" % x for x in self.required_fields]))
86        # Check for fields to be ignored...
87        not_ignored_fields = [x for x in headerfields
88                              if not x.startswith('--')]
89        if len(set(not_ignored_fields)) < len(not_ignored_fields):
90            raise FatalCSVError(
91                "Double headers: each column name may only appear once.")
92        return True
93
94    def parentsExist(self, row, site):
95        return 'customers' in site.keys()
96
97    def getLocator(self, row):
98        if row.get('customer_id',None) not in (None, IGNORE_MARKER):
99            return 'customer_id'
100        elif row.get('reg_number',None) not in (None, IGNORE_MARKER):
101            return 'reg_number'
102        else:
103            return None
104
105    # The entry never exists in create mode.
106    def entryExists(self, row, site):
107        return self.getEntry(row, site) is not None
108
109    def getParent(self, row, site):
110        return site['customers']
111
112    def getEntry(self, row, site):
113        if not 'customers' in site.keys():
114            return None
115        if self.getLocator(row) == 'customer_id':
116            if row['customer_id'] in site['customers']:
117                customer = site['customers'][row['customer_id']]
118                return customer
119        elif self.getLocator(row) == 'reg_number':
120            reg_number = row['reg_number']
121            cat = queryUtility(ICatalog, name='customers_catalog')
122            results = list(
123                cat.searchResults(reg_number=(reg_number, reg_number)))
124            if results:
125                return results[0]
126        return None
127
128    def addEntry(self, obj, row, site):
129        parent = self.getParent(row, site)
130        parent.addCustomer(obj)
131        # Reset _curr_cust_id if customer_id has been imported
132        if self.getLocator(row) == 'customer_id':
133            parent._curr_cust_id -= 1
134        # We have to log this if state is provided. If not,
135        # logging is done by the event handler handle_customer_added
136        if 'state' in row:
137            parent.logger.info('%s - Customer record created' % obj.customer_id)
138        history = IObjectHistory(obj)
139        history.addMessage(_('Customer record created'))
140        return
141
142    def delEntry(self, row, site):
143        customer = self.getEntry(row, site)
144        if customer is not None:
145            parent = self.getParent(row, site)
146            parent.logger.info('%s - Customer removed' % customer.customer_id)
147            del parent[customer.customer_id]
148        pass
149
150    def checkUpdateRequirements(self, obj, row, site):
151        """Checks requirements the object must fulfill when being updated.
152
153        This method is not used in case of deleting or adding objects.
154
155        Returns error messages as strings in case of requirement
156        problems.
157        """
158        transition = row.get('transition', IGNORE_MARKER)
159        if transition not in (IGNORE_MARKER, ''):
160            allowed_transitions = IWorkflowInfo(obj).getManualTransitionIds()
161            if transition not in allowed_transitions:
162                return 'Transition not allowed.'
163        return None
164
165    def updateEntry(self, obj, row, site, filename):
166        """Update obj to the values given in row.
167        """
168        items_changed = ''
169
170        # Remove customer_id from row if empty
171        if 'customer_id' in row and row['customer_id'] in (None, IGNORE_MARKER):
172            row.pop('customer_id')
173
174        # Update password
175        # XXX: Take DELETION_MARKER into consideration
176        if 'password' in row:
177            passwd = row.get('password', IGNORE_MARKER)
178            if passwd not in ('', IGNORE_MARKER):
179                if passwd.startswith('{SSHA}'):
180                    # already encrypted password
181                    obj.password = passwd
182                else:
183                    # not yet encrypted password
184                    IUserAccount(obj).setPassword(passwd)
185                items_changed += ('%s=%s, ' % ('password', passwd))
186            row.pop('password')
187
188        # Update registration state
189        if 'state' in row:
190            state = row.get('state', IGNORE_MARKER)
191            if state not in (IGNORE_MARKER, ''):
192                value = row['state']
193                IWorkflowState(obj).setState(value)
194                msg = _("State '${a}' set", mapping = {'a':value})
195                history = IObjectHistory(obj)
196                history.addMessage(msg)
197                items_changed += ('%s=%s, ' % ('state', state))
198            row.pop('state')
199
200        if 'transition' in row:
201            transition = row.get('transition', IGNORE_MARKER)
202            if transition not in (IGNORE_MARKER, ''):
203                value = row['transition']
204                IWorkflowInfo(obj).fireTransition(value)
205                items_changed += ('%s=%s, ' % ('transition', transition))
206            row.pop('transition')
207
208        # apply other values...
209        items_changed += super(CustomerProcessor, self).updateEntry(
210            obj, row, site, filename)
211
212        # Log actions...
213        parent = self.getParent(row, site)
214        if hasattr(obj,'customer_id'):
215            # Update mode: the customer exists and we can get the customer_id.
216            # Create mode: the record contains the customer_id
217            parent.logger.info(
218                '%s - %s - %s - updated: %s'
219                % (self.name, filename, obj.customer_id, items_changed))
220        else:
221            # Create mode: the customer does not yet exist
222            # XXX: It seems that this never happens because customer_id
223            # is always set.
224            parent.logger.info(
225                '%s - %s - %s - imported: %s'
226                % (self.name, filename, obj.customer_id, items_changed))
227        return items_changed
228
229    def getMapping(self, path, headerfields, mode):
230        """Get a mapping from CSV file headerfields to actually used fieldnames.
231        """
232        result = dict()
233        reader = csv.reader(open(path, 'rb'))
234        raw_header = reader.next()
235        for num, field in enumerate(headerfields):
236            if field not in ['customer_id', 'reg_number'] and mode == 'remove':
237                continue
238            if field == u'--IGNORE--':
239                # Skip ignored columns in failed and finished data files.
240                continue
241            result[raw_header[num]] = field
242        return result
243
244    def checkConversion(self, row, mode='create'):
245        """Validates all values in row.
246        """
247        iface = self.iface
248        if mode in ['update', 'remove']:
249            if self.getLocator(row) == 'reg_number':
250                iface = self.iface_byregnumber
251        converter = IObjectConverter(iface)
252        errs, inv_errs, conv_dict =  converter.fromStringDict(
253            row, self.factory_name, mode=mode)
254        if 'transition' in row:
255            if row['transition'] not in IMPORTABLE_TRANSITIONS:
256                if row['transition'] not in (IGNORE_MARKER, ''):
257                    errs.append(('transition','not allowed'))
258        if 'state' in row:
259            if row['state'] not in IMPORTABLE_STATES:
260                if row['state'] not in (IGNORE_MARKER, ''):
261                    errs.append(('state','not allowed'))
262                else:
263                    # State is an attribute of Customer and must not
264                    # be changed if empty.
265                    conv_dict['state'] = IGNORE_MARKER
266        try:
267            # Correct cust_id counter. As the IConverter for customers
268            # creates customer objects that are not used afterwards, we
269            # have to fix the site-wide customer_id counter.
270            site = grok.getSite()
271            customers = site['customers']
272            customers._curr_cust_id -= 1
273        except (KeyError, TypeError, AttributeError):
274                pass
275        return errs, inv_errs, conv_dict
276
277
278class CustomerProcessorBase(BatchProcessor):
279    """A base for customer subitem processor.
280
281    Helps reducing redundancy.
282    """
283    grok.baseclass()
284
285    # additional available fields
286    # beside 'customer_id', 'reg_number' and 'matric_number'
287    additional_fields = []
288
289    #: header fields additionally required
290    additional_headers = []
291
292    @property
293    def available_fields(self):
294        fields = ['customer_id','reg_number'] + self.additional_fields
295        return sorted(list(set(fields + getFields(
296                self.iface).keys())))
297
298    def checkHeaders(self, headerfields, mode='ignore'):
299        if not 'reg_number' in headerfields and not 'customer_id' \
300            in headerfields:
301            raise FatalCSVError(
302                "Need at least columns customer_id " +
303                "or reg_number for import!")
304        for name in self.additional_headers:
305            if not name in headerfields:
306                raise FatalCSVError(
307                    "Need %s for import!" % name)
308
309        # Check for fields to be ignored...
310        not_ignored_fields = [x for x in headerfields
311                              if not x.startswith('--')]
312        if len(set(not_ignored_fields)) < len(not_ignored_fields):
313            raise FatalCSVError(
314                "Double headers: each column name may only appear once.")
315        return True
316
317    def _getCustomer(self, row, site):
318        NON_VALUES = ['', IGNORE_MARKER]
319        if not 'customers' in site.keys():
320            return None
321        if row.get('customer_id', '') not in NON_VALUES:
322            if row['customer_id'] in site['customers']:
323                customer = site['customers'][row['customer_id']]
324                return customer
325        elif row.get('reg_number', '') not in NON_VALUES:
326            reg_number = row['reg_number']
327            cat = queryUtility(ICatalog, name='customers_catalog')
328            results = list(
329                cat.searchResults(reg_number=(reg_number, reg_number)))
330            if results:
331                return results[0]
332        return None
333
334    def parentsExist(self, row, site):
335        return self.getParent(row, site) is not None
336
337    def entryExists(self, row, site):
338        return self.getEntry(row, site) is not None
339
340    def checkConversion(self, row, mode='ignore'):
341        """Validates all values in row.
342        """
343        converter = IObjectConverter(self.iface)
344        errs, inv_errs, conv_dict =  converter.fromStringDict(
345            row, self.factory_name, mode=mode)
346        return errs, inv_errs, conv_dict
347
348    def getMapping(self, path, headerfields, mode):
349        """Get a mapping from CSV file headerfields to actually used fieldnames.
350        """
351        result = dict()
352        reader = csv.reader(open(path, 'rb'))
353        raw_header = reader.next()
354        for num, field in enumerate(headerfields):
355            if field not in ['customer_id', 'reg_number', 'p_id', 'code', 'level'
356                             ] and mode == 'remove':
357                continue
358            if field == u'--IGNORE--':
359                # Skip ignored columns in failed and finished data files.
360                continue
361            result[raw_header[num]] = field
362        return result
363
364
Note: See TracBrowser for help on using the repository browser.