source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12207

Last change on this file since 12207 was 12143, checked in by Henrik Bettermann, 10 years ago

Add contract exporter.

  • Property svn:keywords set to Id
File size: 23.0 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12143 2014-12-04 12:22:03Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from time import time
28from zope.event import notify
29from zope.component import createObject, queryUtility
30from zope.component.hooks import setSite, clearSite
31from zope.catalog.interfaces import ICatalog
32from zope.interface.verify import verifyClass, verifyObject
33from hurry.workflow.interfaces import IWorkflowState
34
35from waeup.ikoba.app import Company
36from waeup.ikoba.interfaces import (
37    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
38from waeup.ikoba.customers.batching import (
39    CustomerProcessor, CustomerDocumentProcessor, ContractProcessor)
40from waeup.ikoba.customers.customer import Customer
41from waeup.ikoba.customers.documents import CustomerSampleDocument
42from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
43
def _read_sample(filename):
    """Return the raw content of `filename` stored next to this module.

    The file is read in binary mode and closed again right after
    reading, so no file handle is left open at import time.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fd:
        return fd.read()


def _header_fields(csv_data):
    """Return the comma-separated column names of the first line."""
    return csv_data.split('\n')[0].split(',')


# Raw CSV sample files and the header fields found in their first line.
CUSTOMER_SAMPLE_DATA = _read_sample('sample_customer_data.csv')
CUSTOMER_HEADER_FIELDS = _header_fields(CUSTOMER_SAMPLE_DATA)

CUSTOMER_SAMPLE_DATA_UPDATE = _read_sample('sample_customer_data_update.csv')
CUSTOMER_HEADER_FIELDS_UPDATE = _header_fields(CUSTOMER_SAMPLE_DATA_UPDATE)

CUSTOMER_SAMPLE_DATA_DUPLICATES = _read_sample(
    'sample_customer_data_duplicates.csv')
CUSTOMER_HEADER_FIELDS_DUPLICATES = _header_fields(
    CUSTOMER_SAMPLE_DATA_DUPLICATES)

DOCUMENT_SAMPLE_DATA = _read_sample('sample_document_data.csv')
DOCUMENT_HEADER_FIELDS = _header_fields(DOCUMENT_SAMPLE_DATA)

CONTRACT_SAMPLE_DATA = _read_sample('sample_contract_data.csv')
CONTRACT_HEADER_FIELDS = _header_fields(CONTRACT_SAMPLE_DATA)
79
class CustomerImportExportSetup(FunctionalTestCase):
    """Shared fixture for the customer import/export tests.

    Installs a `Company` site whose datacenter stores its data in a
    temporary directory, registers one sample product and offers
    helpers to create customers with a document and a contract.
    """

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerImportExportSetup, self).setUp()
        # Temporary directories: datacenter storage and scratch space.
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()
        company = Company()
        company['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = company
        self.app = self.getRootFolder()['app']
        setSite(company)
        storage = self.app['datacenter'].storage
        self.logfile = os.path.join(storage, 'logs', 'customers.log')

        # Register the one product the sample contracts refer to.
        self.product = product = createObject('waeup.Product')
        product.product_id = u'SAM'
        product.title = u'Our Samle Product'
        product.contract_category = u'sample'
        self.app['products'].addProduct(product)

    def tearDown(self):
        super(CustomerImportExportSetup, self).tearDown()
        # Drop both temporary directories created in setUp.
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()

    def setup_for_export(self):
        # Store a customer with a fixed id and prepare the output path.
        new_customer = Customer()
        new_customer.customer_id = u'A111111'
        self.app['customers'][new_customer.customer_id] = new_customer
        self.customer = new_customer
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')

    def setup_customer(self, customer):
        # Fill in the base data attribute by attribute, in this order
        # (`sex` is not set here).
        base_data = (
            ('matric_number', u'234'),
            ('perm_address', u'Customerroad 21\nLagos 123456\n'),
            ('reg_number', u'123'),
            ('firstname', u'Anna'),
            ('lastname', u'Tester'),
            ('middlename', u'M.'),
            ('date_of_birth', datetime.date(1981, 2, 4)),
            ('email', 'anna@sample.com'),
            ('phone', u'+234-123-12345'),
            ('notice', u'Some notice\nin lines.'),
            ('nationality', u'NG'),
            )
        for attr_name, attr_value in base_data:
            setattr(customer, attr_name, attr_value)

        # Attach a document and move it to the VERIFIED workflow state.
        sample_doc = createObject('waeup.CustomerSampleDocument')
        sample_doc.title = u'My Document'
        customer['documents'].addDocument(sample_doc)
        IWorkflowState(sample_doc).setState(VERIFIED)
        self.document = sample_doc

        # Attach a contract referring to the product and the document.
        sample_contract = createObject('waeup.SampleContract')
        sample_contract.title = u'My Contract'
        sample_contract.product_object = self.product
        sample_contract.document_object = sample_doc
        customer['contracts'].addContract(sample_contract)
        self.contract = sample_contract
        return customer
145
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerProcessor` batch processor."""

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerProcessorTest, self).setUp()

        # Add customer with subobjects
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Copy the sample data into the work dir. Files are closed
        # explicitly so that all data is on disk before any import runs.
        for path, data in (
                (self.csv_file, CUSTOMER_SAMPLE_DATA),
                (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
                (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES),
                ):
            with open(path, 'wb') as fd:
                fd.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerProcessor) is True)

    def test_parentsExist(self):
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        self.assertTrue(self.processor.entryExists(
            dict(customer_id='ID_NONE'), self.app) is False)
        self.assertTrue(self.processor.entryExists(
            dict(reg_number='123'), self.app) is True)

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        self.assertTrue(parent is self.app['customers'])

    def test_getEntry(self):
        self.assertTrue(self.processor.getEntry(
            dict(customer_id='ID_NONE'), self.app) is None)
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id),
            self.app) is self.customer)

    def test_addEntry(self):
        # The customer created in setUp plus the one added here.
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        self.assertEqual(len(self.app['customers'].keys()), 2)

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs), 0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs), 0)
        # An unknown state is rejected
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)

    def test_checkUpdateRequirements(self):
        # No error is reported for this row.
        err = self.processor.checkUpdateRequirements(
            self.customer, dict(reg_number='1', state='returning'),
            self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        self.assertTrue(
            self.customer.customer_id in self.app['customers'].keys())
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        self.assertTrue(
            self.customer.customer_id not in self.app['customers'].keys())

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        self.assertEqual(len(self.app['customers']), 10)
        self.assertEqual(self.app['customers']['X666666'].reg_number, '1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        # Create the customers first, then import the update file in
        # update mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state, 'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns, 2)
        # Both rejected rows are reported together with their error.
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: reg_number\r\n'
            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))
273
class CustomerDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerDocumentProcessor` batch processor."""

    def setUp(self):
        super(CustomerDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(DOCUMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerDocumentProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(self.processor.getEntry(
            dict(customer_id='ID_NONE', document_id='nonsense'),
            self.app) is None)
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='d101'),
            self.app) is self.customer['documents']['d101'])

    def test_delEntry(self):
        row = dict(customer_id=self.customer.customer_id, document_id='d101')
        self.assertTrue(self.processor.getEntry(
            dict(row), self.app) is self.customer['documents']['d101'])
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        self.processor.delEntry(dict(row), self.app)
        # The document can no longer be looked up and is gone from
        # the container.
        self.assertTrue(
            self.processor.getEntry(dict(row), self.app) is None)
        self.assertEqual(len(self.customer['documents'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = 'd234'
        self.processor.addEntry(
            document1,
            dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()), 2)
        self.assertEqual(
            self.customer['documents']['d234'].document_id, 'd234')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='d126', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 0)
        # Malformed document_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='nonsense', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 1)
        # Unknown class_name
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='d127', class_name='WrongDocument'))
        self.assertEqual(len(errs), 1)
        # Empty document_id in update mode
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='', class_name='CustomerSampleDocument',
                 mode='update'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 1)
        # document_id must be unique
        with open(fail_file) as fd:
            fail_content = fd.read()
        self.assertEqual(
            fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,document_id: id exists\r\n')
        document = self.processor.getEntry(
            dict(reg_number='1', document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        # NOTE(review): the result of this lookup is not asserted --
        # confirm what is expected here.
        document = self.processor.getEntry(
            dict(reg_number='3', document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - CustomerDocument Processor - '
            'sample_document_data - X666666 - updated: '
            'document_id=d3, title=My first doc'
            in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'update')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)
405
406
class ContractProcessorTest(CustomerImportExportSetup):
    """Tests for the `ContractProcessor` batch processor."""

    def setUp(self):
        super(ContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = ContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(CONTRACT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, ContractProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(self.processor.getEntry(
            dict(customer_id='ID_NONE', contract_id='nonsense'),
            self.app) is None)
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='c101'),
            self.app) is self.customer['contracts']['c101'])

    def test_delEntry(self):
        row = dict(customer_id=self.customer.customer_id, contract_id='c101')
        self.assertTrue(self.processor.getEntry(
            dict(row), self.app) is self.customer['contracts']['c101'])
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        self.processor.delEntry(dict(row), self.app)
        # The contract can no longer be looked up and is gone from
        # the container.
        self.assertTrue(
            self.processor.getEntry(dict(row), self.app) is None)
        self.assertEqual(len(self.customer['contracts'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = 'c234'
        self.processor.addEntry(
            contract1,
            dict(customer_id=self.customer.customer_id, contract_id='c234'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()), 2)
        self.assertEqual(
            self.customer['contracts']['c234'].contract_id, 'c234')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='c126', class_name='SampleContract',
                 document_object='d101', product_object='SAM'))
        self.assertEqual(len(errs), 0)
        # Malformed contract_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='nonsense', class_name='SampleContract'))
        self.assertEqual(len(errs), 1)
        # Unknown class_name
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='c127', class_name='WrongContract'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 3)
        # contract_id must be unique
        with open(fail_file) as fd:
            fail_content = fd.read()
        self.assertEqual(
            fail_content,
            'reg_number,contract_id,title,class_name,product_object,'
            'document_object,--ERRORS--\r\n'
            '2,c5,My stolen contract,SampleContract,SAM,d101,'
            'contract_id: id exists\r\n'
            '3,c6,My 5th contract,SampleContract,SAM,NONEXISTENT,'
            'document_object: Invalid value\r\n'
            '3,c7,My sixt contract,SampleContract,NONEXISTENT,d101,'
            'product_object: Invalid value\r\n')
        contract = self.processor.getEntry(
            dict(reg_number='1', contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.contract_id, 'c3')
        # NOTE(review): the result of this lookup is not asserted --
        # confirm what is expected here.
        contract = self.processor.getEntry(
            dict(reg_number='3', contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - Contract Processor - '
            'sample_contract_data - X666666 - updated: '
            'contract_id=c3, title=My first contract'
            in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'update')
        # There are one record without contract_id, one duplicate and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'remove')
        # There are one record without contract_id, one duplicate and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.