Changeset 12125
- Timestamp:
- 3 Dec 2014, 12:18:21 (10 years ago)
- Location:
- main/waeup.ikoba/trunk/src/waeup/ikoba/customers
- Files:
-
- 1 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py
r12124 r12125 41 41 from waeup.ikoba.interfaces import MessageFactory as _ 42 42 from waeup.ikoba.documents.utils import generate_document_id 43 from waeup.ikoba.customers.utils import generate_contract_id 43 44 from waeup.ikoba.customers.interfaces import ( 44 45 ICustomer, ICustomerUpdateByRegNo, … … 357 358 raw_header = reader.next() 358 359 for num, field in enumerate(headerfields): 359 if field not in ['customer_id', 'reg_number', 'document_id', 'code', 'level' 360 if field not in ['customer_id', 'reg_number', 'document_id', 361 'code', 'level', 'contract_id' 360 362 ] and mode == 'remove': 361 363 continue … … 555 557 errs.append(('contract_id','id exists')) 556 558 else: 557 if not contract_id.startswith(' a'):559 if not contract_id.startswith('c'): 558 560 errs.append(('contract_id','invalid format')) 559 561 return errs, inv_errs, conv_dict -
main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py
r12124 r12125 36 36 from waeup.ikoba.interfaces import IBatchProcessor, FatalCSVError, IUserAccount 37 37 from waeup.ikoba.customers.batching import ( 38 CustomerProcessor, CustomerDocumentProcessor )38 CustomerProcessor, CustomerDocumentProcessor, ContractProcessor) 39 39 from waeup.ikoba.customers.customer import Customer 40 40 from waeup.ikoba.customers.documents import CustomerSampleDocument … … 68 68 69 69 DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split( 70 '\n')[0].split(',') 71 72 CONTRACT_SAMPLE_DATA = open( 73 os.path.join(os.path.dirname(__file__), 'sample_contract_data.csv'), 74 'rb').read() 75 76 CONTRACT_HEADER_FIELDS = CONTRACT_SAMPLE_DATA.split( 70 77 '\n')[0].split(',') 71 78 … … 83 90 self.app = self.getRootFolder()['app'] 84 91 setSite(app) 85 86 92 self.logfile = os.path.join( 87 93 self.app['datacenter'].storage, 'logs', 'customers.log') 94 95 #Add some products 96 self.product = createObject('waeup.Product') 97 self.product.product_id = u'SAM' 98 self.product.title = u'Our Samle Product' 99 self.product.contract_category = u'sample' 100 self.app['products'].addProduct(self.product) 88 101 return 89 102 … … 103 116 104 117 def setup_customer(self, customer): 105 # set predictable values for `customer`106 118 customer.matric_number = u'234' 107 119 customer.perm_address = u'Customerroad 21\nLagos 123456\n' … … 116 128 customer.notice = u'Some notice\nin lines.' 117 129 customer.nationality = u'NG' 118 119 document = CustomerSampleDocument()130 # Add document 131 document = createObject('waeup.CustomerSampleDocument') 120 132 document.title = u'My Document' 121 133 customer['documents'].addDocument(document) 134 self.document = document 135 # Add contract 136 contract = createObject('waeup.SampleContract') 137 contract.title = u'My Contract' 138 customer['contracts'].addContract(contract) 139 self.contract = contract 122 140 return customer 123 141 … … 255 273 super(CustomerDocumentProcessorTest, self).setUp() 256 274 257 # Add customer with document258 275 customer = Customer() 259 customer.firstname = u'Anna'260 customer.lastname = u'Tester'261 customer.reg_number = u'123'262 276 self.app['customers'].addCustomer(customer) 277 customer = self.setup_customer(customer) 278 notify(grok.ObjectModifiedEvent(customer)) 263 279 self.customer = self.app['customers'][customer.customer_id] 264 document = createObject(u'waeup.CustomerSampleDocument')265 document.document_id = 'd120'266 self.customer['documents'][document.document_id] = document267 280 268 281 # Import customers with subobjects … … 288 301 dict(customer_id='ID_NONE', document_id='nonsense'), self.app) is None 289 302 assert self.processor.getEntry( 290 dict(customer_id=self.customer.customer_id, document_id='d1 20'),291 self.app) is self.customer['documents']['d1 20']303 dict(customer_id=self.customer.customer_id, document_id='d101'), 304 self.app) is self.customer['documents']['d101'] 292 305 293 306 def test_delEntry(self): 294 307 assert self.processor.getEntry( 295 dict(customer_id=self.customer.customer_id, document_id='d1 20'),296 self.app) is self.customer['documents']['d1 20']308 dict(customer_id=self.customer.customer_id, document_id='d101'), 309 self.app) is self.customer['documents']['d101'] 297 310 self.assertEqual(len(self.customer['documents'].keys()),1) 298 311 self.processor.delEntry( 299 dict(customer_id=self.customer.customer_id, document_id='d1 20'),312 dict(customer_id=self.customer.customer_id, document_id='d101'), 300 313 self.app) 301 314 assert self.processor.getEntry( 302 dict(customer_id=self.customer.customer_id, document_id='d1 20'),315 dict(customer_id=self.customer.customer_id, document_id='d101'), 303 316 self.app) is None 304 317 self.assertEqual(len(self.customer['documents'].keys()),0) … … 307 320 self.assertEqual(len(self.customer['documents'].keys()),1) 308 321 document1 = createObject(u'waeup.CustomerSampleDocument') 309 document1.document_id = ' p234'322 document1.document_id = 'd234' 310 323 self.processor.addEntry( 311 document1, dict(customer_id=self.customer.customer_id, document_id=' p234'),324 document1, dict(customer_id=self.customer.customer_id, document_id='d234'), 312 325 self.app) 313 326 self.assertEqual(len(self.customer['documents'].keys()),2) 314 self.assertEqual(self.customer['documents'][' p234'].document_id, 'p234')327 self.assertEqual(self.customer['documents']['d234'].document_id, 'd234') 315 328 document2 = createObject(u'waeup.CustomerSampleDocument') 316 329 document1.document_id = 'nonsense' … … 318 331 def test_checkConversion(self): 319 332 errs, inv_errs, conv_dict = self.processor.checkConversion( 320 dict(document_id='d126 6236341955', class_name='CustomerSampleDocument'))333 dict(document_id='d126', class_name='CustomerSampleDocument')) 321 334 self.assertEqual(len(errs),0) 322 335 errs, inv_errs, conv_dict = self.processor.checkConversion( 323 336 dict(document_id='nonsense', class_name='CustomerSampleDocument')) 324 337 self.assertEqual(len(errs),1) 325 timestamp = ("%d" % int(time()*10000))[1:] 326 document_id = "d%s" % timestamp 327 errs, inv_errs, conv_dict = self.processor.checkConversion( 328 dict(document_id=document_id, class_name='CustomerSampleDocument')) 329 self.assertEqual(len(errs),0) 330 errs, inv_errs, conv_dict = self.processor.checkConversion( 331 dict(document_id=document_id, class_name='WrongDocument')) 338 errs, inv_errs, conv_dict = self.processor.checkConversion( 339 dict(document_id='d127', class_name='WrongDocument')) 340 self.assertEqual(len(errs),1) 341 errs, inv_errs, conv_dict = self.processor.checkConversion( 342 dict(document_id='', class_name='CustomerSampleDocument', 343 mode='update')) 332 344 self.assertEqual(len(errs),1) 333 345 … … 387 399 'INFO - system - K1000001 - Document removed: d5' 388 400 in logcontent) 401 402 403 class ContractProcessorTest(CustomerImportExportSetup): 404 405 def setUp(self): 406 super(ContractProcessorTest, self).setUp() 407 408 customer = Customer() 409 self.app['customers'].addCustomer(customer) 410 customer = self.setup_customer(customer) 411 notify(grok.ObjectModifiedEvent(customer)) 412 self.customer = self.app['customers'][customer.customer_id] 413 414 # Import customers with subobjects 415 customer_file = os.path.join(self.workdir, 'sample_customer_data.csv') 416 open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA) 417 num, num_warns, fin_file, fail_file = CustomerProcessor().doImport( 418 customer_file, CUSTOMER_HEADER_FIELDS) 419 shutil.rmtree(os.path.dirname(fin_file)) 420 421 self.processor = ContractProcessor() 422 self.csv_file = os.path.join( 423 self.workdir, 'sample_contract_data.csv') 424 open(self.csv_file, 'wb').write(CONTRACT_SAMPLE_DATA) 425 426 def test_interface(self): 427 # Make sure we fulfill the interface contracts. 428 assert verifyObject(IBatchProcessor, self.processor) is True 429 assert verifyClass( 430 IBatchProcessor, ContractProcessor) is True 431 432 def test_getEntry(self): 433 assert self.processor.getEntry( 434 dict(customer_id='ID_NONE', contract_id='nonsense'), self.app) is None 435 assert self.processor.getEntry( 436 dict(customer_id=self.customer.customer_id, contract_id='c101'), 437 self.app) is self.customer['contracts']['c101'] 438 439 def test_delEntry(self): 440 assert self.processor.getEntry( 441 dict(customer_id=self.customer.customer_id, contract_id='c101'), 442 self.app) is self.customer['contracts']['c101'] 443 self.assertEqual(len(self.customer['contracts'].keys()),1) 444 self.processor.delEntry( 445 dict(customer_id=self.customer.customer_id, contract_id='c101'), 446 self.app) 447 assert self.processor.getEntry( 448 dict(customer_id=self.customer.customer_id, contract_id='c101'), 449 self.app) is None 450 self.assertEqual(len(self.customer['contracts'].keys()),0) 451 452 def test_addEntry(self): 453 self.assertEqual(len(self.customer['contracts'].keys()),1) 454 contract1 = createObject(u'waeup.SampleContract') 455 contract1.contract_id = 'c234' 456 self.processor.addEntry( 457 contract1, dict(customer_id=self.customer.customer_id, contract_id='c234'), 458 self.app) 459 self.assertEqual(len(self.customer['contracts'].keys()),2) 460 self.assertEqual(self.customer['contracts']['c234'].contract_id, 'c234') 461 contract2 = createObject(u'waeup.SampleContract') 462 contract1.contract_id = 'nonsense' 463 464 def test_checkConversion(self): 465 errs, inv_errs, conv_dict = self.processor.checkConversion( 466 dict(contract_id='c126', class_name='SampleContract')) 467 self.assertEqual(len(errs),0) 468 errs, inv_errs, conv_dict = self.processor.checkConversion( 469 dict(contract_id='nonsense', class_name='SampleContract')) 470 self.assertEqual(len(errs),1) 471 errs, inv_errs, conv_dict = self.processor.checkConversion( 472 dict(contract_id='c127', class_name='WrongContract')) 473 self.assertEqual(len(errs),1) 474 475 def test_import(self): 476 num, num_warns, fin_file, fail_file = self.processor.doImport( 477 self.csv_file, CONTRACT_HEADER_FIELDS,'create') 478 self.assertEqual(num_warns,1) 479 # contract_id must be unique 480 fail_file = open(fail_file).read() 481 self.assertEqual(fail_file, 482 'class_name,reg_number,contract_id,title,--ERRORS--\r\n' 483 'SampleContract,2,c5,My stolen contract,contract_id: id exists\r\n') 484 contract = self.processor.getEntry(dict(reg_number='1', 485 contract_id='c3'), self.app) 486 self.assertEqual( 487 self.app['customers']['X666666']['contracts']['c3'], 488 contract) 489 self.assertEqual(contract.contract_id, 'c3') 490 contract = self.processor.getEntry(dict(reg_number='3', 491 contract_id='c5'), self.app) 492 shutil.rmtree(os.path.dirname(fin_file)) 493 logcontent = open(self.logfile).read() 494 # Logging message from updateEntry 495 self.assertTrue( 496 'INFO - system - Contract Processor - ' 497 'sample_contract_data - X666666 - updated: ' 498 'contract_id=c3, title=My first contract' 499 in logcontent) 500 501 def test_import_update(self): 502 # We perform the same import twice, 503 # the second time in update mode. The number 504 # of warnings must be the same. 505 num, num_warns, fin_file, fail_file = self.processor.doImport( 506 self.csv_file, CONTRACT_HEADER_FIELDS,'create') 507 shutil.rmtree(os.path.dirname(fin_file)) 508 num, num_warns, fin_file, fail_file = self.processor.doImport( 509 self.csv_file, CONTRACT_HEADER_FIELDS,'update') 510 # There is one record without contract_id and one duplicate 511 self.assertEqual(num_warns,2) 512 shutil.rmtree(os.path.dirname(fin_file)) 513 514 def test_import_remove(self): 515 # We perform the same import twice, 516 # the second time in remove mode. The number 517 # of warnings must be the same. 518 num, num_warns, fin_file, fail_file = self.processor.doImport( 519 self.csv_file, CONTRACT_HEADER_FIELDS,'create') 520 shutil.rmtree(os.path.dirname(fin_file)) 521 num, num_warns, fin_file, fail_file = self.processor.doImport( 522 self.csv_file, CONTRACT_HEADER_FIELDS,'remove') 523 # There is one record without contract_id and one duplicate 524 self.assertEqual(num_warns,2) 525 shutil.rmtree(os.path.dirname(fin_file)) 526 logcontent = open(self.logfile).read() 527 self.assertTrue( 528 'INFO - system - K1000001 - Contract removed: c5' 529 in logcontent)
Note: See TracChangeset for help on using the changeset viewer.