Changeset 11995 for main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests
- Timestamp:
- 19 Nov 2014, 16:05:30 (10 years ago)
- Location:
- main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests
- Files:
-
- 1 added
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py
r11964 r11995 2 2 ## $Id: test_batching.py 11756 2014-07-09 12:46:08Z henrik $ 3 3 ## 4 ## Copyright (C) 201 1Uli Fouquet & Henrik Bettermann4 ## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann 5 5 ## This program is free software; you can redistribute it and/or modify 6 6 ## it under the terms of the GNU General Public License as published by … … 35 35 from waeup.ikoba.app import Company 36 36 from waeup.ikoba.interfaces import IBatchProcessor, FatalCSVError, IUserAccount 37 from waeup.ikoba.customers.batching import CustomerProcessor 37 from waeup.ikoba.customers.batching import ( 38 CustomerProcessor, CustomerDocumentProcessor) 38 39 from waeup.ikoba.customers.customer import Customer 39 40 from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase … … 59 60 60 61 CUSTOMER_HEADER_FIELDS_DUPLICATES = CUSTOMER_SAMPLE_DATA_DUPLICATES.split( 62 '\n')[0].split(',') 63 64 DOCUMENT_SAMPLE_DATA = open( 65 os.path.join(os.path.dirname(__file__), 'sample_document_data.csv'), 66 'rb').read() 67 68 DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split( 61 69 '\n')[0].split(',') 62 70 … … 238 246 shutil.rmtree(os.path.dirname(fin_file)) 239 247 248 class CustomerDocumentProcessorTest(CustomerImportExportSetup): 249 250 def setUp(self): 251 super(CustomerDocumentProcessorTest, self).setUp() 252 253 # Add customer with document 254 customer = Customer() 255 customer.firstname = u'Anna' 256 customer.lastname = u'Tester' 257 customer.reg_number = u'123' 258 self.app['customers'].addCustomer(customer) 259 self.customer = self.app['customers'][customer.customer_id] 260 document = createObject(u'waeup.CustomerDocument') 261 document.id = 'd120' 262 self.customer['documents'][document.id] = document 263 264 # Import customers with subobjects 265 customer_file = os.path.join(self.workdir, 'sample_customer_data.csv') 266 open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA) 267 num, num_warns, fin_file, fail_file = CustomerProcessor().doImport( 268 customer_file, CUSTOMER_HEADER_FIELDS) 269 shutil.rmtree(os.path.dirname(fin_file)) 270 271 self.processor = CustomerDocumentProcessor() 272 self.csv_file = os.path.join( 273 self.workdir, 'sample_document_data.csv') 274 open(self.csv_file, 'wb').write(DOCUMENT_SAMPLE_DATA) 275 276 def test_interface(self): 277 # Make sure we fulfill the interface contracts. 278 assert verifyObject(IBatchProcessor, self.processor) is True 279 assert verifyClass( 280 IBatchProcessor, CustomerDocumentProcessor) is True 281 282 def test_getEntry(self): 283 assert self.processor.getEntry( 284 dict(customer_id='ID_NONE', id='nonsense'), self.app) is None 285 assert self.processor.getEntry( 286 dict(customer_id=self.customer.customer_id, id='d120'), 287 self.app) is self.customer['documents']['d120'] 288 289 def test_delEntry(self): 290 assert self.processor.getEntry( 291 dict(customer_id=self.customer.customer_id, id='d120'), 292 self.app) is self.customer['documents']['d120'] 293 self.assertEqual(len(self.customer['documents'].keys()),1) 294 self.processor.delEntry( 295 dict(customer_id=self.customer.customer_id, id='d120'), 296 self.app) 297 assert self.processor.getEntry( 298 dict(customer_id=self.customer.customer_id, id='d120'), 299 self.app) is None 300 self.assertEqual(len(self.customer['documents'].keys()),0) 301 302 def test_addEntry(self): 303 self.assertEqual(len(self.customer['documents'].keys()),1) 304 document1 = createObject(u'waeup.CustomerDocument') 305 document1.id = 'p234' 306 self.processor.addEntry( 307 document1, dict(customer_id=self.customer.customer_id, id='p234'), 308 self.app) 309 self.assertEqual(len(self.customer['documents'].keys()),2) 310 self.assertEqual(self.customer['documents']['p234'].id, 'p234') 311 document2 = createObject(u'waeup.CustomerDocument') 312 document1.id = 'nonsense' 313 314 def test_checkConversion(self): 315 errs, inv_errs, conv_dict = self.processor.checkConversion( 316 dict(id='d1266236341955')) 317 self.assertEqual(len(errs),0) 318 errs, inv_errs, conv_dict = self.processor.checkConversion( 319 dict(id='nonsense')) 320 self.assertEqual(len(errs),1) 321 timestamp = ("%d" % int(time()*10000))[1:] 322 id = "d%s" % timestamp 323 errs, inv_errs, conv_dict = self.processor.checkConversion( 324 dict(id=id)) 325 self.assertEqual(len(errs),0) 326 327 def test_import(self): 328 num, num_warns, fin_file, fail_file = self.processor.doImport( 329 self.csv_file, DOCUMENT_HEADER_FIELDS,'create') 330 self.assertEqual(num_warns,0) 331 document = self.processor.getEntry(dict(reg_number='1', 332 id='d1266236341953'), self.app) 333 self.assertEqual( 334 self.app['customers']['X666666']['documents']['d1266236341953'], 335 document) 336 self.assertEqual(document.id, 'd1266236341953') 337 document = self.processor.getEntry(dict(reg_number='3', 338 id='d1266236341955'), self.app) 339 shutil.rmtree(os.path.dirname(fin_file)) 340 logcontent = open(self.logfile).read() 341 # Logging message from updateEntry 342 self.assertTrue( 343 'INFO - system - CustomerDocument Processor - ' 344 'sample_document_data - X666666 - updated: ' 345 'id=d1266236341953, title=My first doc' 346 in logcontent) 347 348 def test_import_update(self): 349 # We perform the same import twice, 350 # the second time in update mode. The number 351 # of warnings must be the same. 352 num, num_warns, fin_file, fail_file = self.processor.doImport( 353 self.csv_file, DOCUMENT_HEADER_FIELDS,'create') 354 shutil.rmtree(os.path.dirname(fin_file)) 355 num, num_warns, fin_file, fail_file = self.processor.doImport( 356 self.csv_file, DOCUMENT_HEADER_FIELDS,'update') 357 self.assertEqual(num_warns,1) # There is one record without id 358 shutil.rmtree(os.path.dirname(fin_file)) 359 360 def test_import_remove(self): 361 # We perform the same import twice, 362 # the second time in remove mode. The number 363 # of warnings must be the same. 364 num, num_warns, fin_file, fail_file = self.processor.doImport( 365 self.csv_file, DOCUMENT_HEADER_FIELDS,'create') 366 shutil.rmtree(os.path.dirname(fin_file)) 367 num, num_warns, fin_file, fail_file = self.processor.doImport( 368 self.csv_file, DOCUMENT_HEADER_FIELDS,'remove') 369 self.assertEqual(num_warns,1) # There is one record without id 370 shutil.rmtree(os.path.dirname(fin_file)) 371 logcontent = open(self.logfile).read() 372 self.assertTrue( 373 'INFO - system - K1000001 - Document removed: d1266236341955' 374 in logcontent)
Note: See TracChangeset for help on using the changeset viewer.