- Timestamp:
- 25 Nov 2014, 11:29:42 (10 years ago)
- Location:
- main/waeup.ikoba/trunk/src/waeup/ikoba/customers
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py
r12005 r12054 379 379 380 380 location_fields = [] 381 additional_fields = ['document_id' ]382 additional_headers = [ ]381 additional_fields = ['document_id', 'classname'] 382 additional_headers = ['classname'] 383 383 384 384 def checkHeaders(self, headerfields, mode='ignore'): … … 437 437 errs, inv_errs, conv_dict = super( 438 438 CustomerDocumentProcessor, self).checkConversion(row, mode=mode) 439 439 # We need to check if the classname corresponds with the 440 # processor chosen. This is to avoid accidentally wrong imports. 441 if mode != 'remove': 442 classname = row.get('classname', None) 443 if classname != self.factory_name.strip('waeup.'): 444 errs.append(('classname','wrong processor')) 440 445 # We have to check document_id. 441 446 document_id = row.get('document_id', None) 442 if not document_id:443 document_id = generate_document_id()444 conv_dict['document_id'] = document_id445 return errs, inv_errs, conv_dict446 # document_id must not exist.447 447 if mode == 'create': 448 if not document_id: 449 document_id = generate_document_id() 450 conv_dict['document_id'] = document_id 451 return errs, inv_errs, conv_dict 448 452 cat = queryUtility(ICatalog, name='documents_catalog') 449 453 results = list( 450 454 cat.searchResults(document_id=(document_id, document_id))) 451 455 if results: 456 # document_id must not exist. 452 457 errs.append(('document_id','id exists')) 453 if not document_id.startswith('d'): 454 errs.append(('document_id','invalid format')) 458 else: 459 if not document_id.startswith('d'): 460 errs.append(('document_id','invalid format')) 455 461 return errs, inv_errs, conv_dict -
main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/sample_document_data.csv
r12005 r12054 1 document_id,reg_number,title2 d3,1,My first doc3 d4,2,My second doc4 d5,3,My third doc5 ,1,My 4th doc6 d5,2,My stolen doc1 classname,document_id,reg_number,title 2 CustomerDocument,d3,1,My first doc 3 CustomerDocument,d4,2,My second doc 4 CustomerDocument,d5,3,My third doc 5 CustomerDocument,,1,My 4th doc 6 CustomerDocument,d5,2,My stolen doc -
main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py
r12006 r12054 318 318 def test_checkConversion(self): 319 319 errs, inv_errs, conv_dict = self.processor.checkConversion( 320 dict(document_id='d1266236341955' ))320 dict(document_id='d1266236341955', classname='CustomerDocument')) 321 321 self.assertEqual(len(errs),0) 322 322 errs, inv_errs, conv_dict = self.processor.checkConversion( 323 dict(document_id='nonsense' ))323 dict(document_id='nonsense', classname='CustomerDocument')) 324 324 self.assertEqual(len(errs),1) 325 325 timestamp = ("%d" % int(time()*10000))[1:] 326 326 document_id = "d%s" % timestamp 327 327 errs, inv_errs, conv_dict = self.processor.checkConversion( 328 dict(document_id=document_id ))328 dict(document_id=document_id, classname='CustomerDocument')) 329 329 self.assertEqual(len(errs),0) 330 errs, inv_errs, conv_dict = self.processor.checkConversion( 331 dict(document_id=document_id, classname='WrongDocument')) 332 self.assertEqual(len(errs),1) 330 333 331 334 def test_import(self): … … 336 339 fail_file = open(fail_file).read() 337 340 self.assertEqual(fail_file, 338 ' reg_number,document_id,title,--ERRORS--\r\n'339 ' 2,d5,My stolen doc,document_id: id exists\r\n')341 'classname,reg_number,document_id,title,--ERRORS--\r\n' 342 'CustomerDocument,2,d5,My stolen doc,document_id: id exists\r\n') 340 343 document = self.processor.getEntry(dict(reg_number='1', 341 344 document_id='d3'), self.app)
Note: See TracChangeset for help on using the changeset viewer.