source: main/waeup.kofa/branches/uli-async-update/src/waeup/kofa/university/tests/test_batching.py @ 9251

Last change on this file since 9251 was 9169, checked in by uli, 12 years ago

Merge changes from trunk, r8786-HEAD

  • Property svn:keywords set to Id
File size: 29.8 KB
Line 
1## $Id: test_batching.py 9169 2012-09-10 11:05:07Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import (
27    IPrincipalRoleMap, IPrincipalRoleManager)
28from zope.testbrowser.testing import Browser
29from zope.interface.verify import verifyClass, verifyObject
30from waeup.kofa.interfaces import IBatchProcessor
31from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
32from waeup.kofa.app import University
33from waeup.kofa.university.batching import (
34    FacultyProcessor, DepartmentProcessor, CourseProcessor,
35    CertificateProcessor, CertificateCourseProcessor)
36from waeup.kofa.university.certificate import Certificate, CertificateCourse
37from waeup.kofa.university.course import Course
38from waeup.kofa.university import Faculty, Department
39from waeup.kofa.university.batching import FacultyProcessor
40
def _read_sample(filename):
    """Return the raw content of a sample file stored next to this module.

    The file is opened in binary mode and is closed again as soon as its
    content has been read.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fd:
        return fd.read()


def _header_fields(csv_data):
    """Return the comma-separated items of the first line of `csv_data`."""
    return csv_data.split('\n')[0].split(',')


# Raw content of the sample CSV files and the header fields taken from
# the first line of each file.
FACULTY_SAMPLE_DATA = _read_sample('sample_faculty_data.csv')

FACULTY_HEADER_FIELDS = _header_fields(FACULTY_SAMPLE_DATA)

FACULTY_SAMPLE_DATA_UPDATE = _read_sample('sample_faculty_data_update.csv')

FACULTY_HEADER_FIELDS_UPDATE = _header_fields(FACULTY_SAMPLE_DATA_UPDATE)

DEPARTMENT_SAMPLE_DATA = _read_sample('sample_department_data.csv')

DEPARTMENT_HEADER_FIELDS = _header_fields(DEPARTMENT_SAMPLE_DATA)

CERTIFICATE_SAMPLE_DATA = _read_sample('sample_certificate_data.csv')

CERTIFICATE_HEADER_FIELDS = _header_fields(CERTIFICATE_SAMPLE_DATA)

COURSE_SAMPLE_DATA = _read_sample('sample_course_data.csv')

COURSE_HEADER_FIELDS = _header_fields(COURSE_SAMPLE_DATA)
75
class UniversityProcessorSetup(FunctionalTestCase):
    """Common fixture for the university batch processor tests.

    `setUp` stores a `University` instance as 'app' in the root folder,
    makes it the current site, adds the users 'bob' and 'anne' and
    populates the app with faculty FAC1, department DEP1, certificate
    CRT1 and course CRS1. Two temporary directories (`dc_root` and
    `workdir`) are created for each test and removed in `tearDown`.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create temporary directories and a populated university app."""
        super(UniversityProcessorSetup, self).setUp()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()
        app = University()
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        setSite(app)

        self.app['users'].addUser('bob', 'bobssecret')
        self.app['users'].addUser('anne', 'annessecret')

        # Populate university
        self.certificate = createObject('waeup.Certificate')
        self.certificate.code = 'CRT1'
        self.app['faculties']['FAC1'] = Faculty(code='FAC1')
        self.app['faculties']['FAC1']['DEP1'] = Department(code='DEP1')
        self.app['faculties']['FAC1']['DEP1'].certificates.addCertificate(
            self.certificate)
        self.course = createObject('waeup.Course')
        self.course.code = 'CRS1'
        self.app['faculties']['FAC1']['DEP1'].courses.addCourse(
            self.course)

        # Path of the main log file below the datacenter storage, as
        # configured at this point of the setup.
        self.logfile = os.path.join(
            self.app['datacenter'].storage, 'logs', 'main.log')
        return

    def tearDown(self):
        """Tear down the functional test and remove the temporary dirs."""
        super(UniversityProcessorSetup, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        return
117
class TestFacultyProcessor(UniversityProcessorSetup):
    """Tests for the `FacultyProcessor` batch processor."""

    def setUp(self):
        """Prepare browser, processor, pseudo sites and sample CSV files."""
        super(TestFacultyProcessor, self).setUp()

        self.browser = Browser()
        self.browser.handleErrors = False
        self.datacenter_path = 'http://localhost/app/datacenter'
        self.app['datacenter'].setStoragePath(self.dc_root)

        self.proc = FacultyProcessor()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
        self.row = dict(code='FAC')

        # Copy the sample data into the work dir. The files are closed
        # explicitly so that all data is written before it is imported.
        self.csv_file_faculty = os.path.join(
            self.workdir, 'sample_faculty_data.csv')
        with open(self.csv_file_faculty, 'wb') as fd:
            fd.write(FACULTY_SAMPLE_DATA)
        self.csv_file_faculty_update = os.path.join(
            self.workdir, 'sample_faculty_data_update.csv')
        with open(self.csv_file_faculty_update, 'wb') as fd:
            fd.write(FACULTY_SAMPLE_DATA_UPDATE)
        return

    def _check_conversion_errors(self, local_roles):
        """Return the errors `checkConversion` reports for `local_roles`."""
        errs, inv_errs, conv_dict = self.proc.checkConversion(
            dict(faculty_code='ABC', local_roles=local_roles))
        return errs

    def _check_single_role(self, role_map, user_name, local_role):
        """Make sure `role_map` holds exactly the given role assignment."""
        entries = role_map.getPrincipalsAndRoles()
        self.assertEqual(len(entries), 1)
        found_role, found_user, setting = entries[0]
        self.assertEqual(found_user, user_name)
        self.assertEqual(found_role, local_role)
        return

    def _upload_faculty_file(self):
        """Upload the faculty sample file via the 'Upload data' form.

        The browser must currently show a page providing the
        'Upload data' link.
        """
        self.browser.getLink("Upload data").click()
        with open(self.csv_file_faculty) as upload:
            ctrl = self.browser.getControl(name='uploadfile:file')
            file_ctrl = ctrl.mech_control
            file_ctrl.add_file(upload, filename='sample_faculty_data.csv')
            self.browser.getControl('Upload').click()
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, FacultyProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_get_entry(self):
        # if a faculty exists already, we will get it
        result1 = self.proc.getEntry(self.row, self.site1)
        result2 = self.proc.getEntry(self.row, self.site2)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo faculty')
        return

    def test_del_entry(self):
        # make sure we can del entries.
        self.proc.delEntry(self.row, self.site2)
        # The faculty lives in the 'faculties' container of the pseudo
        # site. Looking at the keys of the site itself would pass even
        # if nothing was deleted.
        self.assertTrue('FAC' not in self.site2['faculties'].keys())
        return

    def test_checkConversion(self):
        # Make sure we can check conversions.
        self.assertEqual(len(self._check_conversion_errors('[]')), 0)

        # Pairs of invalid `local_roles` values and the expected message.
        invalid_cases = [
            ("['nonsense'", 'Error'),
            ("('abc')", 'no list'),
            ("[('ABC')]", 'no dicts'),
            ("[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]",
             'user_name or local_role missing'),
            ("[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]",
             'user_name or local_role missing'),
            ("[{'user_name':'bob','local_role':'waeup.local.Boss'},]",
             'waeup.local.Boss not allowed'),
            ("[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]",
             'john does not exist'),
            ]
        for local_roles, message in invalid_cases:
            errs = self._check_conversion_errors(local_roles)
            self.assertEqual(len(errs), 1)
            self.assertEqual(errs, [('local_roles', message)])

        # An existing user with an allowed role is accepted.
        errs = self._check_conversion_errors(
            "[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]"
            )
        self.assertEqual(len(errs), 0)
        return

    def test_import(self):
        # Import the sample data and check the file of failed rows.
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns,5)
        self.assertEqual(
            content,
            'code,local_roles,--ERRORS--\r\n'
            'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'EFG,[(\'anything\')],local_roles: no dicts\r\n'
            'FGH,[,local_roles: Error\r\n'
            'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            )
        # Bob got a local role in faculty ABC.
        abc = self.app['faculties']['ABC']
        role_map = IPrincipalRoleMap(abc)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'bob')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
        return

    def test_import_update(self):
        # Import in 'update' mode and check the resulting local roles.
        faculties = self.app['faculties']
        for code in ('FAC2', 'FAC3', 'FAC4'):
            faculties[code] = Faculty(code=code)

        # Assign a local role to 'alfons' in each of the four faculties.
        role_maps = dict()
        for code in ('FAC1', 'FAC2', 'FAC3', 'FAC4'):
            role_manager = IPrincipalRoleManager(faculties[code])
            role_manager.assignRoleToPrincipal('alfonsrole', 'alfons')
            role_maps[code] = IPrincipalRoleMap(faculties[code])
            self.assertEqual(
                len(role_maps[code].getPrincipalsAndRoles()), 1)

        self._check_single_role(role_maps['FAC2'], 'alfons', 'alfonsrole')

        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns,0)
        # Local roles have been removed in FAC1 due to deletion marker.
        self.assertEqual(len(role_maps['FAC1'].getPrincipalsAndRoles()), 0)
        # Old local roles have been removed and new roles have been added in FAC2.
        self._check_single_role(
            role_maps['FAC2'], 'bob', 'waeup.local.DepartmentManager')
        # Local roles are not touched in FAC3 due to ignore marker.
        self._check_single_role(role_maps['FAC3'], 'alfons', 'alfonsrole')
        # Local roles are not touched in FAC4 due to empty cell.
        self._check_single_role(role_maps['FAC4'], 'alfons', 'alfonsrole')
        shutil.rmtree(os.path.dirname(fin_file))
        return

    def test_import_update_logging(self):
        # Import in 'update' mode and check the main log file.
        self.app['faculties']['FAC2'] = Faculty(code='FAC2')
        self.app['faculties']['FAC3'] = Faculty(code='FAC3')
        self.app['faculties']['FAC4'] = Faculty(code='FAC4')
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns,0)
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Remove the import output like the other import tests do.
        shutil.rmtree(os.path.dirname(fin_file))
        # Logging message from updateEntry
        # NOTE(review): the expected message names the department
        # processor and DEP2 although faculty data is imported by a
        # faculty processor here - looks like a copy/paste leftover,
        # please confirm the expected log line.
        self.assertTrue(
            'INFO - system - Department Processor - DEP2 - '
            'Record updated: local_roles=anne|waeup.local.DepartmentManager, '
            'code=DEP2'
            in logcontent)
        return

    def test_upload_import_reupload(self):
        # Upload and import a file via the UI, then upload it again.
        self.browser.addHeader('Authorization', 'Basic mgr:mgrpw')
        self.browser.open(self.datacenter_path)
        self.assertEqual(self.browser.headers['Status'], '200 Ok')
        self.assertEqual(self.browser.url, self.datacenter_path)
        self._upload_faculty_file()
        self.browser.getLink('Process data').click()
        self.browser.getControl(name="select").click()
        importerselect = self.browser.getControl(name='importer')
        importerselect.getControl('Faculty Processor').selected = True
        modeselect = self.browser.getControl(name='mode')
        modeselect.getControl(value='create').selected = True
        self.browser.getControl('Proceed to step 3').click()
        self.assertTrue('Header fields OK' in self.browser.contents)
        self.browser.getControl('Perform import').click()
        self.assertTrue('Successfully processed 1 rows' in self.browser.contents)
        # We import the same file a second time.
        self.browser.open(self.datacenter_path)
        self._upload_faculty_file()
        self.assertTrue(
            'File with same name was uploaded earlier' in self.browser.contents)
        return
345
class TestDepartmentProcessor(UniversityProcessorSetup):
    """Tests for the `DepartmentProcessor` batch processor."""

    def setUp(self):
        """Prepare processor, pseudo sites and the sample CSV file."""
        super(TestDepartmentProcessor, self).setUp()
        self.proc = DepartmentProcessor()
        # Pseudo sites made of plain dicts, each one level more complete.
        self.site0 = dict()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC=dict()))
        self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department')))
        self.row = dict(code='DPT', faculty_code='FAC')

        # Copy the sample data into the work dir. The file is closed
        # explicitly so that all data is written before it is imported.
        self.csv_file_department = os.path.join(
            self.workdir, 'sample_department_data.csv')
        with open(self.csv_file_department, 'wb') as fd:
            fd.write(DEPARTMENT_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, DepartmentProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly.
        result0 = self.proc.parentsExist(self.row, self.site0)
        result1 = self.proc.parentsExist(self.row, self.site1)
        result2 = self.proc.parentsExist(self.row, self.site2)
        result3 = self.proc.parentsExist(self.row, self.site3)
        self.assertTrue(result0 is False)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is True)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we lookup entries correctly.
        result0 = self.proc.entryExists(self.row, dict())
        result1 = self.proc.entryExists(self.row, self.site1)
        result2 = self.proc.entryExists(self.row, self.site2)
        result3 = self.proc.entryExists(self.row, self.site3)
        self.assertTrue(result0 is False)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_get_entry(self):
        # we can get a dept. if it exists
        result1 = self.proc.getEntry(self.row, self.site2)
        result2 = self.proc.getEntry(self.row, self.site3)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo department')
        return

    def test_del_entry(self):
        # we can delete departments
        self.proc.delEntry(self.row, self.site3)
        self.assertTrue('DPT' not in self.site3['faculties']['FAC'].keys())
        return

    def test_import(self):
        # Import the sample data and check the file of failed rows.
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_department, DEPARTMENT_HEADER_FIELDS)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns,6)
        self.assertEqual(
            content,
            'faculty_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP2,[,local_roles: Error\r\n'
            'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in department DEP2.
        dep = self.app['faculties']['FAC1']['DEP2']
        role_map = IPrincipalRoleMap(dep)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
        return
432
class CourseProcessorTests(UniversityProcessorSetup):
    """Tests for the `CourseProcessor` batch processor."""

    def setUp(self):
        """Prepare processor, sample rows and the sample CSV file."""
        super(CourseProcessorTests, self).setUp()
        self.proc = CourseProcessor()
        # Only FAC1/DEP1 and the course CRS1 are created by the base
        # class fixture.
        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRS1")
        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRS1")
        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS2")
        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS1")

        # Copy the sample data into the work dir. The file is closed
        # explicitly so that all data is written before it is imported.
        self.csv_file_course = os.path.join(
            self.workdir, 'sample_course_data.csv')
        with open(self.csv_file_course, 'wb') as fd:
            fd.write(COURSE_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, CourseProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        result1 = self.proc.parentsExist(self.row1, self.app)
        result2 = self.proc.parentsExist(self.row2, self.app)
        result3 = self.proc.parentsExist(self.row3, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        result1 = self.proc.entryExists(self.row1, self.app)
        result2 = self.proc.entryExists(self.row2, self.app)
        result3 = self.proc.entryExists(self.row3, self.app)
        result4 = self.proc.entryExists(self.row4, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is False)
        self.assertTrue(result4 is True)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.course)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        self.assertTrue('CRS1' in self.app['faculties']['FAC1']['DEP1'].courses.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue('CRS1' not in self.app['faculties']['FAC1']['DEP1'].courses.keys())
        return

    def test_import(self):
        # Import the sample data and check the file of failed rows.
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_course, COURSE_HEADER_FIELDS)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns,6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRS2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.Lecturer\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRS2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRS2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRS2,"[{\'user\':\'anne\',\'local\':\'waeup.local.Lecturer\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Lecturer\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in course CRS2.
        dep = self.app['faculties']['FAC1']['DEP1'].courses['CRS2']
        role_map = IPrincipalRoleMap(dep)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.Lecturer')
        shutil.rmtree(os.path.dirname(fin_file))
        return
517
518
class CertificateProcessorTests(UniversityProcessorSetup):
    """Tests for the `CertificateProcessor` batch processor."""

    def setUp(self):
        """Prepare processor, sample rows and the sample CSV file."""
        super(CertificateProcessorTests, self).setUp()
        self.proc = CertificateProcessor()
        # Only FAC1/DEP1 and the certificate CRT1 are created by the base
        # class fixture.
        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRT1")
        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRT1")
        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT2")
        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT1")

        # Copy the sample data into the work dir. The file is closed
        # explicitly so that all data is written before it is imported.
        self.csv_file_certificate = os.path.join(
            self.workdir, 'sample_certificate_data.csv')
        with open(self.csv_file_certificate, 'wb') as fd:
            fd.write(CERTIFICATE_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts. The class under
        # test is the certificate processor, not the course processor.
        verifyClass(IBatchProcessor, CertificateProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        result1 = self.proc.parentsExist(self.row1, self.app)
        result2 = self.proc.parentsExist(self.row2, self.app)
        result3 = self.proc.parentsExist(self.row3, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        result1 = self.proc.entryExists(self.row1, self.app)
        result2 = self.proc.entryExists(self.row2, self.app)
        result3 = self.proc.entryExists(self.row3, self.app)
        result4 = self.proc.entryExists(self.row4, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is False)
        self.assertTrue(result4 is True)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.certificate)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        self.assertTrue('CRT1' in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue('CRT1' not in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
        return

    def test_import(self):
        # Import the sample data, check the file of failed rows and the
        # main log file.
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns,6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRT2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in certificate CRT2.
        dep = self.app['faculties']['FAC1']['DEP1'].certificates['CRT2']
        role_map = IPrincipalRoleMap(dep)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.CourseAdviser100')
        shutil.rmtree(os.path.dirname(fin_file))

        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - Certificate Processor - CRT2 - Record updated: '
            'local_roles=anne|waeup.local.CourseAdviser100, code=CRT2'
            in logcontent)

        return
611
612
class CertCourseProcessorTests(FunctionalTestCase):
    """Tests for `CertificateCourseProcessor` run against pseudo sites.

    The pseudo sites `site0` to `site4` are plain dicts; only `site4`
    holds a department that contains the certificate 'CRT' together with
    the certificate course 'CRS_100'.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create the processor, the pseudo sites and the sample row."""
        super(CertCourseProcessorTests, self).setUp()
        self.proc = CertificateCourseProcessor()

        # The first department stays empty, the second one gets a
        # certificate, a certificate course and the referenced course.
        self.department1 = Department(code='DPT')
        self.department2 = Department(code='DPT')
        self.certificate = Certificate(code='CRT')
        self.department2.certificates['CRT'] = self.certificate
        self.course = Course(code='CRS')
        self.department2.certificates['CRT'].addCertCourse(self.course)
        self.cert_course = self.certificate['CRS_100']
        self.department2.courses['CRS'] = self.course

        self.site0 = {}
        self.site1 = {'faculties': {}}
        self.site2 = {'faculties': {'FAC': {}}}
        self.site3 = {'faculties': {'FAC': {'DPT': self.department1}}}
        self.site4 = {'faculties': {'FAC': {'DPT': self.department2}}}
        self.row = {
            'department_code': 'DPT',
            'faculty_code': 'FAC',
            'certificate_code': 'CRT',
            'course': self.course,
            'level': '100',
            'code': 'CRS_100',
            }
        return

    def _all_sites(self):
        """Return the pseudo sites in order, the complete one last."""
        return (self.site0, self.site1, self.site2, self.site3, self.site4)

    def test_ifaces(self):
        # The processor has to provide the batch processor interface
        verifyClass(IBatchProcessor, CertificateCourseProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # all parents are available in the last pseudo site only
        outcomes = [self.proc.parentsExist(self.row, site)
                    for site in self._all_sites()]
        for outcome in outcomes[:-1]:
            self.assertTrue(outcome is False)
        self.assertTrue(outcomes[-1] is True)
        return

    def test_entry_exists(self):
        # the entry is found in the last pseudo site only
        outcomes = [self.proc.entryExists(self.row, site)
                    for site in self._all_sites()]
        for outcome in outcomes[:-1]:
            self.assertTrue(outcome is False)
        self.assertTrue(outcomes[-1] is True)
        return

    def test_get_entry(self):
        # we get the certificate course if it exists and None otherwise
        missing = self.proc.getEntry(self.row, self.site3)
        found = self.proc.getEntry(self.row, self.site4)
        self.assertTrue(missing is None)
        self.assertTrue(found is self.cert_course)
        return

    def test_del_entry(self):
        # a deleted entry is gone from the certificate
        self.assertTrue('CRS_100' in self.certificate.keys())
        self.proc.delEntry(self.row, self.site4)
        self.assertFalse('CRS_100' in self.certificate.keys())
        return
Note: See TracBrowser for help on using the repository browser.