Changeset 9211 for main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/tests/test_batching.py
- Timestamp:
- 21 Sep 2012, 08:19:35 (12 years ago)
- Location:
- main/waeup.kofa/branches/uli-zc-async
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/branches/uli-zc-async
- Property svn:mergeinfo changed
/main/waeup.kofa/branches/uli-async-update removed /main/waeup.kofa/branches/uli-autoinclude-less removed /main/waeup.kofa/trunk removed
- Property svn:mergeinfo changed
-
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/tests/test_batching.py
r9209 r9211 19 19 # Tests for university related batching 20 20 import unittest 21 import tempfile 22 import shutil 23 import os 24 from zope.component.hooks import setSite, clearSite 25 from zope.component import createObject 26 from zope.securitypolicy.interfaces import ( 27 IPrincipalRoleMap, IPrincipalRoleManager) 28 from zope.testbrowser.testing import Browser 21 29 22 from zope.interface.verify import verifyClass, verifyObject 30 23 from waeup.kofa.interfaces import IBatchProcessor 31 24 from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer 32 from waeup.kofa.app import University33 25 from waeup.kofa.university.batching import ( 34 26 FacultyProcessor, DepartmentProcessor, CourseProcessor, … … 36 28 from waeup.kofa.university.certificate import Certificate, CertificateCourse 37 29 from waeup.kofa.university.course import Course 38 from waeup.kofa.university import Faculty, Department 39 from waeup.kofa.university.batching import FacultyProcessor 40 41 FACULTY_SAMPLE_DATA = open( 42 os.path.join(os.path.dirname(__file__), 'sample_faculty_data.csv'), 43 'rb').read() 44 45 FACULTY_HEADER_FIELDS = FACULTY_SAMPLE_DATA.split( 46 '\n')[0].split(',') 47 48 FACULTY_SAMPLE_DATA_UPDATE = open( 49 os.path.join(os.path.dirname(__file__), 'sample_faculty_data_update.csv'), 50 'rb').read() 51 52 FACULTY_HEADER_FIELDS_UPDATE = FACULTY_SAMPLE_DATA_UPDATE.split( 53 '\n')[0].split(',') 54 55 DEPARTMENT_SAMPLE_DATA = open( 56 os.path.join(os.path.dirname(__file__), 'sample_department_data.csv'), 57 'rb').read() 58 59 DEPARTMENT_HEADER_FIELDS = DEPARTMENT_SAMPLE_DATA.split( 60 '\n')[0].split(',') 61 62 CERTIFICATE_SAMPLE_DATA = open( 63 os.path.join(os.path.dirname(__file__), 'sample_certificate_data.csv'), 64 'rb').read() 65 66 CERTIFICATE_HEADER_FIELDS = CERTIFICATE_SAMPLE_DATA.split( 67 '\n')[0].split(',') 68 69 COURSE_SAMPLE_DATA = open( 70 os.path.join(os.path.dirname(__file__), 'sample_course_data.csv'), 71 'rb').read() 72 73 COURSE_HEADER_FIELDS = COURSE_SAMPLE_DATA.split( 74 '\n')[0].split(',') 75 76 class UniversityProcessorSetup(FunctionalTestCase): 77 78 layer = FunctionalLayer 79 80 def setUp(self): 81 super(UniversityProcessorSetup, self).setUp() 82 self.dc_root = tempfile.mkdtemp() 83 self.workdir = tempfile.mkdtemp() 84 app = University() 85 self.getRootFolder()['app'] = app 86 self.app = self.getRootFolder()['app'] 87 setSite(app) 88 89 self.app['users'].addUser('bob', 'bobssecret') 90 self.app['users'].addUser('anne', 'annessecret') 91 92 # Populate university 93 self.certificate = createObject('waeup.Certificate') 94 self.certificate.code = 'CRT1' 95 self.app['faculties']['FAC1'] = Faculty(code='FAC1') 96 self.app['faculties']['FAC1']['DEP1'] = Department(code='DEP1') 97 self.app['faculties']['FAC1']['DEP1'].certificates.addCertificate( 98 self.certificate) 99 self.course = createObject('waeup.Course') 100 self.course.code = 'CRS1' 101 self.app['faculties']['FAC1']['DEP1'].courses.addCourse( 102 self.course) 103 #self.app['faculties']['fac1']['dep1'].certificates['CERT1'].addCertCourse( 104 # self.course, level=100) 105 106 self.logfile = os.path.join( 107 self.app['datacenter'].storage, 'logs', 'main.log') 108 return 109 return 110 111 def tearDown(self): 112 super(UniversityProcessorSetup, self).tearDown() 113 shutil.rmtree(self.workdir) 114 shutil.rmtree(self.dc_root) 115 clearSite() 116 return 117 118 class TestFacultyProcessor(UniversityProcessorSetup): 119 120 def setUp(self): 121 super(TestFacultyProcessor, self).setUp() 122 123 self.browser = Browser() 124 self.browser.handleErrors = False 125 self.datacenter_path = 'http://localhost/app/datacenter' 126 self.app['datacenter'].setStoragePath(self.dc_root) 127 30 from waeup.kofa.university.department import Department 31 32 33 class TestFacultyProcessor(unittest.TestCase): 34 35 def setUp(self): 128 36 self.proc = FacultyProcessor() 129 37 self.site1 = dict(faculties=dict()) 130 38 self.site2 = dict(faculties=dict(FAC='pseudo faculty')) 131 39 self.row = dict(code='FAC') 132 133 self.csv_file_faculty = os.path.join(self.workdir, 'sample_faculty_data.csv')134 open(self.csv_file_faculty, 'wb').write(FACULTY_SAMPLE_DATA)135 self.csv_file_faculty_update = os.path.join(self.workdir, 'sample_faculty_data_update.csv')136 open(self.csv_file_faculty_update, 'wb').write(FACULTY_SAMPLE_DATA_UPDATE)137 40 return 138 41 … … 157 60 return 158 61 159 def test_checkConversion(self): 160 # Make sure we can check conversions. 161 errs, inv_errs, conv_dict = self.proc.checkConversion( 162 dict(faculty_code='ABC', local_roles='[]')) 163 self.assertEqual(len(errs),0) 164 165 errs, inv_errs, conv_dict = self.proc.checkConversion( 166 dict(faculty_code='ABC', 167 local_roles="['nonsense'" 168 )) 169 self.assertEqual(len(errs),1) 170 self.assertEqual(errs, [('local_roles', 'Error')]) 171 errs, inv_errs, conv_dict = self.proc.checkConversion( 172 dict(faculty_code='ABC', 173 local_roles="('abc')" 174 )) 175 self.assertEqual(len(errs),1) 176 self.assertEqual(errs, [('local_roles', 'no list')]) 177 errs, inv_errs, conv_dict = self.proc.checkConversion( 178 dict(faculty_code='ABC', 179 local_roles="[('ABC')]" 180 )) 181 self.assertEqual(len(errs),1) 182 self.assertEqual(errs, [('local_roles', 'no dicts')]) 183 errs, inv_errs, conv_dict = self.proc.checkConversion( 184 dict(faculty_code='ABC', 185 local_roles="('abc')" 186 )) 187 self.assertEqual(len(errs),1) 188 self.assertEqual(errs, [('local_roles', 'no list')]) 189 errs, inv_errs, conv_dict = self.proc.checkConversion( 190 dict(faculty_code='ABC', local_roles= 191 "[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]" 192 )) 193 self.assertEqual(len(errs),1) 194 self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')]) 195 errs, inv_errs, conv_dict = self.proc.checkConversion( 196 dict(faculty_code='ABC', local_roles= 197 "[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]" 198 )) 199 self.assertEqual(len(errs),1) 200 self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')]) 201 errs, inv_errs, conv_dict = self.proc.checkConversion( 202 dict(faculty_code='ABC', local_roles= 203 "[{'user_name':'bob','local_role':'waeup.local.Boss'},]" 204 )) 205 self.assertEqual(len(errs),1) 206 self.assertEqual(errs, [('local_roles', 'waeup.local.Boss not allowed')]) 207 errs, inv_errs, conv_dict = self.proc.checkConversion( 208 dict(faculty_code='ABC', local_roles= 209 "[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]" 210 )) 211 self.assertEqual(len(errs),1) 212 self.assertEqual(errs, [('local_roles', 'john does not exist')]) 213 errs, inv_errs, conv_dict = self.proc.checkConversion( 214 dict(faculty_code='ABC', local_roles= 215 "[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]" 216 )) 217 self.assertEqual(len(errs),0) 218 return 219 220 def test_import(self): 221 num, num_warns, fin_file, fail_file = self.proc.doImport( 222 self.csv_file_faculty, FACULTY_HEADER_FIELDS) 223 content = open(fail_file).read() 224 self.assertEqual(num_warns,5) 225 self.assertEqual( 226 content, 227 'code,local_roles,--ERRORS--\r\n' 228 'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",' 229 'local_roles: alice does not exist\r\n' 230 'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",' 231 'local_roles: waeup.local.Boss not allowed\r\n' 232 'EFG,[(\'anything\')],local_roles: no dicts\r\n' 233 'FGH,[,local_roles: Error\r\n' 234 'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",' 235 'local_roles: user_name or local_role missing\r\n' 236 ) 237 # Bob got a local role in faculty ABC. 238 abc = self.app['faculties']['ABC'] 239 role_map = IPrincipalRoleMap(abc) 240 local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0] 241 self.assertEqual(user_name, 'bob') 242 self.assertEqual(local_role, 'waeup.local.DepartmentManager') 243 shutil.rmtree(os.path.dirname(fin_file)) 244 return 245 246 def test_import_update(self): 247 self.app['faculties']['FAC2'] = Faculty(code='FAC2') 248 self.app['faculties']['FAC3'] = Faculty(code='FAC3') 249 self.app['faculties']['FAC4'] = Faculty(code='FAC4') 250 251 role_manager1 = IPrincipalRoleManager(self.app['faculties']['FAC1']) 252 role_manager1.assignRoleToPrincipal('alfonsrole', 'alfons') 253 role_map1 = IPrincipalRoleMap(self.app['faculties']['FAC1']) 254 self.assertEqual(len(role_map1.getPrincipalsAndRoles()), 1) 255 256 role_manager2 = IPrincipalRoleManager(self.app['faculties']['FAC2']) 257 role_manager2.assignRoleToPrincipal('alfonsrole', 'alfons') 258 role_map2 = IPrincipalRoleMap(self.app['faculties']['FAC2']) 259 self.assertEqual(len(role_map2.getPrincipalsAndRoles()), 1) 260 261 role_manager3 = IPrincipalRoleManager(self.app['faculties']['FAC3']) 262 role_manager3.assignRoleToPrincipal('alfonsrole', 'alfons') 263 role_map3 = IPrincipalRoleMap(self.app['faculties']['FAC3']) 264 self.assertEqual(len(role_map3.getPrincipalsAndRoles()), 1) 265 266 role_manager4 = IPrincipalRoleManager(self.app['faculties']['FAC4']) 267 role_manager4.assignRoleToPrincipal('alfonsrole', 'alfons') 268 role_map4 = IPrincipalRoleMap(self.app['faculties']['FAC4']) 269 self.assertEqual(len(role_map4.getPrincipalsAndRoles()), 1) 270 271 local_role, user_name, setting = role_map2.getPrincipalsAndRoles()[0] 272 self.assertEqual(user_name, 'alfons') 273 self.assertEqual(local_role, 'alfonsrole') 274 275 num, num_warns, fin_file, fail_file = self.proc.doImport( 276 self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update') 277 self.assertEqual(num_warns,0) 278 # Local roles have been removed in FAC1 due to deletion marker. 279 self.assertEqual(len(role_map1.getPrincipalsAndRoles()), 0) 280 # Old local roles have been removed and new roles have been added in FAC2. 281 self.assertEqual(len(role_map2.getPrincipalsAndRoles()), 1) 282 local_role, user_name, setting = role_map2.getPrincipalsAndRoles()[0] 283 self.assertEqual(user_name, 'bob') 284 self.assertEqual(local_role, 'waeup.local.DepartmentManager') 285 # Local roles are not touched in FAC3 due to ignore marker. 286 self.assertEqual(len(role_map3.getPrincipalsAndRoles()), 1) 287 local_role, user_name, setting = role_map3.getPrincipalsAndRoles()[0] 288 self.assertEqual(user_name, 'alfons') 289 self.assertEqual(local_role, 'alfonsrole') 290 # Local roles are not touched in FAC4 due to empty cell. 291 self.assertEqual(len(role_map4.getPrincipalsAndRoles()), 1) 292 local_role, user_name, setting = role_map4.getPrincipalsAndRoles()[0] 293 self.assertEqual(user_name, 'alfons') 294 self.assertEqual(local_role, 'alfonsrole') 295 shutil.rmtree(os.path.dirname(fin_file)) 296 return 297 298 def test_import_update_logging(self): 299 self.app['faculties']['FAC2'] = Faculty(code='FAC2') 300 self.app['faculties']['FAC3'] = Faculty(code='FAC3') 301 self.app['faculties']['FAC4'] = Faculty(code='FAC4') 302 num, num_warns, fin_file, fail_file = self.proc.doImport( 303 self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update') 304 self.assertEqual(num_warns,0) 305 logcontent = open(self.logfile).read() 306 # Logging message from updateEntry 307 self.assertTrue( 308 'INFO - system - Department Processor - DEP2 - ' 309 'Record updated: local_roles=anne|waeup.local.DepartmentManager, ' 310 'code=DEP2' 311 in logcontent) 312 313 def test_upload_import_reupload(self): 314 self.browser.addHeader('Authorization', 'Basic mgr:mgrpw') 315 self.browser.open(self.datacenter_path) 316 self.assertEqual(self.browser.headers['Status'], '200 Ok') 317 self.assertEqual(self.browser.url, self.datacenter_path) 318 self.browser.getLink("Upload data").click() 319 file = open(self.csv_file_faculty) 320 ctrl = self.browser.getControl(name='uploadfile:file') 321 file_ctrl = ctrl.mech_control 322 file_ctrl.add_file(file, filename='sample_faculty_data.csv') 323 self.browser.getControl('Upload').click() 324 self.browser.getLink('Process data').click() 325 self.browser.getControl(name="select").click() 326 importerselect = self.browser.getControl(name='importer') 327 importerselect.getControl('Faculty Processor').selected = True 328 modeselect = self.browser.getControl(name='mode') 329 modeselect.getControl(value='create').selected = True 330 self.browser.getControl('Proceed to step 3').click() 331 self.assertTrue('Header fields OK' in self.browser.contents) 332 self.browser.getControl('Perform import').click() 333 self.assertTrue('Successfully processed 1 rows' in self.browser.contents) 334 # We import the same file a second time. 335 self.browser.open(self.datacenter_path) 336 self.browser.getLink("Upload data").click() 337 file = open(self.csv_file_faculty) 338 ctrl = self.browser.getControl(name='uploadfile:file') 339 file_ctrl = ctrl.mech_control 340 file_ctrl.add_file(file, filename='sample_faculty_data.csv') 341 self.browser.getControl('Upload').click() 342 self.assertTrue( 343 'File with same name was uploaded earlier' in self.browser.contents) 344 return 345 346 class TestDepartmentProcessor(UniversityProcessorSetup): 347 348 def setUp(self): 349 super(TestDepartmentProcessor, self).setUp() 62 class TestDepartmentProcessors(unittest.TestCase): 63 64 def setUp(self): 350 65 self.proc = DepartmentProcessor() 351 66 self.site0 = dict() … … 354 69 self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department'))) 355 70 self.row = dict(code='DPT', faculty_code='FAC') 356 357 self.csv_file_department = os.path.join(self.workdir, 'sample_department_data.csv')358 open(self.csv_file_department, 'wb').write(DEPARTMENT_SAMPLE_DATA)359 71 return 360 72 … … 403 115 return 404 116 405 def test_import(self): 406 num, num_warns, fin_file, fail_file = self.proc.doImport( 407 self.csv_file_department, DEPARTMENT_HEADER_FIELDS) 408 content = open(fail_file).read() 409 self.assertEqual(num_warns,6) 410 self.assertEqual( 411 content, 412 'faculty_code,code,local_roles,--ERRORS--\r\n' 413 'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",' 414 'local_roles: alice does not exist\r\n' 415 'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",' 416 'local_roles: waeup.local.Boss not allowed\r\n' 417 'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n' 418 'FAC1,DEP2,[,local_roles: Error\r\n' 419 'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",' 420 'local_roles: user_name or local_role missing\r\n' 421 'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",' 422 'Not all parents do exist yet. Skipping\r\n' 423 ) 424 # Anne got a local role in department DEP2. 425 dep = self.app['faculties']['FAC1']['DEP2'] 426 role_map = IPrincipalRoleMap(dep) 427 local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0] 428 self.assertEqual(user_name, 'anne') 429 self.assertEqual(local_role, 'waeup.local.DepartmentManager') 430 shutil.rmtree(os.path.dirname(fin_file)) 431 return 432 433 class CourseProcessorTests(UniversityProcessorSetup): 117 class CourseProcessorTests(FunctionalTestCase): 118 119 layer = FunctionalLayer 434 120 435 121 def setUp(self): 436 122 super(CourseProcessorTests, self).setUp() 437 123 self.proc = CourseProcessor() 438 self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRS1") 439 self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRS1") 440 self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS2") 441 self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS1") 442 443 self.csv_file_course = os.path.join(self.workdir, 'sample_course_data.csv') 444 open(self.csv_file_course, 'wb').write(COURSE_SAMPLE_DATA) 124 self.site0 = dict() 125 self.site1 = dict(faculties=dict()) 126 self.site2 = dict(faculties=dict(FAC=dict())) 127 self.department1 = Department(code='DPT') 128 self.department2 = Department(code='DPT') 129 self.course = Course(code='CRS') 130 self.department2.courses['CRS'] = self.course 131 self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1))) 132 self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2))) 133 self.row = dict(department_code='DPT', faculty_code='FAC', code="CRS") 445 134 return 446 135 … … 453 142 def test_parents_exist(self): 454 143 # make sure we lookup parents correctly 455 result1 = self.proc.parentsExist(self.row1, self.app) 456 result2 = self.proc.parentsExist(self.row2, self.app) 457 result3 = self.proc.parentsExist(self.row3, self.app) 144 result0 = self.proc.parentsExist(self.row, self.site0) 145 result1 = self.proc.parentsExist(self.row, self.site1) 146 result2 = self.proc.parentsExist(self.row, self.site2) 147 result3 = self.proc.parentsExist(self.row, self.site3) 148 result4 = self.proc.parentsExist(self.row, self.site4) 149 self.assertTrue(result0 is False) 458 150 self.assertTrue(result1 is False) 459 151 self.assertTrue(result2 is False) 460 152 self.assertTrue(result3 is True) 153 self.assertTrue(result4 is True) 461 154 return 462 155 463 156 def test_entry_exists(self): 464 157 # make sure we find an entry if it exists 465 result1 = self.proc.entryExists(self.row1, self.app) 466 result2 = self.proc.entryExists(self.row2, self.app) 467 result3 = self.proc.entryExists(self.row3, self.app) 468 result4 = self.proc.entryExists(self.row4, self.app) 158 result0 = self.proc.entryExists(self.row, self.site0) 159 result1 = self.proc.entryExists(self.row, self.site1) 160 result2 = self.proc.entryExists(self.row, self.site2) 161 result3 = self.proc.entryExists(self.row, self.site3) 162 result4 = self.proc.entryExists(self.row, self.site4) 163 self.assertTrue(result0 is False) 469 164 self.assertTrue(result1 is False) 470 165 self.assertTrue(result2 is False) … … 475 170 def test_get_entry(self): 476 171 # make sure we can get an entry if it exists 477 result1 = self.proc.getEntry(self.row 1, self.app)478 result2 = self.proc.getEntry(self.row 4, self.app)172 result1 = self.proc.getEntry(self.row, self.site3) 173 result2 = self.proc.getEntry(self.row, self.site4) 479 174 self.assertTrue(result1 is None) 480 175 self.assertTrue(result2 is self.course) … … 483 178 def test_del_entry(self): 484 179 # make sure we can delete entries 485 self.assertTrue('CRS1' in self.app['faculties']['FAC1']['DEP1'].courses.keys()) 486 self.proc.delEntry(self.row4, self.app) 487 self.assertTrue('CRS1' not in self.app['faculties']['FAC1']['DEP1'].courses.keys()) 488 return 489 490 def test_import(self): 491 num, num_warns, fin_file, fail_file = self.proc.doImport( 492 self.csv_file_course, COURSE_HEADER_FIELDS) 493 content = open(fail_file).read() 494 self.assertEqual(num_warns,6) 495 self.assertEqual( 496 content, 497 'faculty_code,department_code,code,local_roles,--ERRORS--\r\n' 498 'FAC1,DEP1,CRS2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.Lecturer\'}]",' 499 'local_roles: alice does not exist\r\n' 500 'FAC1,DEP1,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",' 501 'local_roles: waeup.local.Boss not allowed\r\n' 502 'FAC1,DEP1,CRS2,[(\'anything\')],local_roles: no dicts\r\n' 503 'FAC1,DEP1,CRS2,[,local_roles: Error\r\n' 504 'FAC1,DEP1,CRS2,"[{\'user\':\'anne\',\'local\':\'waeup.local.Lecturer\'}]",' 505 'local_roles: user_name or local_role missing\r\n' 506 'FAC11,DEP2,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Lecturer\'}]",' 507 'Not all parents do exist yet. Skipping\r\n' 508 ) 509 # Anne got a local role in course CRS2. 510 dep = self.app['faculties']['FAC1']['DEP1'].courses['CRS2'] 511 role_map = IPrincipalRoleMap(dep) 512 local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0] 513 self.assertEqual(user_name, 'anne') 514 self.assertEqual(local_role, 'waeup.local.Lecturer') 515 shutil.rmtree(os.path.dirname(fin_file)) 516 return 517 518 519 class CertificateProcessorTests(UniversityProcessorSetup): 180 self.assertTrue('CRS' in self.department2.courses.keys()) 181 self.proc.delEntry(self.row, self.site4) 182 self.assertTrue('CRS' not in self.department2.courses.keys()) 183 return 184 185 class CertificateProcessorTests(FunctionalTestCase): 186 187 layer = FunctionalLayer 520 188 521 189 def setUp(self): 522 190 super(CertificateProcessorTests, self).setUp() 523 191 self.proc = CertificateProcessor() 524 self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRT1") 525 self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRT1") 526 self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT2") 527 self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT1") 528 529 self.csv_file_certificate = os.path.join(self.workdir, 'sample_certificate_data.csv') 530 open(self.csv_file_certificate, 'wb').write(CERTIFICATE_SAMPLE_DATA) 531 return 532 533 def test_ifaces(self): 534 # Make sure we fullfill all interface contracts 535 verifyClass(IBatchProcessor, CourseProcessor) 192 self.site0 = dict() 193 self.site1 = dict(faculties=dict()) 194 self.site2 = dict(faculties=dict(FAC=dict())) 195 self.department1 = Department(code='DPT') 196 self.department2 = Department(code='DPT') 197 self.certificate = Certificate(code='CRT') 198 self.department2.certificates['CRT'] = self.certificate 199 self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1))) 200 self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2))) 201 self.row = dict(department_code='DPT', faculty_code='FAC', code="CRT") 202 return 203 204 def test_ifaces(self): 205 # Make sure we fullfill all interface contracts 206 verifyClass(IBatchProcessor, CertificateProcessor) 536 207 verifyObject(IBatchProcessor, self.proc) 537 208 return 538 209 539 210 def test_parents_exist(self): 540 # make sure we lookup parents correctly 541 result1 = self.proc.parentsExist(self.row1, self.app) 542 result2 = self.proc.parentsExist(self.row2, self.app) 543 result3 = self.proc.parentsExist(self.row3, self.app) 211 # make sure we can find all certificate parents 212 result0 = self.proc.parentsExist(self.row, self.site0) 213 result1 = self.proc.parentsExist(self.row, self.site1) 214 result2 = self.proc.parentsExist(self.row, self.site2) 215 result3 = self.proc.parentsExist(self.row, self.site3) 216 result4 = self.proc.parentsExist(self.row, self.site4) 217 self.assertTrue(result0 is False) 544 218 self.assertTrue(result1 is False) 545 219 self.assertTrue(result2 is False) 546 220 self.assertTrue(result3 is True) 221 self.assertTrue(result4 is True) 547 222 return 548 223 549 224 def test_entry_exists(self): 550 225 # make sure we find an entry if it exists 551 result1 = self.proc.entryExists(self.row1, self.app) 552 result2 = self.proc.entryExists(self.row2, self.app) 553 result3 = self.proc.entryExists(self.row3, self.app) 554 result4 = self.proc.entryExists(self.row4, self.app) 226 result0 = self.proc.entryExists(self.row, self.site0) 227 result1 = self.proc.entryExists(self.row, self.site1) 228 result2 = self.proc.entryExists(self.row, self.site2) 229 result3 = self.proc.entryExists(self.row, self.site3) 230 result4 = self.proc.entryExists(self.row, self.site4) 231 self.assertTrue(result0 is False) 555 232 self.assertTrue(result1 is False) 556 233 self.assertTrue(result2 is False) … … 561 238 def test_get_entry(self): 562 239 # make sure we can get an entry if it exists 563 result1 = self.proc.getEntry(self.row 1, self.app)564 result2 = self.proc.getEntry(self.row 4, self.app)240 result1 = self.proc.getEntry(self.row, self.site3) 241 result2 = self.proc.getEntry(self.row, self.site4) 565 242 self.assertTrue(result1 is None) 566 243 self.assertTrue(result2 is self.certificate) … … 569 246 def test_del_entry(self): 570 247 # make sure we can delete entries 571 self.assertTrue('CRT1' in self.app['faculties']['FAC1']['DEP1'].certificates.keys()) 572 self.proc.delEntry(self.row4, self.app) 573 self.assertTrue('CRT1' not in self.app['faculties']['FAC1']['DEP1'].certificates.keys()) 574 return 575 576 def test_import(self): 577 num, num_warns, fin_file, fail_file = self.proc.doImport( 578 self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS) 579 content = open(fail_file).read() 580 self.assertEqual(num_warns,6) 581 self.assertEqual( 582 content, 583 'faculty_code,department_code,code,local_roles,--ERRORS--\r\n' 584 'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",' 585 'local_roles: alice does not exist\r\n' 586 'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",' 587 'local_roles: waeup.local.Boss not allowed\r\n' 588 'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n' 589 'FAC1,DEP1,CRT2,[,local_roles: Error\r\n' 590 'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",' 591 'local_roles: user_name or local_role missing\r\n' 592 'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",' 593 'Not all parents do exist yet. Skipping\r\n' 594 ) 595 # Anne got a local role in certificate CRT2. 596 dep = self.app['faculties']['FAC1']['DEP1'].certificates['CRT2'] 597 role_map = IPrincipalRoleMap(dep) 598 local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0] 599 self.assertEqual(user_name, 'anne') 600 self.assertEqual(local_role, 'waeup.local.CourseAdviser100') 601 shutil.rmtree(os.path.dirname(fin_file)) 602 603 logcontent = open(self.logfile).read() 604 # Logging message from updateEntry 605 self.assertTrue( 606 'INFO - system - Certificate Processor - CRT2 - Record updated: ' 607 'local_roles=anne|waeup.local.CourseAdviser100, code=CRT2' 608 in logcontent) 609 610 return 611 248 self.assertTrue('CRT' in self.department2.certificates.keys()) 249 self.proc.delEntry(self.row, self.site4) 250 self.assertTrue('CRT' not in self.department2.certificates.keys()) 251 return 612 252 613 253 class CertCourseProcessorTests(FunctionalTestCase): … … 626 266 self.department2.certificates['CRT'] = self.certificate 627 267 self.course = Course(code='CRS') 628 self.department2.certificates['CRT'].addC ertCourse(self.course)268 self.department2.certificates['CRT'].addCourseRef(self.course) 629 269 self.cert_course = self.certificate['CRS_100'] 630 270 self.department2.courses['CRS'] = self.course
Note: See TracChangeset for help on using the changeset viewer.