source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12131

Last change on this file since 12131 was 12131, checked in by Henrik Bettermann, 10 years ago

Adjust tests.

  • Property svn:keywords set to Id
File size: 22.9 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12131 2014-12-03 18:12:59Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from time import time
28from zope.event import notify
29from zope.component import createObject, queryUtility
30from zope.component.hooks import setSite, clearSite
31from zope.catalog.interfaces import ICatalog
32from zope.interface.verify import verifyClass, verifyObject
33from hurry.workflow.interfaces import IWorkflowState
34
35from waeup.ikoba.app import Company
36from waeup.ikoba.interfaces import (
37    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
38from waeup.ikoba.customers.batching import (
39    CustomerProcessor, CustomerDocumentProcessor, ContractProcessor)
40from waeup.ikoba.customers.customer import Customer
41from waeup.ikoba.customers.documents import CustomerSampleDocument
42from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
43
def _read_sample(filename):
    """Return the raw content of a sample file stored next to this module.

    The file is opened in binary mode inside a ``with`` block so the
    handle is closed as soon as the content has been read, instead of
    being left open until garbage collection.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fobj:
        return fobj.read()


def _header_fields(data):
    """Return the comma-separated column names of the first line of `data`."""
    return data.split('\n')[0].split(',')


# Sample CSV data (and the respective header fields) used by the
# processor tests below.
CUSTOMER_SAMPLE_DATA = _read_sample('sample_customer_data.csv')

CUSTOMER_HEADER_FIELDS = _header_fields(CUSTOMER_SAMPLE_DATA)

CUSTOMER_SAMPLE_DATA_UPDATE = _read_sample('sample_customer_data_update.csv')

CUSTOMER_HEADER_FIELDS_UPDATE = _header_fields(CUSTOMER_SAMPLE_DATA_UPDATE)

CUSTOMER_SAMPLE_DATA_DUPLICATES = _read_sample(
    'sample_customer_data_duplicates.csv')

CUSTOMER_HEADER_FIELDS_DUPLICATES = _header_fields(
    CUSTOMER_SAMPLE_DATA_DUPLICATES)

DOCUMENT_SAMPLE_DATA = _read_sample('sample_document_data.csv')

DOCUMENT_HEADER_FIELDS = _header_fields(DOCUMENT_SAMPLE_DATA)

CONTRACT_SAMPLE_DATA = _read_sample('sample_contract_data.csv')

CONTRACT_HEADER_FIELDS = _header_fields(CONTRACT_SAMPLE_DATA)
79
class CustomerImportExportSetup(FunctionalTestCase):
    """Common fixture for the customer-related processor tests.

    Creates a ``Company`` application with a temporary datacenter
    storage, a temporary working directory and one sample product.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create the application, temporary directories and a product."""
        super(CustomerImportExportSetup, self).setUp()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()

        company = Company()
        company['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = company
        self.app = self.getRootFolder()['app']
        setSite(company)

        storage = self.app['datacenter'].storage
        self.logfile = os.path.join(storage, 'logs', 'customers.log')

        # Register a sample product with the application.
        self.product = createObject('waeup.Product')
        product_attrs = (
            ('product_id', u'SAM'),
            ('title', u'Our Samle Product'),
            ('contract_category', u'sample'),
            )
        for attr_name, attr_value in product_attrs:
            setattr(self.product, attr_name, attr_value)
        self.app['products'].addProduct(self.product)

    def tearDown(self):
        """Remove the temporary directories and unset the site."""
        super(CustomerImportExportSetup, self).tearDown()
        for tmp_dir in (self.workdir, self.dc_root):
            shutil.rmtree(tmp_dir)
        clearSite()

    def setup_for_export(self):
        """Store a customer with id 'A111111' and set the output path."""
        new_customer = Customer()
        new_customer.customer_id = u'A111111'
        self.app['customers'][new_customer.customer_id] = new_customer
        self.customer = new_customer
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')

    def setup_customer(self, customer):
        """Fill `customer` with sample data, one document and one contract.

        The created subobjects are kept as ``self.document`` and
        ``self.contract``. Returns the customer passed in.
        """
        # NOTE: `sex` is not part of this list; the respective
        # assignment was commented out in the original fixture.
        base_data = (
            ('matric_number', u'234'),
            ('perm_address', u'Customerroad 21\nLagos 123456\n'),
            ('reg_number', u'123'),
            ('firstname', u'Anna'),
            ('lastname', u'Tester'),
            ('middlename', u'M.'),
            ('date_of_birth', datetime.date(1981, 2, 4)),
            ('email', 'anna@sample.com'),
            ('phone', u'+234-123-12345'),
            ('notice', u'Some notice\nin lines.'),
            ('nationality', u'NG'),
            )
        for attr_name, attr_value in base_data:
            setattr(customer, attr_name, attr_value)

        # Attach a document and put it into the VERIFIED workflow state.
        sample_doc = createObject('waeup.CustomerSampleDocument')
        sample_doc.title = u'My Document'
        customer['documents'].addDocument(sample_doc)
        IWorkflowState(sample_doc).setState(VERIFIED)
        self.document = sample_doc

        # Attach a contract.
        sample_contract = createObject('waeup.SampleContract')
        sample_contract.title = u'My Contract'
        customer['contracts'].addContract(sample_contract)
        self.contract = sample_contract

        return customer
143
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the ``CustomerProcessor`` batch processor."""

    layer = FunctionalLayer

    def setUp(self):
        """Add one customer with subobjects and write the sample CSV files."""
        super(CustomerProcessorTest, self).setUp()

        # Add customer with subobjects
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Write the files inside ``with`` blocks so that they are flushed
        # and closed before any processor reads them.
        sample_files = (
            (self.csv_file, CUSTOMER_SAMPLE_DATA),
            (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
            (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES),
            )
        for path, data in sample_files:
            with open(path, 'wb') as fobj:
                fobj.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        assert verifyObject(IBatchProcessor, self.processor) is True
        assert verifyClass(
            IBatchProcessor, CustomerProcessor) is True

    def test_parentsExist(self):
        # A plain dict is not accepted as site, the application is.
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        # Unknown customer ids are not found; the fixture customer is
        # found via its reg_number.
        assert self.processor.entryExists(
            dict(customer_id='ID_NONE'), self.app) is False
        assert self.processor.entryExists(
            dict(reg_number='123'), self.app) is True

    def test_getParent(self):
        # The parent of customers is the customers container.
        parent = self.processor.getParent(None, self.app)
        assert parent is self.app['customers']

    def test_getEntry(self):
        # Unknown ids yield None, known ids the stored customer.
        assert self.processor.getEntry(
            dict(customer_id='ID_NONE'), self.app) is None
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id), self.app
            ) is self.customer

    def test_addEntry(self):
        # Adding an entry results in two stored customers (the one
        # from setUp plus the new one).
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        assert len(self.app['customers'].keys()) == 2

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs), 0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs), 0)
        # Unknown states are rejected
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)

    def test_checkUpdateRequirements(self):
        # For this row no error is reported, i.e. the processor
        # returns None.
        err = self.processor.checkUpdateRequirements(
            self.customer, dict(reg_number='1', state='returning'), self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        # The customer located via reg_number is removed from the
        # container.
        assert self.customer.customer_id in self.app['customers'].keys()
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        assert self.customer.customer_id not in self.app['customers'].keys()

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        self.assertEqual(len(self.app['customers']), 10)
        self.assertEqual(self.app['customers']['X666666'].reg_number, '1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        # Import in (default) create mode first, then update the
        # created customers.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state, 'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        # Rows that could not be imported end up in the fail file,
        # together with an error message.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        with open(fail_file) as fobj:
            content = fobj.read()
        self.assertEqual(num_warns, 2)
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: reg_number\r\n'
            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))
271
class CustomerDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the ``CustomerDocumentProcessor`` batch processor."""

    def setUp(self):
        """Add a customer, import the sample customers, write document CSV."""
        super(CustomerDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The file is written inside
        # a ``with`` block so that it is flushed and closed before the
        # processor reads it.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fobj:
            fobj.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as fobj:
            fobj.write(DOCUMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        assert verifyObject(IBatchProcessor, self.processor) is True
        assert verifyClass(
            IBatchProcessor, CustomerDocumentProcessor) is True

    def test_getEntry(self):
        # Unknown ids yield None, the fixture customer's document is
        # found under id 'd101'.
        assert self.processor.getEntry(
            dict(customer_id='ID_NONE', document_id='nonsense'),
            self.app) is None
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='d101'),
            self.app) is self.customer['documents']['d101']

    def test_delEntry(self):
        # After deletion the document can no longer be looked up and
        # the documents container is empty.
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='d101'),
            self.app) is self.customer['documents']['d101']
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, document_id='d101'),
            self.app)
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='d101'),
            self.app) is None
        self.assertEqual(len(self.customer['documents'].keys()), 0)

    def test_addEntry(self):
        # A document added via the processor is stored under its
        # document_id.
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = 'd234'
        self.processor.addEntry(
            document1,
            dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()), 2)
        self.assertEqual(
            self.customer['documents']['d234'].document_id, 'd234')
        # NOTE(review): no assertion follows this assignment; it looks
        # like the remainder of an unfinished check -- confirm intent.
        document1.document_id = 'nonsense'

    def test_checkConversion(self):
        # Valid row
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='d126', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 0)
        # Invalid document_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='nonsense', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 1)
        # Unknown class_name
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='d127', class_name='WrongDocument'))
        self.assertEqual(len(errs), 1)
        # Empty document_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='', class_name='CustomerSampleDocument',
                 mode='update'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 1)
        # document_id must be unique
        with open(fail_file) as fobj:
            fail_content = fobj.read()
        self.assertEqual(
            fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,document_id: id exists\r\n')
        document = self.processor.getEntry(
            dict(reg_number='1', document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        # The result of this lookup is not checked any further.
        document = self.processor.getEntry(
            dict(reg_number='3', document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - CustomerDocument Processor - '
            'sample_document_data - X666666 - updated: '
            'document_id=d3, title=My first doc'
            in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'update')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)
403
404
class ContractProcessorTest(CustomerImportExportSetup):
    """Tests for the ``ContractProcessor`` batch processor."""

    def setUp(self):
        """Add a customer, import the sample customers, write contract CSV."""
        super(ContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The file is written inside
        # a ``with`` block so that it is flushed and closed before the
        # processor reads it.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fobj:
            fobj.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = ContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as fobj:
            fobj.write(CONTRACT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        assert verifyObject(IBatchProcessor, self.processor) is True
        assert verifyClass(
            IBatchProcessor, ContractProcessor) is True

    def test_getEntry(self):
        # Unknown ids yield None, the fixture customer's contract is
        # found under id 'c101'.
        assert self.processor.getEntry(
            dict(customer_id='ID_NONE', contract_id='nonsense'),
            self.app) is None
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='c101'),
            self.app) is self.customer['contracts']['c101']

    def test_delEntry(self):
        # After deletion the contract can no longer be looked up and
        # the contracts container is empty.
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='c101'),
            self.app) is self.customer['contracts']['c101']
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, contract_id='c101'),
            self.app)
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='c101'),
            self.app) is None
        self.assertEqual(len(self.customer['contracts'].keys()), 0)

    def test_addEntry(self):
        # A contract added via the processor is stored under its
        # contract_id.
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = 'c234'
        self.processor.addEntry(
            contract1,
            dict(customer_id=self.customer.customer_id, contract_id='c234'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()), 2)
        self.assertEqual(
            self.customer['contracts']['c234'].contract_id, 'c234')
        # NOTE(review): no assertion follows this assignment; it looks
        # like the remainder of an unfinished check -- confirm intent.
        contract1.contract_id = 'nonsense'

    def test_checkConversion(self):
        # Valid row
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='c126', class_name='SampleContract',
                 document_object='d101', product_object='SAM'))
        self.assertEqual(len(errs), 0)
        # Invalid contract_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='nonsense', class_name='SampleContract'))
        self.assertEqual(len(errs), 1)
        # Unknown class_name
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='c127', class_name='WrongContract'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 3)
        # contract_id must be unique
        with open(fail_file) as fobj:
            fail_content = fobj.read()
        self.assertEqual(
            fail_content,
            'reg_number,contract_id,title,class_name,product_object,'
            'document_object,--ERRORS--\r\n'
            '2,c5,My stolen contract,SampleContract,SAM,d101,'
            'contract_id: id exists\r\n'
            '3,c6,My 5th contract,SampleContract,SAM,NONEXISTENT,'
            'document_object: Invalid value\r\n'
            '3,c7,My sixt contract,SampleContract,NONEXISTENT,d101,'
            'product_object: Invalid value\r\n')
        contract = self.processor.getEntry(
            dict(reg_number='1', contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.contract_id, 'c3')
        # The result of this lookup is not checked any further.
        contract = self.processor.getEntry(
            dict(reg_number='3', contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - Contract Processor - '
            'sample_contract_data - X666666 - updated: '
            'contract_id=c3, title=My first contract'
            in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'update')
        # There are one record without contract_id, one duplicate and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'remove')
        # There are one record without contract_id, one duplicate and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.