source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 17935

Last change on this file since 17935 was 14202, checked in by Henrik Bettermann, 8 years ago

Fix bug. Customers could also select documents from other customers.

  • Property svn:keywords set to Id
File size: 24.5 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 14202 2016-09-29 06:51:42Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from decimal import Decimal
28from time import time
29from zope.event import notify
30from zope.component import createObject, queryUtility
31from zope.component.hooks import setSite, clearSite
32from zope.catalog.interfaces import ICatalog
33from zope.interface.verify import verifyClass, verifyObject
34from hurry.workflow.interfaces import IWorkflowState
35
36from waeup.ikoba.app import Company
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
39from waeup.ikoba.customers.batching import (
40    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
41from waeup.ikoba.customers.customer import Customer
42from waeup.ikoba.customers.documents import CustomerSampleDocument
43from waeup.ikoba.products.productoptions import ProductOption
44from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
45
def _read_sample_file(filename):
    """Return the raw content of `filename` stored next to this module.

    The file is read in binary mode and closed before the function
    returns, so no file handle is left open at import time.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fobj:
        return fobj.read()


def _header_fields(csv_data):
    """Return the comma-separated names found in the first line."""
    return csv_data.split('\n')[0].split(',')


# Raw content of the CSV fixtures and the column names taken from their
# first line. The header lists are passed to `doImport` by the tests.
CUSTOMER_SAMPLE_DATA = _read_sample_file('sample_customer_data.csv')

CUSTOMER_HEADER_FIELDS = _header_fields(CUSTOMER_SAMPLE_DATA)

CUSTOMER_SAMPLE_DATA_UPDATE = _read_sample_file(
    'sample_customer_data_update.csv')

CUSTOMER_HEADER_FIELDS_UPDATE = _header_fields(CUSTOMER_SAMPLE_DATA_UPDATE)

CUSTOMER_SAMPLE_DATA_DUPLICATES = _read_sample_file(
    'sample_customer_data_duplicates.csv')

CUSTOMER_HEADER_FIELDS_DUPLICATES = _header_fields(
    CUSTOMER_SAMPLE_DATA_DUPLICATES)

DOCUMENT_SAMPLE_DATA = _read_sample_file('sample_document_data.csv')

DOCUMENT_HEADER_FIELDS = _header_fields(DOCUMENT_SAMPLE_DATA)

CONTRACT_SAMPLE_DATA = _read_sample_file('sample_contract_data.csv')

CONTRACT_HEADER_FIELDS = _header_fields(CONTRACT_SAMPLE_DATA)
81
class CustomerImportExportSetup(FunctionalTestCase):
    """Shared fixture for the customer import and export tests.

    ``setUp`` installs a fresh ``Company`` application whose datacenter
    stores its data in a temporary directory and adds one sample
    product. The helper methods add a customer together with a verified
    document and a contract.
    """

    layer = FunctionalLayer

    def setup_customizable_params(self):
        """Set the parameters which are read while the fixture is built.

        Called by ``setUp`` before the sample product is created.
        """
        self._contract_category = u'sample'

    def setUp(self):
        """Create temporary directories, the application and a product."""
        super(CustomerImportExportSetup, self).setUp()
        self.setup_customizable_params()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()
        app = Company()
        app['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        setSite(app)
        self.logfile = os.path.join(
            self.app['datacenter'].storage, 'logs', 'customers.log')

        # Add a product with a single option. The contract created in
        # `setup_customer` refers to this product and its options.
        self.product = createObject('waeup.Product')
        self.product.product_id = u'SAM'
        self.product.title = u'Our Sample Product'
        self.product.contract_category = self._contract_category
        self.product.valid_from = datetime.date(2015, 12, 4)
        self.product.tc_dict = {'en': u'Hello world'}
        option = ProductOption()
        option.title = u'Base Fee'
        option.fee = Decimal('800.6')
        option.currency = 'USD'
        self.product.options = [option]
        self.app['products'].addProduct(self.product)

    def tearDown(self):
        """Tear down the parent fixture and remove the temporary data."""
        try:
            super(CustomerImportExportSetup, self).tearDown()
        finally:
            # Remove the temporary directories and unset the site even
            # if the parent tear down raised.
            shutil.rmtree(self.workdir)
            shutil.rmtree(self.dc_root)
            clearSite()

    def setup_for_export(self):
        """Add an empty customer ``A111111`` and set the output path.

        The customer is stored as ``self.customer``, the path of the
        output file (inside the working directory) as ``self.outfile``.
        """
        customer = Customer()
        customer.customer_id = u'A111111'
        self.app['customers'][customer.customer_id] = customer
        self.customer = customer
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')

    def setup_customer(self, customer):
        """Fill `customer` with sample data, a document and a contract.

        The document is set to the ``VERIFIED`` workflow state and is
        stored as ``self.document``; the contract refers to the sample
        product and to that document and is stored as ``self.contract``.

        Returns the customer which was passed in.
        """
        customer.matric_number = u'234'
        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
        customer.reg_number = u'123'
        customer.firstname = u'Anna'
        customer.lastname = u'Tester'
        customer.middlename = u'M.'
        customer.date_of_birth = datetime.date(1981, 2, 4)
        customer.email = u'anna@sample.com'
        customer.phone = u'+234-123-12345'
        customer.notice = u'Some notice\nin lines.'
        customer.nationality = u'NG'
        # Add document
        document = createObject('waeup.CustomerSampleDocument')
        document.title = u'My Document'
        document.document_id = u'DOC1'
        customer['documents'].addDocument(document)
        IWorkflowState(document).setState(VERIFIED)
        self.document = document
        # Add contract
        contract = createObject('waeup.SampleContract')
        contract.contract_id = u'CON1'
        customer['contracts'].addContract(contract)
        contract.product_object = self.product
        contract.document_object = self.document
        contract.product_options = self.product.options
        contract.tc_dict = {'en': u'Hello world'}
        contract.valid_from = datetime.date(2014, 2, 4)
        contract.valid_to = datetime.date(2014, 12, 4)
        self.contract = contract
        return customer
165
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerProcessor` batch processor."""

    layer = FunctionalLayer

    def setUp(self):
        """Add one customer and write the CSV fixtures to the workdir."""
        super(CustomerProcessorTest, self).setUp()

        # Add customer
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Write the fixtures; each file is closed right after writing.
        fixtures = (
            (self.csv_file, CUSTOMER_SAMPLE_DATA),
            (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
            (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES),
            )
        for path, data in fixtures:
            with open(path, 'wb') as fobj:
                fobj.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerProcessor) is True)

    def test_parentsExist(self):
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        self.assertTrue(
            self.processor.entryExists(
                dict(customer_id='ID_NONE'), self.app) is False)
        self.assertTrue(
            self.processor.entryExists(
                dict(reg_number='123'), self.app) is True)

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        self.assertTrue(parent is self.app['customers'])

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE'), self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id),
                self.app) is self.customer)

    def test_addEntry(self):
        # One customer was added in setUp, so we expect two afterwards.
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        self.assertEqual(len(self.app['customers'].keys()), 2)

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs), 0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)

    def test_checkUpdateRequirements(self):
        # No error is reported for this customer and row.
        err = self.processor.checkUpdateRequirements(
            self.customer,
            dict(reg_number='1', state='returning'), self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        self.assertTrue(
            self.customer.customer_id in self.app['customers'].keys())
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        self.assertTrue(
            self.customer.customer_id not in self.app['customers'].keys())

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        self.assertEqual(len(self.app['customers']), 10)
        # Three empty rows have been skipped.
        self.assertEqual(num, 12)
        self.assertEqual(self.app['customers']['X666666'].reg_number, '1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        # Import the customers first, then update them from a second file.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state, 'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        # Rows which cannot be imported end up, together with the reason,
        # in the file returned as `fail_file`.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        with open(fail_file) as fobj:
            content = fobj.read()
        self.assertEqual(num_warns, 3)
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,<IGNORE>,Frank,Meyer,m,1234,<IGNORE>,1990-01-06,F123456,cc@aa.ng,reg_number: Invalid input\r\n'
            '3,<IGNORE>,Uli,Schulz,m,1234,<IGNORE>,1990-01-07,A123456,dd@aa.ng,This object already exists. Skipping.\r\n'
            '4,<IGNORE>,Berta,Pieri,m,1234,<IGNORE>,1991-01-07,A234567,aa@aa.ng,email: Invalid input\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))
295
class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerSampleDocumentProcessor` batch processor."""

    def setUp(self):
        """Add customers and write the document CSV fixture."""
        super(CustomerSampleDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fobj:
            fobj.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerSampleDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as fobj:
            fobj.write(DOCUMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(
                IBatchProcessor, CustomerSampleDocumentProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', document_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='DOC1'),
                self.app) is self.customer['documents']['DOC1'])

    def test_delEntry(self):
        row = dict(customer_id=self.customer.customer_id, document_id='DOC1')
        self.assertTrue(
            self.processor.getEntry(row, self.app)
            is self.customer['documents']['DOC1'])
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        # A fresh dict is passed because the original test did so, too.
        self.processor.delEntry(dict(row), self.app)
        self.assertTrue(
            self.processor.getEntry(dict(row), self.app) is None)
        self.assertEqual(len(self.customer['documents'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = u'd234'
        self.processor.addEntry(
            document1,
            dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()), 2)
        self.assertEqual(
            self.customer['documents']['d234'].document_id, 'd234')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='id with spaces',
                 class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='WrongDocument'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        with open(fail_file) as fobj:
            fail_content = fobj.read()
        self.assertEqual(num_warns, 1)
        # document_id must be unique
        self.assertEqual(
            fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,'
            'This object already exists. Skipping.\r\n')
        document = self.processor.getEntry(
            dict(reg_number='1', document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        # The result of this lookup is not checked; the call only has
        # to succeed.
        self.processor.getEntry(
            dict(reg_number='3', document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        # Fetch uuid: the first key with more than 10 characters is
        # taken to be the generated document id.
        docid = [
            key
            for key in self.app['customers']['X666666']['documents'].keys()
            if len(key) > 10][0]
        # Logging message from updateEntry
        expected = (
            'INFO - system - Customer Sample Document Processor - '
            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
            % docid)
        self.assertTrue(expected in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'update')
        # There is one record without document_id
        self.assertEqual(num_warns, 1)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)
424
425
class SampleContractProcessorTest(CustomerImportExportSetup):
    """Tests for the `SampleContractProcessor` batch processor."""

    def setUp(self):
        """Add customers and write the contract CSV fixture."""
        super(SampleContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fobj:
            fobj.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = SampleContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as fobj:
            fobj.write(CONTRACT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, SampleContractProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', contract_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='CON1'),
                self.app) is self.customer['contracts']['CON1'])

    def test_delEntry(self):
        row = dict(customer_id=self.customer.customer_id, contract_id='CON1')
        self.assertTrue(
            self.processor.getEntry(row, self.app)
            is self.customer['contracts']['CON1'])
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        # A fresh dict is passed because the original test did so, too.
        self.processor.delEntry(dict(row), self.app)
        self.assertTrue(
            self.processor.getEntry(dict(row), self.app) is None)
        self.assertEqual(len(self.customer['contracts'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = u'CON2'
        self.processor.addEntry(
            contract1,
            dict(customer_id=self.customer.customer_id, contract_id='CON2'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()), 2)
        self.assertEqual(
            self.customer['contracts']['CON2'].contract_id, 'CON2')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='SampleContract',
                 document_object='DOC1', product_object='SAM'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='id with spaces', class_name='SampleContract'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='WrongContract'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 3)
        # contract_id must be unique
        with open(fail_file) as fobj:
            fail_content = fobj.read()
        self.assertEqual(
            fail_content,
            'reg_number,valid_from,contract_id,title,class_name,valid_to,product_object,tc_dict,document_object,product_options,--ERRORS--\r\n'
            '2,<IGNORE>,c5,<IGNORE>,SampleContract,<IGNORE>,SAM,<IGNORE>,DOC1,<IGNORE>,This object already exists. Skipping.\r\n'
            '3,<IGNORE>,c6,<IGNORE>,SampleContract,<IGNORE>,SAM,<IGNORE>,NONEXISTENT,<IGNORE>,document_object: Invalid value\r\n'
            '3,<IGNORE>,c7,<IGNORE>,SampleContract,<IGNORE>,NONEXISTENT,<IGNORE>,DOC1,<IGNORE>,product_object: Invalid value\r\n'
            )
        contract = self.processor.getEntry(
            dict(reg_number='1', contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.product_options[0].title, 'Base Fee')
        self.assertEqual(contract.product_options[0].fee, Decimal('800.6'))
        self.assertEqual(contract.product_options[0].currency, 'USD')
        self.assertEqual(contract.contract_id, 'c3')
        self.assertEqual(contract.title, u'Nice Contract')
        self.assertEqual(contract.tc_dict, {'en': 'Hello World'})
        # The result of this lookup is not checked; the call only has
        # to succeed.
        self.processor.getEntry(
            dict(reg_number='3', contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        # Logging message from updateEntry
        # Fetch uuid: the first key with more than 10 characters is
        # taken to be the generated contract id.
        conid = [
            key
            for key in self.app['customers']['X666666']['contracts'].keys()
            if len(key) > 10][0]
        expected = (
            'INFO - system - Customer Sample Contract Processor - sample_contract_data - '
            'X666666 - %s - updated: product_object=SAM, document_object=DOC1'
            % conid)
        self.assertTrue(expected in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. No record can be imported.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'update')
        # There are one record without contract_id and two
        # records with wrong object identifier.
        # The other 4 records cannot be imported because DOC1 does
        # not belong to these customers.
        self.assertEqual(num_warns, 7)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'remove')
        # There are one record without contract_id, one duplicate and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.