source: main/waeup.kofa/trunk/src/waeup/kofa/university/tests/test_batching.py @ 9324

Last change on this file since 9324 was 9293, checked in by Henrik Bettermann, 12 years ago

Update current level_verdict when importing verdicts.

  • Property svn:keywords set to Id
File size: 29.8 KB
Line 
1## $Id: test_batching.py 9293 2012-10-04 14:56:44Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import (
27    IPrincipalRoleMap, IPrincipalRoleManager)
28from zope.testbrowser.testing import Browser
29from zope.interface.verify import verifyClass, verifyObject
30from waeup.kofa.interfaces import IBatchProcessor
31from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
32from waeup.kofa.app import University
33from waeup.kofa.university.batching import (
34    FacultyProcessor, DepartmentProcessor, CourseProcessor,
35    CertificateProcessor, CertificateCourseProcessor)
36from waeup.kofa.university.certificate import Certificate, CertificateCourse
37from waeup.kofa.university.course import Course
38from waeup.kofa.university import Faculty, Department
39from waeup.kofa.university.batching import FacultyProcessor
40
41FACULTY_SAMPLE_DATA = open(
42    os.path.join(os.path.dirname(__file__), 'sample_faculty_data.csv'),
43    'rb').read()
44
45FACULTY_HEADER_FIELDS = FACULTY_SAMPLE_DATA.split(
46    '\n')[0].split(',')
47
48FACULTY_SAMPLE_DATA_UPDATE = open(
49    os.path.join(os.path.dirname(__file__), 'sample_faculty_data_update.csv'),
50    'rb').read()
51
52FACULTY_HEADER_FIELDS_UPDATE = FACULTY_SAMPLE_DATA_UPDATE.split(
53    '\n')[0].split(',')
54
55DEPARTMENT_SAMPLE_DATA = open(
56    os.path.join(os.path.dirname(__file__), 'sample_department_data.csv'),
57    'rb').read()
58
59DEPARTMENT_HEADER_FIELDS = DEPARTMENT_SAMPLE_DATA.split(
60    '\n')[0].split(',')
61
62CERTIFICATE_SAMPLE_DATA = open(
63    os.path.join(os.path.dirname(__file__), 'sample_certificate_data.csv'),
64    'rb').read()
65
66CERTIFICATE_HEADER_FIELDS = CERTIFICATE_SAMPLE_DATA.split(
67    '\n')[0].split(',')
68
69COURSE_SAMPLE_DATA = open(
70    os.path.join(os.path.dirname(__file__), 'sample_course_data.csv'),
71    'rb').read()
72
73COURSE_HEADER_FIELDS = COURSE_SAMPLE_DATA.split(
74    '\n')[0].split(',')
75
76class UniversityProcessorSetup(FunctionalTestCase):
77
78    layer = FunctionalLayer
79
80    def setUp(self):
81        super(UniversityProcessorSetup, self).setUp()
82        self.dc_root = tempfile.mkdtemp()
83        self.workdir = tempfile.mkdtemp()
84        app = University()
85        self.getRootFolder()['app'] = app
86        self.app = self.getRootFolder()['app']
87        setSite(app)
88
89        self.app['users'].addUser('bob', 'bobssecret')
90        self.app['users'].addUser('anne', 'annessecret')
91
92        # Populate university
93        self.certificate = createObject('waeup.Certificate')
94        self.certificate.code = 'CRT1'
95        self.app['faculties']['FAC1'] = Faculty(code='FAC1')
96        self.app['faculties']['FAC1']['DEP1'] = Department(code='DEP1')
97        self.app['faculties']['FAC1']['DEP1'].certificates.addCertificate(
98            self.certificate)
99        self.course = createObject('waeup.Course')
100        self.course.code = 'CRS1'
101        self.app['faculties']['FAC1']['DEP1'].courses.addCourse(
102            self.course)
103        #self.app['faculties']['fac1']['dep1'].certificates['CERT1'].addCertCourse(
104        #    self.course, level=100)
105       
106        self.logfile = os.path.join(
107            self.app['datacenter'].storage, 'logs', 'main.log')
108        return
109
110    def tearDown(self):
111        super(UniversityProcessorSetup, self).tearDown()
112        shutil.rmtree(self.workdir)
113        shutil.rmtree(self.dc_root)
114        clearSite()
115        return
116
117class TestFacultyProcessor(UniversityProcessorSetup):
118
119    def setUp(self):
120        super(TestFacultyProcessor, self).setUp()
121
122        self.browser = Browser()
123        self.browser.handleErrors = False
124        self.datacenter_path = 'http://localhost/app/datacenter'
125        self.app['datacenter'].setStoragePath(self.dc_root)
126
127        self.proc = FacultyProcessor()
128        self.site1 = dict(faculties=dict())
129        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
130        self.row = dict(code='FAC')
131
132        self.csv_file_faculty = os.path.join(self.workdir, 'sample_faculty_data.csv')
133        open(self.csv_file_faculty, 'wb').write(FACULTY_SAMPLE_DATA)
134        self.csv_file_faculty_update = os.path.join(self.workdir, 'sample_faculty_data_update.csv')
135        open(self.csv_file_faculty_update, 'wb').write(FACULTY_SAMPLE_DATA_UPDATE)
136        return
137
138    def test_ifaces(self):
139        # Make sure we fullfill all interface contracts
140        verifyClass(IBatchProcessor, FacultyProcessor)
141        verifyObject(IBatchProcessor, self.proc)
142        return
143
144    def test_get_entry(self):
145        # if a faculty exists already, we will get it
146        result1 = self.proc.getEntry(self.row, self.site1)
147        result2 = self.proc.getEntry(self.row, self.site2)
148        self.assertTrue(result1 is None)
149        self.assertEqual(result2, 'pseudo faculty')
150        return
151
152    def test_del_entry(self):
153        # make sure we can del entries.
154        self.proc.delEntry(self.row, self.site2)
155        self.assertTrue('FAC' not in self.site2.keys())
156        return
157
158    def test_checkConversion(self):
159        # Make sure we can check conversions.
160        errs, inv_errs, conv_dict = self.proc.checkConversion(
161            dict(faculty_code='ABC', local_roles='[]'))
162        self.assertEqual(len(errs),0)
163
164        errs, inv_errs, conv_dict = self.proc.checkConversion(
165            dict(faculty_code='ABC',
166            local_roles="['nonsense'"
167            ))
168        self.assertEqual(len(errs),1)
169        self.assertEqual(errs, [('local_roles', 'Error')])
170        errs, inv_errs, conv_dict = self.proc.checkConversion(
171            dict(faculty_code='ABC',
172            local_roles="('abc')"
173            ))
174        self.assertEqual(len(errs),1)
175        self.assertEqual(errs, [('local_roles', 'no list')])
176        errs, inv_errs, conv_dict = self.proc.checkConversion(
177            dict(faculty_code='ABC',
178            local_roles="[('ABC')]"
179            ))
180        self.assertEqual(len(errs),1)
181        self.assertEqual(errs, [('local_roles', 'no dicts')])
182        errs, inv_errs, conv_dict = self.proc.checkConversion(
183            dict(faculty_code='ABC',
184            local_roles="('abc')"
185            ))
186        self.assertEqual(len(errs),1)
187        self.assertEqual(errs, [('local_roles', 'no list')])
188        errs, inv_errs, conv_dict = self.proc.checkConversion(
189            dict(faculty_code='ABC', local_roles=
190            "[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]"
191            ))
192        self.assertEqual(len(errs),1)
193        self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')])
194        errs, inv_errs, conv_dict = self.proc.checkConversion(
195            dict(faculty_code='ABC', local_roles=
196            "[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]"
197            ))
198        self.assertEqual(len(errs),1)
199        self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')])
200        errs, inv_errs, conv_dict = self.proc.checkConversion(
201            dict(faculty_code='ABC', local_roles=
202            "[{'user_name':'bob','local_role':'waeup.local.Boss'},]"
203            ))
204        self.assertEqual(len(errs),1)
205        self.assertEqual(errs, [('local_roles', 'waeup.local.Boss not allowed')])
206        errs, inv_errs, conv_dict = self.proc.checkConversion(
207            dict(faculty_code='ABC', local_roles=
208            "[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]"
209            ))
210        self.assertEqual(len(errs),1)
211        self.assertEqual(errs, [('local_roles', 'john does not exist')])
212        errs, inv_errs, conv_dict = self.proc.checkConversion(
213            dict(faculty_code='ABC', local_roles=
214            "[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]"
215            ))
216        self.assertEqual(len(errs),0)
217        return
218
219    def test_import(self):
220        num, num_warns, fin_file, fail_file = self.proc.doImport(
221            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
222        content = open(fail_file).read()
223        self.assertEqual(num_warns,5)
224        self.assertEqual(
225            content,
226            'code,local_roles,--ERRORS--\r\n'
227            'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
228            'local_roles: alice does not exist\r\n'
229            'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
230            'local_roles: waeup.local.Boss not allowed\r\n'
231            'EFG,[(\'anything\')],local_roles: no dicts\r\n'
232            'FGH,[,local_roles: Error\r\n'
233            'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
234            'local_roles: user_name or local_role missing\r\n'
235            )
236        # Bob got a local role in faculty ABC.
237        abc = self.app['faculties']['ABC']
238        role_map = IPrincipalRoleMap(abc)
239        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
240        self.assertEqual(user_name, 'bob')
241        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
242        shutil.rmtree(os.path.dirname(fin_file))
243        return
244
245    def test_import_update(self):
246        self.app['faculties']['FAC2'] = Faculty(code='FAC2')
247        self.app['faculties']['FAC3'] = Faculty(code='FAC3')
248        self.app['faculties']['FAC4'] = Faculty(code='FAC4')
249
250        role_manager1 = IPrincipalRoleManager(self.app['faculties']['FAC1'])
251        role_manager1.assignRoleToPrincipal('alfonsrole', 'alfons')
252        role_map1 = IPrincipalRoleMap(self.app['faculties']['FAC1'])
253        self.assertEqual(len(role_map1.getPrincipalsAndRoles()), 1)
254
255        role_manager2 = IPrincipalRoleManager(self.app['faculties']['FAC2'])
256        role_manager2.assignRoleToPrincipal('alfonsrole', 'alfons')
257        role_map2 = IPrincipalRoleMap(self.app['faculties']['FAC2'])
258        self.assertEqual(len(role_map2.getPrincipalsAndRoles()), 1)
259
260        role_manager3 = IPrincipalRoleManager(self.app['faculties']['FAC3'])
261        role_manager3.assignRoleToPrincipal('alfonsrole', 'alfons')
262        role_map3 = IPrincipalRoleMap(self.app['faculties']['FAC3'])
263        self.assertEqual(len(role_map3.getPrincipalsAndRoles()), 1)
264
265        role_manager4 = IPrincipalRoleManager(self.app['faculties']['FAC4'])
266        role_manager4.assignRoleToPrincipal('alfonsrole', 'alfons')
267        role_map4 = IPrincipalRoleMap(self.app['faculties']['FAC4'])
268        self.assertEqual(len(role_map4.getPrincipalsAndRoles()), 1)
269
270        local_role, user_name, setting = role_map2.getPrincipalsAndRoles()[0]
271        self.assertEqual(user_name, 'alfons')
272        self.assertEqual(local_role, 'alfonsrole')
273
274        num, num_warns, fin_file, fail_file = self.proc.doImport(
275            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update')
276        self.assertEqual(num_warns,0)
277        # Local roles have been removed in FAC1 due to deletion marker.
278        self.assertEqual(len(role_map1.getPrincipalsAndRoles()), 0)
279        # Old local roles have been removed and new roles have been added in FAC2.
280        self.assertEqual(len(role_map2.getPrincipalsAndRoles()), 1)
281        local_role, user_name, setting = role_map2.getPrincipalsAndRoles()[0]
282        self.assertEqual(user_name, 'bob')
283        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
284        # Local roles are not touched in FAC3 due to ignore marker.
285        self.assertEqual(len(role_map3.getPrincipalsAndRoles()), 1)
286        local_role, user_name, setting = role_map3.getPrincipalsAndRoles()[0]
287        self.assertEqual(user_name, 'alfons')
288        self.assertEqual(local_role, 'alfonsrole')
289        # Local roles are not touched in FAC4 due to empty cell.
290        self.assertEqual(len(role_map4.getPrincipalsAndRoles()), 1)
291        local_role, user_name, setting = role_map4.getPrincipalsAndRoles()[0]
292        self.assertEqual(user_name, 'alfons')
293        self.assertEqual(local_role, 'alfonsrole')
294        shutil.rmtree(os.path.dirname(fin_file))
295        return
296       
297    def test_import_update_logging(self):
298        self.app['faculties']['FAC2'] = Faculty(code='FAC2')
299        self.app['faculties']['FAC3'] = Faculty(code='FAC3')
300        self.app['faculties']['FAC4'] = Faculty(code='FAC4')
301        num, num_warns, fin_file, fail_file = self.proc.doImport(
302            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update')
303        self.assertEqual(num_warns,0)
304        logcontent = open(self.logfile).read()
305        # Logging message from updateEntry
306        self.assertTrue(
307            'INFO - system - Department Processor - DEP2 - '
308            'Record updated: local_roles=anne|waeup.local.DepartmentManager, '
309            'code=DEP2'
310            in logcontent)
311        shutil.rmtree(os.path.dirname(fin_file))
312
313    def test_upload_import_reupload(self):
314        self.browser.addHeader('Authorization', 'Basic mgr:mgrpw')
315        self.browser.open(self.datacenter_path)
316        self.assertEqual(self.browser.headers['Status'], '200 Ok')
317        self.assertEqual(self.browser.url, self.datacenter_path)
318        self.browser.getLink("Upload data").click()
319        file = open(self.csv_file_faculty)
320        ctrl = self.browser.getControl(name='uploadfile:file')
321        file_ctrl = ctrl.mech_control
322        file_ctrl.add_file(file, filename='sample_faculty_data.csv')
323        self.browser.getControl('Upload').click()
324        self.browser.getLink('Process data').click()
325        self.browser.getControl(name="select").click()
326        importerselect = self.browser.getControl(name='importer')
327        importerselect.getControl('Faculty Processor').selected = True
328        modeselect = self.browser.getControl(name='mode')
329        modeselect.getControl(value='create').selected = True
330        self.browser.getControl('Proceed to step 3').click()
331        self.assertTrue('Header fields OK' in self.browser.contents)
332        self.browser.getControl('Perform import').click()
333        self.assertTrue('Successfully processed 1 rows' in self.browser.contents)
334        # We import the same file a second time.
335        self.browser.open(self.datacenter_path)
336        self.browser.getLink("Upload data").click()
337        file = open(self.csv_file_faculty)
338        ctrl = self.browser.getControl(name='uploadfile:file')
339        file_ctrl = ctrl.mech_control
340        file_ctrl.add_file(file, filename='sample_faculty_data.csv')
341        self.browser.getControl('Upload').click()
342        self.assertTrue(
343            'File with same name was uploaded earlier' in self.browser.contents)
344        return
345
346class TestDepartmentProcessor(UniversityProcessorSetup):
347
348    def setUp(self):
349        super(TestDepartmentProcessor, self).setUp()
350        self.proc = DepartmentProcessor()
351        self.site0 = dict()
352        self.site1 = dict(faculties=dict())
353        self.site2 = dict(faculties=dict(FAC=dict()))
354        self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department')))
355        self.row = dict(code='DPT', faculty_code='FAC')
356
357        self.csv_file_department = os.path.join(self.workdir, 'sample_department_data.csv')
358        open(self.csv_file_department, 'wb').write(DEPARTMENT_SAMPLE_DATA)
359        return
360
361    def test_ifaces(self):
362        # Make sure we fullfill all interface contracts
363        verifyClass(IBatchProcessor, DepartmentProcessor)
364        verifyObject(IBatchProcessor, self.proc)
365        return
366
367    def test_parents_exist(self):
368        # make sure we lookup parents correctly.
369        result0 = self.proc.parentsExist(self.row, self.site0)
370        result1 = self.proc.parentsExist(self.row, self.site1)
371        result2 = self.proc.parentsExist(self.row, self.site2)
372        result3 = self.proc.parentsExist(self.row, self.site3)
373        self.assertTrue(result0 is False)
374        self.assertTrue(result1 is False)
375        self.assertTrue(result2 is True)
376        self.assertTrue(result3 is True)
377        return
378
379    def test_entry_exists(self):
380        # make sure we lookup entries correctly.
381        result0 = self.proc.entryExists(self.row, dict())
382        result1 = self.proc.entryExists(self.row, self.site1)
383        result2 = self.proc.entryExists(self.row, self.site2)
384        result3 = self.proc.entryExists(self.row, self.site3)
385        self.assertTrue(result0 is False)
386        self.assertTrue(result1 is False)
387        self.assertTrue(result2 is False)
388        self.assertTrue(result3 is True)
389        return
390
391    def test_get_entry(self):
392        # we can get a dept. if it exists
393        result1 = self.proc.getEntry(self.row, self.site2)
394        result2 = self.proc.getEntry(self.row, self.site3)
395        self.assertTrue(result1 is None)
396        self.assertEqual(result2, 'pseudo department')
397        return
398
399    def test_del_entry(self):
400        # we can delete departments
401        self.proc.delEntry(self.row, self.site3)
402        self.assertTrue('DPT' not in self.site3['faculties']['FAC'].keys())
403        return
404
405    def test_import(self):
406        num, num_warns, fin_file, fail_file = self.proc.doImport(
407            self.csv_file_department, DEPARTMENT_HEADER_FIELDS)
408        content = open(fail_file).read()
409        self.assertEqual(num_warns,6)
410        self.assertEqual(
411            content,
412            'faculty_code,code,local_roles,--ERRORS--\r\n'
413            'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
414            'local_roles: alice does not exist\r\n'
415            'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
416            'local_roles: waeup.local.Boss not allowed\r\n'
417            'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n'
418            'FAC1,DEP2,[,local_roles: Error\r\n'
419            'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",'
420            'local_roles: user_name or local_role missing\r\n'
421            'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
422            'Not all parents do exist yet. Skipping\r\n'
423            )
424        # Anne got a local role in department DEP2.
425        dep = self.app['faculties']['FAC1']['DEP2']
426        role_map = IPrincipalRoleMap(dep)
427        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
428        self.assertEqual(user_name, 'anne')
429        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
430        shutil.rmtree(os.path.dirname(fin_file))
431        return
432
433class CourseProcessorTests(UniversityProcessorSetup):
434
435    def setUp(self):
436        super(CourseProcessorTests, self).setUp()
437        self.proc = CourseProcessor()
438        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRS1")
439        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRS1")
440        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS2")
441        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS1")
442
443        self.csv_file_course = os.path.join(self.workdir, 'sample_course_data.csv')
444        open(self.csv_file_course, 'wb').write(COURSE_SAMPLE_DATA)
445        return
446
447    def test_ifaces(self):
448        # Make sure we fullfill all interface contracts
449        verifyClass(IBatchProcessor, CourseProcessor)
450        verifyObject(IBatchProcessor, self.proc)
451        return
452
453    def test_parents_exist(self):
454        # make sure we lookup parents correctly
455        result1 = self.proc.parentsExist(self.row1, self.app)
456        result2 = self.proc.parentsExist(self.row2, self.app)
457        result3 = self.proc.parentsExist(self.row3, self.app)
458        self.assertTrue(result1 is False)
459        self.assertTrue(result2 is False)
460        self.assertTrue(result3 is True)
461        return
462
463    def test_entry_exists(self):
464        # make sure we find an entry if it exists
465        result1 = self.proc.entryExists(self.row1, self.app)
466        result2 = self.proc.entryExists(self.row2, self.app)
467        result3 = self.proc.entryExists(self.row3, self.app)
468        result4 = self.proc.entryExists(self.row4, self.app)
469        self.assertTrue(result1 is False)
470        self.assertTrue(result2 is False)
471        self.assertTrue(result3 is False)
472        self.assertTrue(result4 is True)
473        return
474
475    def test_get_entry(self):
476        # make sure we can get an entry if it exists
477        result1 = self.proc.getEntry(self.row1, self.app)
478        result2 = self.proc.getEntry(self.row4, self.app)
479        self.assertTrue(result1 is None)
480        self.assertTrue(result2 is self.course)
481        return
482
483    def test_del_entry(self):
484        # make sure we can delete entries
485        self.assertTrue('CRS1' in self.app['faculties']['FAC1']['DEP1'].courses.keys())
486        self.proc.delEntry(self.row4, self.app)
487        self.assertTrue('CRS1' not in self.app['faculties']['FAC1']['DEP1'].courses.keys())
488        return
489
490    def test_import(self):
491        num, num_warns, fin_file, fail_file = self.proc.doImport(
492            self.csv_file_course, COURSE_HEADER_FIELDS)
493        content = open(fail_file).read()
494        self.assertEqual(num_warns,6)
495        self.assertEqual(
496            content,
497            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
498            'FAC1,DEP1,CRS2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.Lecturer\'}]",'
499            'local_roles: alice does not exist\r\n'
500            'FAC1,DEP1,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
501            'local_roles: waeup.local.Boss not allowed\r\n'
502            'FAC1,DEP1,CRS2,[(\'anything\')],local_roles: no dicts\r\n'
503            'FAC1,DEP1,CRS2,[,local_roles: Error\r\n'
504            'FAC1,DEP1,CRS2,"[{\'user\':\'anne\',\'local\':\'waeup.local.Lecturer\'}]",'
505            'local_roles: user_name or local_role missing\r\n'
506            'FAC11,DEP2,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Lecturer\'}]",'
507            'Not all parents do exist yet. Skipping\r\n'
508            )
509        # Anne got a local role in course CRS2.
510        dep = self.app['faculties']['FAC1']['DEP1'].courses['CRS2']
511        role_map = IPrincipalRoleMap(dep)
512        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
513        self.assertEqual(user_name, 'anne')
514        self.assertEqual(local_role, 'waeup.local.Lecturer')
515        shutil.rmtree(os.path.dirname(fin_file))
516        return
517
518
519class CertificateProcessorTests(UniversityProcessorSetup):
520
521    def setUp(self):
522        super(CertificateProcessorTests, self).setUp()
523        self.proc = CertificateProcessor()
524        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRT1")
525        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRT1")
526        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT2")
527        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT1")
528
529        self.csv_file_certificate = os.path.join(self.workdir, 'sample_certificate_data.csv')
530        open(self.csv_file_certificate, 'wb').write(CERTIFICATE_SAMPLE_DATA)
531        return
532
533    def test_ifaces(self):
534        # Make sure we fullfill all interface contracts
535        verifyClass(IBatchProcessor, CourseProcessor)
536        verifyObject(IBatchProcessor, self.proc)
537        return
538
539    def test_parents_exist(self):
540        # make sure we lookup parents correctly
541        result1 = self.proc.parentsExist(self.row1, self.app)
542        result2 = self.proc.parentsExist(self.row2, self.app)
543        result3 = self.proc.parentsExist(self.row3, self.app)
544        self.assertTrue(result1 is False)
545        self.assertTrue(result2 is False)
546        self.assertTrue(result3 is True)
547        return
548
549    def test_entry_exists(self):
550        # make sure we find an entry if it exists
551        result1 = self.proc.entryExists(self.row1, self.app)
552        result2 = self.proc.entryExists(self.row2, self.app)
553        result3 = self.proc.entryExists(self.row3, self.app)
554        result4 = self.proc.entryExists(self.row4, self.app)
555        self.assertTrue(result1 is False)
556        self.assertTrue(result2 is False)
557        self.assertTrue(result3 is False)
558        self.assertTrue(result4 is True)
559        return
560
561    def test_get_entry(self):
562        # make sure we can get an entry if it exists
563        result1 = self.proc.getEntry(self.row1, self.app)
564        result2 = self.proc.getEntry(self.row4, self.app)
565        self.assertTrue(result1 is None)
566        self.assertTrue(result2 is self.certificate)
567        return
568
569    def test_del_entry(self):
570        # make sure we can delete entries
571        self.assertTrue('CRT1' in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
572        self.proc.delEntry(self.row4, self.app)
573        self.assertTrue('CRT1' not in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
574        return
575
576    def test_import(self):
577        num, num_warns, fin_file, fail_file = self.proc.doImport(
578            self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS)
579        content = open(fail_file).read()
580        self.assertEqual(num_warns,6)
581        self.assertEqual(
582            content,
583            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
584            'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
585            'local_roles: alice does not exist\r\n'
586            'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
587            'local_roles: waeup.local.Boss not allowed\r\n'
588            'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n'
589            'FAC1,DEP1,CRT2,[,local_roles: Error\r\n'
590            'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",'
591            'local_roles: user_name or local_role missing\r\n'
592            'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
593            'Not all parents do exist yet. Skipping\r\n'
594            )
595        # Anne got a local role in certificate CRT2.
596        dep = self.app['faculties']['FAC1']['DEP1'].certificates['CRT2']
597        role_map = IPrincipalRoleMap(dep)
598        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
599        self.assertEqual(user_name, 'anne')
600        self.assertEqual(local_role, 'waeup.local.CourseAdviser100')
601        shutil.rmtree(os.path.dirname(fin_file))
602       
603        logcontent = open(self.logfile).read()
604        # Logging message from updateEntry
605        self.assertTrue(
606            'INFO - system - Certificate Processor - CRT2 - Record updated: '
607            'local_roles=anne|waeup.local.CourseAdviser100, code=CRT2'
608            in logcontent)       
609       
610        return
611
612
613class CertCourseProcessorTests(FunctionalTestCase):
614
615    layer = FunctionalLayer
616
617    def setUp(self):
618        super(CertCourseProcessorTests, self).setUp()
619        self.proc = CertificateCourseProcessor()
620        self.site0 = dict()
621        self.site1 = dict(faculties=dict())
622        self.site2 = dict(faculties=dict(FAC=dict()))
623        self.department1 = Department(code='DPT')
624        self.department2 = Department(code='DPT')
625        self.certificate = Certificate(code='CRT')
626        self.department2.certificates['CRT'] = self.certificate
627        self.course = Course(code='CRS')
628        self.department2.certificates['CRT'].addCertCourse(self.course)
629        self.cert_course = self.certificate['CRS_100']
630        self.department2.courses['CRS'] = self.course
631        self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1)))
632        self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2)))
633        self.row = dict(
634            department_code='DPT',
635            faculty_code='FAC',
636            certificate_code='CRT',
637            course=self.course, level='100',
638            code='CRS_100')
639        return
640
641    def test_ifaces(self):
642        # Make sure we fullfill all interface contracts
643        verifyClass(IBatchProcessor, CertificateCourseProcessor)
644        verifyObject(IBatchProcessor, self.proc)
645        return
646
647    def test_parents_exist(self):
648        # make sure we can find all certificate parents
649        result0 = self.proc.parentsExist(self.row, self.site0)
650        result1 = self.proc.parentsExist(self.row, self.site1)
651        result2 = self.proc.parentsExist(self.row, self.site2)
652        result3 = self.proc.parentsExist(self.row, self.site3)
653        result4 = self.proc.parentsExist(self.row, self.site4)
654        self.assertTrue(result0 is False)
655        self.assertTrue(result1 is False)
656        self.assertTrue(result2 is False)
657        self.assertTrue(result3 is False)
658        self.assertTrue(result4 is True)
659        return
660
661    def test_entry_exists(self):
662        # make sure we find an entry if it exists
663        result0 = self.proc.entryExists(self.row, self.site0)
664        result1 = self.proc.entryExists(self.row, self.site1)
665        result2 = self.proc.entryExists(self.row, self.site2)
666        result3 = self.proc.entryExists(self.row, self.site3)
667        result4 = self.proc.entryExists(self.row, self.site4)
668        self.assertTrue(result0 is False)
669        self.assertTrue(result1 is False)
670        self.assertTrue(result2 is False)
671        self.assertTrue(result3 is False)
672        self.assertTrue(result4 is True)
673        return
674
675    def test_get_entry(self):
676        # make sure we can get an entry if it exists
677        result1 = self.proc.getEntry(self.row, self.site3)
678        result2 = self.proc.getEntry(self.row, self.site4)
679        self.assertTrue(result1 is None)
680        self.assertTrue(result2 is self.cert_course)
681        return
682
683    def test_del_entry(self):
684        # make sure we can delete entries
685        self.assertTrue('CRS_100' in self.certificate.keys())
686        self.proc.delEntry(self.row, self.site4)
687        self.assertTrue('CRS_100' not in self.certificate.keys())
688        return
Note: See TracBrowser for help on using the repository browser.