source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12281

Last change on this file since 12281 was 12281, checked in by Henrik Bettermann, 10 years ago

Reorganize batch processor classes to ease customization.

  • Property svn:keywords set to Id
File size: 23.0 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12281 2014-12-21 10:35:54Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from time import time
28from zope.event import notify
29from zope.component import createObject, queryUtility
30from zope.component.hooks import setSite, clearSite
31from zope.catalog.interfaces import ICatalog
32from zope.interface.verify import verifyClass, verifyObject
33from hurry.workflow.interfaces import IWorkflowState
34
35from waeup.ikoba.app import Company
36from waeup.ikoba.interfaces import (
37    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
38from waeup.ikoba.customers.batching import (
39    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
40from waeup.ikoba.customers.customer import Customer
41from waeup.ikoba.customers.documents import CustomerSampleDocument
42from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
43
def _read_sample(filename):
    """Return the raw content of `filename` stored next to this module.

    The file is opened in binary mode inside a ``with`` block, so the
    handle is closed before the function returns instead of being left
    to the garbage collector.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fd:
        return fd.read()


def _header_fields(csv_data):
    """Return the comma-separated names of the first line of `csv_data`."""
    return csv_data.split('\n')[0].split(',')


# Raw CSV fixtures and the column names taken from their first line.
CUSTOMER_SAMPLE_DATA = _read_sample('sample_customer_data.csv')

CUSTOMER_HEADER_FIELDS = _header_fields(CUSTOMER_SAMPLE_DATA)

CUSTOMER_SAMPLE_DATA_UPDATE = _read_sample('sample_customer_data_update.csv')

CUSTOMER_HEADER_FIELDS_UPDATE = _header_fields(CUSTOMER_SAMPLE_DATA_UPDATE)

CUSTOMER_SAMPLE_DATA_DUPLICATES = _read_sample(
    'sample_customer_data_duplicates.csv')

CUSTOMER_HEADER_FIELDS_DUPLICATES = _header_fields(
    CUSTOMER_SAMPLE_DATA_DUPLICATES)

DOCUMENT_SAMPLE_DATA = _read_sample('sample_document_data.csv')

DOCUMENT_HEADER_FIELDS = _header_fields(DOCUMENT_SAMPLE_DATA)

CONTRACT_SAMPLE_DATA = _read_sample('sample_contract_data.csv')

CONTRACT_HEADER_FIELDS = _header_fields(CONTRACT_SAMPLE_DATA)
79
class CustomerImportExportSetup(FunctionalTestCase):
    """Shared fixture for the customer import/export tests.

    Installs a `Company` as site, registers one product and offers
    helpers that populate a customer with a document and a contract.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create temp dirs, install the site and add a sample product."""
        super(CustomerImportExportSetup, self).setUp()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()
        company = Company()
        company['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = company
        self.app = self.getRootFolder()['app']
        setSite(company)
        storage_path = self.app['datacenter'].storage
        self.logfile = os.path.join(storage_path, 'logs', 'customers.log')

        # Register the single product that contracts refer to.
        product = createObject('waeup.Product')
        self.product = product
        product.product_id = u'SAM'
        product.title = u'Our Samle Product'
        product.contract_category = u'sample'
        self.app['products'].addProduct(product)

    def tearDown(self):
        """Remove both temp dirs and unset the site."""
        super(CustomerImportExportSetup, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()

    def setup_for_export(self):
        """Store a customer with id ``A111111`` and set the output path."""
        new_customer = Customer()
        new_customer.customer_id = u'A111111'
        self.app['customers'][new_customer.customer_id] = new_customer
        self.customer = new_customer
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')

    def setup_customer(self, customer):
        """Fill `customer` with sample data, a document and a contract.

        The created document and contract are also kept as
        ``self.document`` and ``self.contract``. Returns `customer`.
        """
        # Attribute values are applied in the listed order; 'sex' is
        # not set here.
        sample_values = (
            ('matric_number', u'234'),
            ('perm_address', u'Customerroad 21\nLagos 123456\n'),
            ('reg_number', u'123'),
            ('firstname', u'Anna'),
            ('lastname', u'Tester'),
            ('middlename', u'M.'),
            ('date_of_birth', datetime.date(1981, 2, 4)),
            ('email', 'anna@sample.com'),
            ('phone', u'+234-123-12345'),
            ('notice', u'Some notice\nin lines.'),
            ('nationality', u'NG'),
        )
        for attr_name, attr_value in sample_values:
            setattr(customer, attr_name, attr_value)

        # Attach a document and move it to the VERIFIED workflow state.
        doc = createObject('waeup.CustomerSampleDocument')
        doc.title = u'My Document'
        doc.document_id = u'DOC1'
        customer['documents'].addDocument(doc)
        IWorkflowState(doc).setState(VERIFIED)
        self.document = doc

        # Attach a contract that points at the product and the document.
        con = createObject('waeup.SampleContract')
        con.title = u'My Contract'
        con.contract_id = u'CON1'
        con.product_object = self.product
        con.document_object = self.document
        customer['contracts'].addContract(con)
        self.contract = con
        return customer
147
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerProcessor` batch processor."""

    layer = FunctionalLayer

    def setUp(self):
        """Add one customer and write the sample CSV files to workdir."""
        super(CustomerProcessorTest, self).setUp()

        # Add customer
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Write each fixture inside a ``with`` block so the handle is
        # closed (and the data flushed) before a processor reads it.
        fixtures = (
            (self.csv_file, CUSTOMER_SAMPLE_DATA),
            (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
            (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES),
        )
        for path, data in fixtures:
            with open(path, 'wb') as fd:
                fd.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerProcessor) is True)

    def test_parentsExist(self):
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        self.assertTrue(
            self.processor.entryExists(
                dict(customer_id='ID_NONE'), self.app) is False)
        self.assertTrue(
            self.processor.entryExists(
                dict(reg_number='123'), self.app) is True)

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        self.assertTrue(parent is self.app['customers'])

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE'), self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id),
                self.app) is self.customer)

    def test_addEntry(self):
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        self.assertEqual(len(self.app['customers'].keys()), 2)

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs), 0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)

    def test_checkUpdateRequirements(self):
        # No error is reported for this customer/row combination.
        # NOTE(review): the previous comment claimed that "pg customers
        # can't be updated with wrong transition", which this assertion
        # does not check -- confirm the intended behaviour.
        err = self.processor.checkUpdateRequirements(self.customer,
            dict(reg_number='1', state='returning'), self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        self.assertTrue(
            self.customer.customer_id in self.app['customers'].keys())
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        self.assertTrue(
            self.customer.customer_id not in self.app['customers'].keys())

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        self.assertEqual(len(self.app['customers']), 10)
        self.assertEqual(self.app['customers']['X666666'].reg_number, '1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state, 'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        # Read the failure report and close the handle right away.
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns, 2)
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: Invalid input\r\n'
            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))
275
class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerSampleDocumentProcessor` batch processor."""

    def setUp(self):
        """Add a customer, import the sample customers, write the CSV."""
        super(CustomerSampleDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The file is written inside a
        # ``with`` block so it is closed and flushed before being read.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerSampleDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(DOCUMENT_SAMPLE_DATA)

    def _read_file(self, path):
        """Return the content of `path`, closing the file afterwards."""
        with open(path) as fd:
            return fd.read()

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(
                IBatchProcessor, CustomerSampleDocumentProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', document_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='DOC1'),
                self.app) is self.customer['documents']['DOC1'])

    def test_delEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='DOC1'),
                self.app) is self.customer['documents']['DOC1'])
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='DOC1'),
                self.app) is None)
        self.assertEqual(len(self.customer['documents'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = u'd234'
        self.processor.addEntry(
            document1,
            dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()), 2)
        self.assertEqual(
            self.customer['documents']['d234'].document_id, 'd234')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='id with spaces',
                 class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='WrongDocument'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        # Keep the path and the report content in separate names.
        fail_content = self._read_file(fail_file)
        self.assertEqual(num_warns, 1)
        # document_id must be unique
        self.assertEqual(fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,'
            'This object already exists. Skipping.\r\n')
        document = self.processor.getEntry(dict(reg_number='1',
            document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        # NOTE(review): the result of this lookup was never checked in
        # the original test; the call is kept so it still has to succeed.
        self.processor.getEntry(dict(reg_number='3',
            document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        logcontent = self._read_file(self.logfile)
        # Fetch uuid
        docid = [i for i in self.app['customers']['X666666']['documents'].keys()
            if len(i) > 10][0]
        # Logging message from updateEntry
        expected = (
            'INFO - system - Customer Sample Document Processor - '
            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
            % docid)
        self.assertTrue(expected in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'update')
        # There is one record without document_id
        self.assertEqual(num_warns, 1)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))
        logcontent = self._read_file(self.logfile)
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)
404
405
class SampleContractProcessorTest(CustomerImportExportSetup):
    """Tests for the `SampleContractProcessor` batch processor."""

    def setUp(self):
        """Add a customer, import the sample customers, write the CSV."""
        super(SampleContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The file is written inside a
        # ``with`` block so it is closed and flushed before being read.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = SampleContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(CONTRACT_SAMPLE_DATA)

    def _read_file(self, path):
        """Return the content of `path`, closing the file afterwards."""
        with open(path) as fd:
            return fd.read()

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, SampleContractProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', contract_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='CON1'),
                self.app) is self.customer['contracts']['CON1'])

    def test_delEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='CON1'),
                self.app) is self.customer['contracts']['CON1'])
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='CON1'),
                self.app) is None)
        self.assertEqual(len(self.customer['contracts'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = u'CON2'
        self.processor.addEntry(
            contract1,
            dict(customer_id=self.customer.customer_id, contract_id='CON2'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()), 2)
        self.assertEqual(
            self.customer['contracts']['CON2'].contract_id, 'CON2')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='SampleContract',
                 document_object='DOC1', product_object='SAM'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='id with spaces', class_name='SampleContract'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='WrongContract'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 3)
        # contract_id must be unique
        # Keep the path and the report content in separate names.
        fail_content = self._read_file(fail_file)
        self.assertEqual(fail_content,
            'reg_number,contract_id,title,class_name,product_object,document_object,--ERRORS--\r\n'
            '2,c5,My stolen contract,SampleContract,SAM,DOC1,This object already exists. Skipping.\r\n'
            '3,c6,My 5th contract,SampleContract,SAM,NONEXISTENT,document_object: Invalid value\r\n'
            '3,c7,My sixt contract,SampleContract,NONEXISTENT,DOC1,product_object: Invalid value\r\n'
            )
        contract = self.processor.getEntry(dict(reg_number='1',
            contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.contract_id, 'c3')
        # NOTE(review): the result of this lookup was never checked in
        # the original test; the call is kept so it still has to succeed.
        self.processor.getEntry(dict(reg_number='3',
            contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        logcontent = self._read_file(self.logfile)
        # Logging message from updateEntry
        # Fetch uuid
        conid = [i for i in self.app['customers']['X666666']['contracts'].keys()
            if len(i) > 10][0]
        expected = (
            'INFO - system - Sample Contract Processor - sample_contract_data - '
            'X666666 - %s - updated: title=My 4th contract, '
            'product_object=SAM, document_object=DOC1' % conid)
        self.assertTrue(expected in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'update')
        # There are one record without contract_id and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 3)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'remove')
        # There are one record without contract_id, one duplicate and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))
        logcontent = self._read_file(self.logfile)
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.