source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 11995

Last change on this file since 11995 was 11995, checked in by Henrik Bettermann, 10 years ago

Add batch processor for customer documents.

File size: 15.3 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 11756 2014-07-09 12:46:08Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from time import time
28from zope.event import notify
29from zope.component import createObject, queryUtility
30from zope.component.hooks import setSite, clearSite
31from zope.catalog.interfaces import ICatalog
32from zope.interface.verify import verifyClass, verifyObject
33from hurry.workflow.interfaces import IWorkflowState
34
35from waeup.ikoba.app import Company
36from waeup.ikoba.interfaces import IBatchProcessor, FatalCSVError, IUserAccount
37from waeup.ikoba.customers.batching import (
38    CustomerProcessor, CustomerDocumentProcessor)
39from waeup.ikoba.customers.customer import Customer
40from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
41
42CUSTOMER_SAMPLE_DATA = open(
43    os.path.join(os.path.dirname(__file__), 'sample_customer_data.csv'),
44    'rb').read()
45
46CUSTOMER_HEADER_FIELDS = CUSTOMER_SAMPLE_DATA.split(
47    '\n')[0].split(',')
48
49CUSTOMER_SAMPLE_DATA_UPDATE = open(
50    os.path.join(os.path.dirname(__file__), 'sample_customer_data_update.csv'),
51    'rb').read()
52
53CUSTOMER_HEADER_FIELDS_UPDATE = CUSTOMER_SAMPLE_DATA_UPDATE.split(
54    '\n')[0].split(',')
55
56CUSTOMER_SAMPLE_DATA_DUPLICATES = open(
57    os.path.join(os.path.dirname(__file__),
58                 'sample_customer_data_duplicates.csv'),
59    'rb').read()
60
61CUSTOMER_HEADER_FIELDS_DUPLICATES = CUSTOMER_SAMPLE_DATA_DUPLICATES.split(
62    '\n')[0].split(',')
63
64DOCUMENT_SAMPLE_DATA = open(
65    os.path.join(os.path.dirname(__file__), 'sample_document_data.csv'),
66    'rb').read()
67
68DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split(
69    '\n')[0].split(',')
70
71class CustomerImportExportSetup(FunctionalTestCase):
72
73    layer = FunctionalLayer
74
75    def setUp(self):
76        super(CustomerImportExportSetup, self).setUp()
77        self.dc_root = tempfile.mkdtemp()
78        self.workdir = tempfile.mkdtemp()
79        app = Company()
80        app['datacenter'].setStoragePath(self.dc_root)
81        self.getRootFolder()['app'] = app
82        self.app = self.getRootFolder()['app']
83        setSite(app)
84
85        self.logfile = os.path.join(
86            self.app['datacenter'].storage, 'logs', 'customers.log')
87        return
88
89    def tearDown(self):
90        super(CustomerImportExportSetup, self).tearDown()
91        shutil.rmtree(self.workdir)
92        shutil.rmtree(self.dc_root)
93        clearSite()
94        return
95
96    def setup_for_export(self):
97        customer = Customer()
98        customer.customer_id = u'A111111'
99        self.app['customers'][customer.customer_id] = self.customer = customer
100        self.outfile = os.path.join(self.workdir, 'myoutput.csv')
101        return
102
103    def setup_customer(self, customer):
104        # set predictable values for `customer`
105        customer.matric_number = u'234'
106        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
107        customer.reg_number = u'123'
108        customer.firstname = u'Anna'
109        customer.lastname = u'Tester'
110        customer.middlename = u'M.'
111        customer.date_of_birth = datetime.date(1981, 2, 4)
112        #customer.sex = 'f'
113        customer.email = 'anna@sample.com'
114        customer.phone = u'+234-123-12345'
115        customer.notice = u'Some notice\nin lines.'
116        customer.nationality = u'NG'
117
118        return customer
119
120class CustomerProcessorTest(CustomerImportExportSetup):
121
122    layer = FunctionalLayer
123
124    def setUp(self):
125        super(CustomerProcessorTest, self).setUp()
126
127        # Add customer with subobjects
128        customer = Customer()
129        self.app['customers'].addCustomer(customer)
130        customer = self.setup_customer(customer)
131        notify(grok.ObjectModifiedEvent(customer))
132        self.customer = self.app['customers'][customer.customer_id]
133
134        self.processor = CustomerProcessor()
135        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
136        self.csv_file_update = os.path.join(
137            self.workdir, 'sample_customer_data_update.csv')
138        self.csv_file_duplicates = os.path.join(
139            self.workdir, 'sample_customer_data_duplicates.csv')
140        open(self.csv_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
141        open(self.csv_file_update, 'wb').write(CUSTOMER_SAMPLE_DATA_UPDATE)
142        open(self.csv_file_duplicates, 'wb').write(CUSTOMER_SAMPLE_DATA_DUPLICATES)
143
144    def test_interface(self):
145        # Make sure we fulfill the interface contracts.
146        assert verifyObject(IBatchProcessor, self.processor) is True
147        assert verifyClass(
148            IBatchProcessor, CustomerProcessor) is True
149
150    def test_parentsExist(self):
151        self.assertFalse(self.processor.parentsExist(None, dict()))
152        self.assertTrue(self.processor.parentsExist(None, self.app))
153
154    def test_entryExists(self):
155        assert self.processor.entryExists(
156            dict(customer_id='ID_NONE'), self.app) is False
157        assert self.processor.entryExists(
158            dict(reg_number='123'), self.app) is True
159
160    def test_getParent(self):
161        parent = self.processor.getParent(None, self.app)
162        assert parent is self.app['customers']
163
164    def test_getEntry(self):
165        assert self.processor.getEntry(
166            dict(customer_id='ID_NONE'), self.app) is None
167        assert self.processor.getEntry(
168            dict(customer_id=self.customer.customer_id), self.app) is self.customer
169
170    def test_addEntry(self):
171        new_customer = Customer()
172        self.processor.addEntry(
173            new_customer, dict(), self.app)
174        assert len(self.app['customers'].keys()) == 2
175
176    def test_checkConversion(self):
177        # Make sure we can check conversions and that the cust_id
178        # counter is not raised during such checks.
179        initial_cust_id = self.app['customers']._curr_cust_id
180        errs, inv_errs, conv_dict = self.processor.checkConversion(
181            dict(reg_number='1', state='approved'))
182        self.assertEqual(len(errs),0)
183        # Empty state is allowed
184        errs, inv_errs, conv_dict = self.processor.checkConversion(
185            dict(reg_number='1', state=''))
186        self.assertEqual(len(errs),0)
187        #self.assertTrue(('state', 'no value provided') in errs)
188        errs, inv_errs, conv_dict = self.processor.checkConversion(
189            dict(reg_number='1', state='nonsense'))
190        self.assertEqual(len(errs),1)
191        self.assertTrue(('state', 'not allowed') in errs)
192        new_cust_id = self.app['customers']._curr_cust_id
193        self.assertEqual(initial_cust_id, new_cust_id)
194        return
195
196    def test_checkUpdateRequirements(self):
197        # Make sure that pg customers can't be updated with wrong transition.
198        err = self.processor.checkUpdateRequirements(self.customer,
199            dict(reg_number='1', state='returning'), self.app)
200        self.assertTrue(err is None)
201
202    def test_delEntry(self):
203        assert self.customer.customer_id in self.app['customers'].keys()
204        self.processor.delEntry(
205            dict(reg_number=self.customer.reg_number), self.app)
206        assert self.customer.customer_id not in self.app['customers'].keys()
207
208    def test_import(self):
209        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
210        num, num_warns, fin_file, fail_file = self.processor.doImport(
211            self.csv_file, CUSTOMER_HEADER_FIELDS)
212        self.assertEqual(num_warns,0)
213        self.assertEqual(len(self.app['customers']), 10)
214        self.assertEqual(self.app['customers']['X666666'].reg_number,'1')
215        self.assertEqual(
216            self.app['customers']['X666666'].state, 'requested')
217        # Two new customer_ids have been created.
218        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
219        shutil.rmtree(os.path.dirname(fin_file))
220
221    def test_import_update(self):
222        num, num_warns, fin_file, fail_file = self.processor.doImport(
223            self.csv_file, CUSTOMER_HEADER_FIELDS)
224        shutil.rmtree(os.path.dirname(fin_file))
225        num, num_warns, fin_file, fail_file = self.processor.doImport(
226            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
227        self.assertEqual(num_warns,0)
228        # state has changed
229        self.assertEqual(self.app['customers']['X666666'].state,'approved')
230        # state has not changed
231        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
232        shutil.rmtree(os.path.dirname(fin_file))
233
234    def test_import_duplicate_data(self):
235        num, num_warns, fin_file, fail_file = self.processor.doImport(
236            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
237        content = open(fail_file).read()
238        self.assertEqual(num_warns,2)
239        self.assertEqual(
240            content,
241            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
242            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: reg_number\r\n'
243            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
244
245            )
246        shutil.rmtree(os.path.dirname(fin_file))
247
248class CustomerDocumentProcessorTest(CustomerImportExportSetup):
249
250    def setUp(self):
251        super(CustomerDocumentProcessorTest, self).setUp()
252
253        # Add customer with document
254        customer = Customer()
255        customer.firstname = u'Anna'
256        customer.lastname = u'Tester'
257        customer.reg_number = u'123'
258        self.app['customers'].addCustomer(customer)
259        self.customer = self.app['customers'][customer.customer_id]
260        document = createObject(u'waeup.CustomerDocument')
261        document.id = 'd120'
262        self.customer['documents'][document.id] = document
263
264        # Import customers with subobjects
265        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
266        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
267        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
268            customer_file, CUSTOMER_HEADER_FIELDS)
269        shutil.rmtree(os.path.dirname(fin_file))
270
271        self.processor = CustomerDocumentProcessor()
272        self.csv_file = os.path.join(
273            self.workdir, 'sample_document_data.csv')
274        open(self.csv_file, 'wb').write(DOCUMENT_SAMPLE_DATA)
275
276    def test_interface(self):
277        # Make sure we fulfill the interface contracts.
278        assert verifyObject(IBatchProcessor, self.processor) is True
279        assert verifyClass(
280            IBatchProcessor, CustomerDocumentProcessor) is True
281
282    def test_getEntry(self):
283        assert self.processor.getEntry(
284            dict(customer_id='ID_NONE', id='nonsense'), self.app) is None
285        assert self.processor.getEntry(
286            dict(customer_id=self.customer.customer_id, id='d120'),
287            self.app) is self.customer['documents']['d120']
288
289    def test_delEntry(self):
290        assert self.processor.getEntry(
291            dict(customer_id=self.customer.customer_id, id='d120'),
292            self.app) is self.customer['documents']['d120']
293        self.assertEqual(len(self.customer['documents'].keys()),1)
294        self.processor.delEntry(
295            dict(customer_id=self.customer.customer_id, id='d120'),
296            self.app)
297        assert self.processor.getEntry(
298            dict(customer_id=self.customer.customer_id, id='d120'),
299            self.app) is None
300        self.assertEqual(len(self.customer['documents'].keys()),0)
301
302    def test_addEntry(self):
303        self.assertEqual(len(self.customer['documents'].keys()),1)
304        document1 = createObject(u'waeup.CustomerDocument')
305        document1.id = 'p234'
306        self.processor.addEntry(
307            document1, dict(customer_id=self.customer.customer_id, id='p234'),
308            self.app)
309        self.assertEqual(len(self.customer['documents'].keys()),2)
310        self.assertEqual(self.customer['documents']['p234'].id, 'p234')
311        document2 = createObject(u'waeup.CustomerDocument')
312        document1.id = 'nonsense'
313
314    def test_checkConversion(self):
315        errs, inv_errs, conv_dict = self.processor.checkConversion(
316            dict(id='d1266236341955'))
317        self.assertEqual(len(errs),0)
318        errs, inv_errs, conv_dict = self.processor.checkConversion(
319            dict(id='nonsense'))
320        self.assertEqual(len(errs),1)
321        timestamp = ("%d" % int(time()*10000))[1:]
322        id = "d%s" % timestamp
323        errs, inv_errs, conv_dict = self.processor.checkConversion(
324            dict(id=id))
325        self.assertEqual(len(errs),0)
326
327    def test_import(self):
328        num, num_warns, fin_file, fail_file = self.processor.doImport(
329            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
330        self.assertEqual(num_warns,0)
331        document = self.processor.getEntry(dict(reg_number='1',
332            id='d1266236341953'), self.app)
333        self.assertEqual(
334            self.app['customers']['X666666']['documents']['d1266236341953'],
335            document)
336        self.assertEqual(document.id, 'd1266236341953')
337        document = self.processor.getEntry(dict(reg_number='3',
338            id='d1266236341955'), self.app)
339        shutil.rmtree(os.path.dirname(fin_file))
340        logcontent = open(self.logfile).read()
341        # Logging message from updateEntry
342        self.assertTrue(
343            'INFO - system - CustomerDocument Processor - '
344            'sample_document_data - X666666 - updated: '
345            'id=d1266236341953, title=My first doc'
346            in logcontent)
347
348    def test_import_update(self):
349        # We perform the same import twice,
350        # the second time in update mode. The number
351        # of warnings must be the same.
352        num, num_warns, fin_file, fail_file = self.processor.doImport(
353            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
354        shutil.rmtree(os.path.dirname(fin_file))
355        num, num_warns, fin_file, fail_file = self.processor.doImport(
356            self.csv_file, DOCUMENT_HEADER_FIELDS,'update')
357        self.assertEqual(num_warns,1)  # There is one record without id
358        shutil.rmtree(os.path.dirname(fin_file))
359
360    def test_import_remove(self):
361        # We perform the same import twice,
362        # the second time in remove mode. The number
363        # of warnings must be the same.
364        num, num_warns, fin_file, fail_file = self.processor.doImport(
365            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
366        shutil.rmtree(os.path.dirname(fin_file))
367        num, num_warns, fin_file, fail_file = self.processor.doImport(
368            self.csv_file, DOCUMENT_HEADER_FIELDS,'remove')
369        self.assertEqual(num_warns,1)  # There is one record without id
370        shutil.rmtree(os.path.dirname(fin_file))
371        logcontent = open(self.logfile).read()
372        self.assertTrue(
373            'INFO - system - K1000001 - Document removed: d1266236341955'
374            in logcontent)
Note: See TracBrowser for help on using the repository browser.