Changeset 7623 for main/waeup.sirp/trunk/src/waeup/sirp/students
- Timestamp:
- 10 Feb 2012, 12:08:47 (13 years ago)
- Location:
- main/waeup.sirp/trunk/src/waeup/sirp/students
- Files:
-
- 1 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.sirp/trunk/src/waeup/sirp/students/batching.py
r7612 r7623 38 38 IStudent, IStudentStudyCourse, 39 39 IStudentUpdateByRegNo, IStudentUpdateByMatricNo, 40 IStudentStudyLevel, ICourseTicket) 40 IStudentStudyLevel, ICourseTicket, 41 IStudentOnlinePayment) 41 42 from waeup.sirp.students.workflow import IMPORTABLE_STATES 42 43 from waeup.sirp.utils.batching import BatchProcessor … … 510 511 return errs, inv_errs, conv_dict 511 512 return errs, inv_errs, conv_dict 513 514 class StudentOnlinePaymentProcessor(BatchProcessor): 515 """A batch processor for IStudentOnlinePayment objects. 516 """ 517 grok.implements(IBatchProcessor) 518 grok.provides(IBatchProcessor) 519 grok.context(Interface) 520 util_name = 'paymentimporter' 521 grok.name(util_name) 522 523 name = u'Payment Importer' 524 iface = IStudentOnlinePayment 525 factory_name = 'waeup.StudentOnlinePayment' 526 527 location_fields = [] 528 529 mode = None 530 531 @property 532 def available_fields(self): 533 return sorted(list(set( 534 ['student_id','reg_number','matric_number','p_id'] + getFields( 535 self.iface).keys()))) 536 537 def checkHeaders(self, headerfields, mode='ignore'): 538 if not 'reg_number' in headerfields and not 'student_id' \ 539 in headerfields and not 'matric_number' in headerfields: 540 raise FatalCSVError( 541 "Need at least columns student_id " + 542 "or reg_number or matric_number for import!") 543 if not 'p_id' in headerfields: 544 raise FatalCSVError( 545 "Need p_id for import!") 546 # Check for fields to be ignored... 547 not_ignored_fields = [x for x in headerfields 548 if not x.startswith('--')] 549 if len(set(not_ignored_fields)) < len(not_ignored_fields): 550 raise FatalCSVError( 551 "Double headers: each column name may only appear once.") 552 return True 553 554 def getParent(self, row, site): 555 if not 'students' in site.keys(): 556 return None 557 if 'student_id' in row.keys() and row['student_id']: 558 if row['student_id'] in site['students']: 559 student = site['students'][row['student_id']] 560 return student['payments'] 561 elif 'reg_number' in row.keys() and row['reg_number']: 562 reg_number = row['reg_number'] 563 #import pdb; pdb.set_trace() 564 cat = queryUtility(ICatalog, name='students_catalog') 565 results = list( 566 cat.searchResults(reg_number=(reg_number, reg_number))) 567 if results: 568 return results[0]['payments'] 569 elif 'matric_number' in row.keys() and row['matric_number']: 570 matric_number = row['matric_number'] 571 cat = queryUtility(ICatalog, name='students_catalog') 572 results = list( 573 cat.searchResults(matric_number=(matric_number, matric_number))) 574 if results: 575 return results[0]['payments'] 576 return None 577 578 def parentsExist(self, row, site): 579 return self.getParent(row, site) is not None 580 581 def entryExists(self, row, site): 582 return self.getEntry(row, site) is not None 583 584 def getEntry(self, row, site): 585 payments = self.getParent(row, site) 586 if payments is None: 587 return None 588 if row['p_id'].startswith('p'): 589 entry = payments.get(row['p_id']) 590 else: 591 # For data migration from old SRP 592 entry = payments.get('p' + row['p_id'][6:]) 593 return entry 594 595 def addEntry(self, obj, row, site): 596 parent = self.getParent(row, site) 597 if not row['p_id'].startswith('p'): 598 # For data migration from old SRP 599 obj.p_id = 'p' + row['p_id'][6:] 600 parent[obj.p_id] = obj 601 else: 602 parent[row['p_id']] = obj 603 return 604 605 def checkConversion(self, row, mode='ignore'): 606 """Validates all values in row. 607 """ 608 converter = IObjectConverter(self.iface) 609 errs, inv_errs, conv_dict = converter.fromStringDict( 610 row, self.factory_name) 611 # We have to check p_id. 612 if row['p_id'].startswith('p'): 613 if not len(row['p_id']) == 14: 614 errs.append(('p_id','invalid length')) 615 return errs, inv_errs, conv_dict 616 else: 617 if not len(row['p_id']) == 19: 618 errs.append(('p_id','invalid length')) 619 return errs, inv_errs, conv_dict 620 return errs, inv_errs, conv_dict -
main/waeup.sirp/trunk/src/waeup/sirp/students/tests/test_batching.py
r7548 r7623 23 23 import unittest 24 24 import datetime 25 from time import time 25 26 from zope.component import createObject 26 27 from zope.component.hooks import setSite, clearSite … … 32 33 from waeup.sirp.students.batching import ( 33 34 StudentProcessor, StudentStudyCourseProcessor, 34 StudentStudyLevelProcessor, CourseTicketProcessor) 35 StudentStudyLevelProcessor, CourseTicketProcessor, 36 StudentOnlinePaymentProcessor) 35 37 from waeup.sirp.students.student import Student 36 38 from waeup.sirp.testing import FunctionalLayer, FunctionalTestCase … … 84 86 85 87 COURSETICKET_HEADER_FIELDS = COURSETICKET_SAMPLE_DATA.split( 88 '\n')[0].split(',') 89 90 PAYMENT_SAMPLE_DATA = open( 91 os.path.join(os.path.dirname(__file__), 'sample_payment_data.csv'), 92 'rb').read() 93 94 PAYMENT_HEADER_FIELDS = PAYMENT_SAMPLE_DATA.split( 86 95 '\n')[0].split(',') 87 96 … … 524 533 shutil.rmtree(os.path.dirname(fin_file)) 525 534 535 class PaymentImporterTest(FunctionalTestCase): 536 537 layer = FunctionalLayer 538 539 def setUp(self): 540 super(PaymentImporterTest, self).setUp() 541 self.dc_root = tempfile.mkdtemp() 542 self.workdir = tempfile.mkdtemp() 543 app = University() 544 app['datacenter'].setStoragePath(self.dc_root) 545 self.getRootFolder()['app'] = app 546 self.app = self.getRootFolder()['app'] 547 setSite(app) 548 549 # Add student with payment 550 student = Student() 551 student.firstname = u'Anna' 552 student.lastname = u'Tester' 553 student.reg_number = u'123' 554 student.matric_number = u'234' 555 self.app['students'].addStudent(student) 556 self.student = self.app['students'][student.student_id] 557 payment = createObject(u'waeup.StudentOnlinePayment') 558 payment.p_id = 'p123' 559 self.student['payments'][payment.p_id] = payment 560 561 # Import students with subobjects 562 student_file = os.path.join(self.workdir, 'sample_student_data.csv') 563 open(student_file, 'wb').write(STUDENT_SAMPLE_DATA) 564 num, num_warns, fin_file, fail_file = StudentProcessor().doImport( 565 student_file, STUDENT_HEADER_FIELDS) 566 shutil.rmtree(os.path.dirname(fin_file)) 567 568 self.importer = StudentOnlinePaymentProcessor() 569 self.csv_file = os.path.join( 570 self.workdir, 'sample_payment_data.csv') 571 open(self.csv_file, 'wb').write(PAYMENT_SAMPLE_DATA) 572 573 def tearDown(self): 574 super(PaymentImporterTest, self).tearDown() 575 shutil.rmtree(self.workdir) 576 shutil.rmtree(self.dc_root) 577 clearSite() 578 return 579 580 def test_interface(self): 581 # Make sure we fulfill the interface contracts. 582 assert verifyObject(IBatchProcessor, self.importer) is True 583 assert verifyClass( 584 IBatchProcessor, StudentOnlinePaymentProcessor) is True 585 586 def test_getEntry(self): 587 assert self.importer.getEntry( 588 dict(student_id='ID_NONE', p_id='nonsense'), self.app) is None 589 assert self.importer.getEntry( 590 dict(student_id=self.student.student_id, p_id='p123'), 591 self.app) is self.student['payments']['p123'] 592 assert self.importer.getEntry( 593 dict(student_id=self.student.student_id, p_id='XXXXXX123'), 594 self.app) is self.student['payments']['p123'] 595 596 def test_addEntry(self): 597 self.assertEqual(len(self.student['payments'].keys()),1) 598 payment1 = createObject(u'waeup.StudentOnlinePayment') 599 payment1.p_id = 'p234' 600 self.importer.addEntry( 601 payment1, dict(student_id=self.student.student_id, p_id='p234'), 602 self.app) 603 self.assertEqual(len(self.student['payments'].keys()),2) 604 self.assertEqual(self.student['payments']['p234'].p_id, 'p234') 605 payment2 = createObject(u'waeup.StudentOnlinePayment') 606 payment1.p_id = 'nonsense' 607 # payment1.p_id will be replaced if p_id doesn't start with 'p' 608 self.importer.addEntry( 609 payment2, dict(student_id=self.student.student_id, p_id='XXXXXX456'), 610 self.app) 611 self.assertEqual(len(self.student['payments'].keys()),3) 612 self.assertEqual(self.student['payments']['p456'].p_id, 'p456') 613 614 def test_checkConversion(self): 615 errs, inv_errs, conv_dict = self.importer.checkConversion( 616 dict(reg_number='1', p_id='3816951266236341955')) 617 self.assertEqual(len(errs),0) 618 errs, inv_errs, conv_dict = self.importer.checkConversion( 619 dict(reg_number='1', p_id='p1266236341955')) 620 self.assertEqual(len(errs),0) 621 errs, inv_errs, conv_dict = self.importer.checkConversion( 622 dict(reg_number='1', p_id='nonsense')) 623 self.assertEqual(len(errs),1) 624 timestamp = "%d" % int(time()*1000) 625 p_id = "p%s" % timestamp 626 errs, inv_errs, conv_dict = self.importer.checkConversion( 627 dict(reg_number='1', p_id=p_id)) 628 self.assertEqual(len(errs),0) 629 630 def test_import(self): 631 num, num_warns, fin_file, fail_file = self.importer.doImport( 632 self.csv_file, PAYMENT_HEADER_FIELDS,'create') 633 self.assertEqual(num_warns,0) 634 payment = self.importer.getEntry(dict(reg_number='1', 635 p_id='p1290797973744'), self.app) 636 self.assertEqual(payment.p_id, 'p1290797973744') 637 cdate = payment.creation_date.strftime("%Y-%m-%d %H:%M:%S") 638 self.assertEqual(cdate, "2010-11-26 19:59:33") 639 shutil.rmtree(os.path.dirname(fin_file)) 526 640 527 641 def test_suite(): … … 529 643 for testcase in [ 530 644 StudentImporterTest,StudentStudyCourseImporterTest, 531 StudentStudyLevelImporterTest,CourseTicketImporterTest,]: 645 StudentStudyLevelImporterTest,CourseTicketImporterTest, 646 PaymentImporterTest,]: 532 647 suite.addTest(unittest.TestLoader().loadTestsFromTestCase( 533 648 testcase
Note: See TracChangeset for help on using the changeset viewer.