source: main/waeup.kofa/trunk/src/waeup/kofa/university/tests/test_batching.py @ 9283

Last change on this file since 9283 was 9283, checked in by Henrik Bettermann, 12 years ago

Remove temporary dirs in tests.

  • Property svn:keywords set to Id
File size: 29.9 KB
Line 
## $Id: test_batching.py 9283 2012-10-03 21:36:14Z henrik $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##

# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import (
27    IPrincipalRoleMap, IPrincipalRoleManager)
28from zope.testbrowser.testing import Browser
29from zope.interface.verify import verifyClass, verifyObject
30from waeup.kofa.interfaces import IBatchProcessor
31from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
32from waeup.kofa.app import University
33from waeup.kofa.university.batching import (
34    FacultyProcessor, DepartmentProcessor, CourseProcessor,
35    CertificateProcessor, CertificateCourseProcessor)
36from waeup.kofa.university.certificate import Certificate, CertificateCourse
37from waeup.kofa.university.course import Course
38from waeup.kofa.university import Faculty, Department
39from waeup.kofa.university.batching import FacultyProcessor
40
def _read_sample(filename):
    """Return the raw content of ``filename`` from this module's directory.

    The file is opened in binary mode and is closed again before the
    function returns, so no file handle is left dangling at import time.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as sample:
        return sample.read()


def _header_fields(sample_data):
    """Return the comma-separated fields of the first line of ``sample_data``."""
    return sample_data.split('\n')[0].split(',')


# Sample CSV files (and their header rows) used by the import tests below.
FACULTY_SAMPLE_DATA = _read_sample('sample_faculty_data.csv')
FACULTY_HEADER_FIELDS = _header_fields(FACULTY_SAMPLE_DATA)

FACULTY_SAMPLE_DATA_UPDATE = _read_sample('sample_faculty_data_update.csv')
FACULTY_HEADER_FIELDS_UPDATE = _header_fields(FACULTY_SAMPLE_DATA_UPDATE)

DEPARTMENT_SAMPLE_DATA = _read_sample('sample_department_data.csv')
DEPARTMENT_HEADER_FIELDS = _header_fields(DEPARTMENT_SAMPLE_DATA)

CERTIFICATE_SAMPLE_DATA = _read_sample('sample_certificate_data.csv')
CERTIFICATE_HEADER_FIELDS = _header_fields(CERTIFICATE_SAMPLE_DATA)

COURSE_SAMPLE_DATA = _read_sample('sample_course_data.csv')
COURSE_HEADER_FIELDS = _header_fields(COURSE_SAMPLE_DATA)
75
class UniversityProcessorSetup(FunctionalTestCase):
    """Common fixture for the university batch processor tests.

    ``setUp`` creates a ``University`` site with the users ``bob`` and
    ``anne`` and a faculty ``FAC1`` containing department ``DEP1``, which
    in turn holds certificate ``CRT1`` and course ``CRS1``.  Two temporary
    directories (``dc_root`` and ``workdir``) are created per test and
    removed again in ``tearDown``.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create the temporary directories and the populated site."""
        super(UniversityProcessorSetup, self).setUp()
        # Scratch directories; both are removed in tearDown.
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()

        app = University()
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        setSite(app)

        self.app['users'].addUser('bob', 'bobssecret')
        self.app['users'].addUser('anne', 'annessecret')

        # Populate university
        self.certificate = createObject('waeup.Certificate')
        self.certificate.code = 'CRT1'
        self.app['faculties']['FAC1'] = Faculty(code='FAC1')
        self.app['faculties']['FAC1']['DEP1'] = Department(code='DEP1')
        department = self.app['faculties']['FAC1']['DEP1']
        department.certificates.addCertificate(self.certificate)
        self.course = createObject('waeup.Course')
        self.course.code = 'CRS1'
        self.app['faculties']['FAC1']['DEP1'].courses.addCourse(self.course)

        # Main log file inside the datacenter storage as configured at
        # this point; tests read it to check logging messages.
        self.logfile = os.path.join(
            self.app['datacenter'].storage, 'logs', 'main.log')
        return

    def tearDown(self):
        """Tear down the site and remove the temporary directories."""
        super(UniversityProcessorSetup, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        return
117
class TestFacultyProcessor(UniversityProcessorSetup):
    """Tests for ``FacultyProcessor``.

    Besides the real site from the base fixture, ``site1`` and ``site2``
    are plain dicts standing in for a site without / with a faculty
    ``FAC``; ``row`` is the import row addressing that faculty.
    """

    def setUp(self):
        """Prepare browser, processor, pseudo sites and sample CSV files."""
        super(TestFacultyProcessor, self).setUp()

        self.browser = Browser()
        self.browser.handleErrors = False
        self.datacenter_path = 'http://localhost/app/datacenter'
        self.app['datacenter'].setStoragePath(self.dc_root)

        self.proc = FacultyProcessor()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
        self.row = dict(code='FAC')

        # Copy the sample data into the per-test working directory.
        self.csv_file_faculty = os.path.join(
            self.workdir, 'sample_faculty_data.csv')
        with open(self.csv_file_faculty, 'wb') as sample:
            sample.write(FACULTY_SAMPLE_DATA)
        self.csv_file_faculty_update = os.path.join(
            self.workdir, 'sample_faculty_data_update.csv')
        with open(self.csv_file_faculty_update, 'wb') as sample:
            sample.write(FACULTY_SAMPLE_DATA_UPDATE)
        return

    def _conversion_errors(self, local_roles):
        """Return the errors ``checkConversion`` reports for ``local_roles``."""
        errs, inv_errs, conv_dict = self.proc.checkConversion(
            dict(faculty_code='ABC', local_roles=local_roles))
        return errs

    def _assert_only_role(self, role_map, user_name, local_role):
        """Assert ``role_map`` holds exactly one (role, principal) entry."""
        entries = role_map.getPrincipalsAndRoles()
        self.assertEqual(len(entries), 1)
        found_role, found_user, setting = entries[0]
        self.assertEqual(found_user, user_name)
        self.assertEqual(found_role, local_role)

    def _upload_sample_file(self):
        """Upload the faculty sample file via the datacenter upload form."""
        self.browser.getLink("Upload data").click()
        # Keep the file open until the form has been submitted, then
        # close it deterministically.
        with open(self.csv_file_faculty) as upload:
            ctrl = self.browser.getControl(name='uploadfile:file')
            ctrl.mech_control.add_file(
                upload, filename='sample_faculty_data.csv')
            self.browser.getControl('Upload').click()

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, FacultyProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_get_entry(self):
        # if a faculty exists already, we will get it
        result1 = self.proc.getEntry(self.row, self.site1)
        result2 = self.proc.getEntry(self.row, self.site2)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo faculty')
        return

    def test_del_entry(self):
        # make sure we can del entries.
        self.assertTrue('FAC' in self.site2['faculties'].keys())
        self.proc.delEntry(self.row, self.site2)
        # Check the faculties container itself; the top-level keys of
        # the pseudo site never contain faculty codes.
        self.assertTrue('FAC' not in self.site2['faculties'].keys())
        return

    def test_checkConversion(self):
        # Make sure we can check conversions.
        self.assertEqual(len(self._conversion_errors('[]')), 0)

        # Each invalid local_roles value yields exactly one error.
        failing = [
            ("['nonsense'", 'Error'),
            ("('abc')", 'no list'),
            ("[('ABC')]", 'no dicts'),
            ("[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]",
             'user_name or local_role missing'),
            ("[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]",
             'user_name or local_role missing'),
            ("[{'user_name':'bob','local_role':'waeup.local.Boss'},]",
             'waeup.local.Boss not allowed'),
            ("[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]",
             'john does not exist'),
            ]
        for local_roles, message in failing:
            errs = self._conversion_errors(local_roles)
            self.assertEqual(len(errs), 1)
            self.assertEqual(errs, [('local_roles', message)])

        errs = self._conversion_errors(
            "[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]")
        self.assertEqual(len(errs), 0)
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns, 5)
        self.assertEqual(
            content,
            'code,local_roles,--ERRORS--\r\n'
            'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'EFG,[(\'anything\')],local_roles: no dicts\r\n'
            'FGH,[,local_roles: Error\r\n'
            'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            )
        # Bob got a local role in faculty ABC.
        abc = self.app['faculties']['ABC']
        role_map = IPrincipalRoleMap(abc)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'bob')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
        return

    def test_import_update(self):
        for code in ('FAC2', 'FAC3', 'FAC4'):
            self.app['faculties'][code] = Faculty(code=code)

        # Give 'alfons' a role in every faculty before the update import.
        role_maps = {}
        for code in ('FAC1', 'FAC2', 'FAC3', 'FAC4'):
            faculty = self.app['faculties'][code]
            role_manager = IPrincipalRoleManager(faculty)
            role_manager.assignRoleToPrincipal('alfonsrole', 'alfons')
            role_maps[code] = IPrincipalRoleMap(faculty)
            self.assertEqual(
                len(role_maps[code].getPrincipalsAndRoles()), 1)

        self._assert_only_role(role_maps['FAC2'], 'alfons', 'alfonsrole')

        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE,
            'update')
        self.assertEqual(num_warns, 0)
        # Local roles have been removed in FAC1 due to deletion marker.
        self.assertEqual(len(role_maps['FAC1'].getPrincipalsAndRoles()), 0)
        # Old local roles have been removed and new roles have been added
        # in FAC2.
        self._assert_only_role(
            role_maps['FAC2'], 'bob', 'waeup.local.DepartmentManager')
        # Local roles are not touched in FAC3 due to ignore marker.
        self._assert_only_role(role_maps['FAC3'], 'alfons', 'alfonsrole')
        # Local roles are not touched in FAC4 due to empty cell.
        self._assert_only_role(role_maps['FAC4'], 'alfons', 'alfonsrole')
        shutil.rmtree(os.path.dirname(fin_file))
        return

    def test_import_update_logging(self):
        for code in ('FAC2', 'FAC3', 'FAC4'):
            self.app['faculties'][code] = Faculty(code=code)
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE,
            'update')
        self.assertEqual(num_warns, 0)
        with open(self.logfile) as log:
            logcontent = log.read()
        # Logging message from updateEntry
        # NOTE(review): this faculty import test expects a message from
        # the 'Department Processor' about DEP2 -- looks copied from a
        # department test; confirm the intended message.
        self.assertTrue(
            'INFO - system - Department Processor - DEP2 - '
            'Record updated: local_roles=anne|waeup.local.DepartmentManager, '
            'code=DEP2'
            in logcontent)
        shutil.rmtree(os.path.dirname(fin_file))
        return

    def test_upload_import_reupload(self):
        self.browser.addHeader('Authorization', 'Basic mgr:mgrpw')
        self.browser.open(self.datacenter_path)
        self.assertEqual(self.browser.headers['Status'], '200 Ok')
        self.assertEqual(self.browser.url, self.datacenter_path)
        self._upload_sample_file()
        self.browser.getLink('Process data').click()
        self.browser.getControl(name="select").click()
        importerselect = self.browser.getControl(name='importer')
        importerselect.getControl('Faculty Processor').selected = True
        modeselect = self.browser.getControl(name='mode')
        modeselect.getControl(value='create').selected = True
        self.browser.getControl('Proceed to step 3').click()
        self.assertTrue('Header fields OK' in self.browser.contents)
        self.browser.getControl('Perform import').click()
        self.assertTrue(
            'Successfully processed 1 rows' in self.browser.contents)
        # We import the same file a second time.
        self.browser.open(self.datacenter_path)
        self._upload_sample_file()
        self.assertTrue(
            'File with same name was uploaded earlier'
            in self.browser.contents)
        return
346
class TestDepartmentProcessor(UniversityProcessorSetup):
    """Tests for ``DepartmentProcessor``.

    ``site0`` to ``site3`` are plain dicts standing in for sites with an
    increasing amount of structure: empty, without faculty, with an empty
    faculty ``FAC``, and with department ``DPT`` inside ``FAC``.
    """

    def setUp(self):
        """Prepare processor, pseudo sites and the sample CSV file."""
        super(TestDepartmentProcessor, self).setUp()
        self.proc = DepartmentProcessor()
        self.site0 = dict()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC=dict()))
        self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department')))
        self.row = dict(code='DPT', faculty_code='FAC')

        # Copy the sample data into the per-test working directory.
        self.csv_file_department = os.path.join(
            self.workdir, 'sample_department_data.csv')
        with open(self.csv_file_department, 'wb') as sample:
            sample.write(DEPARTMENT_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, DepartmentProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly.
        result0 = self.proc.parentsExist(self.row, self.site0)
        result1 = self.proc.parentsExist(self.row, self.site1)
        result2 = self.proc.parentsExist(self.row, self.site2)
        result3 = self.proc.parentsExist(self.row, self.site3)
        self.assertTrue(result0 is False)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is True)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we lookup entries correctly.
        result0 = self.proc.entryExists(self.row, dict())
        result1 = self.proc.entryExists(self.row, self.site1)
        result2 = self.proc.entryExists(self.row, self.site2)
        result3 = self.proc.entryExists(self.row, self.site3)
        self.assertTrue(result0 is False)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_get_entry(self):
        # we can get a dept. if it exists
        result1 = self.proc.getEntry(self.row, self.site2)
        result2 = self.proc.getEntry(self.row, self.site3)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo department')
        return

    def test_del_entry(self):
        # we can delete departments
        self.proc.delEntry(self.row, self.site3)
        self.assertTrue('DPT' not in self.site3['faculties']['FAC'].keys())
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_department, DEPARTMENT_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns, 6)
        self.assertEqual(
            content,
            'faculty_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP2,[,local_roles: Error\r\n'
            'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in department DEP2.
        dep = self.app['faculties']['FAC1']['DEP2']
        role_map = IPrincipalRoleMap(dep)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
        return
433
class CourseProcessorTests(UniversityProcessorSetup):
    """Tests for ``CourseProcessor`` against the site of the base fixture.

    ``row1`` and ``row2`` address a department / faculty that the fixture
    does not create, ``row3`` a course not created in the existing
    department ``DEP1`` and ``row4`` the fixture's course ``CRS1``.
    """

    def setUp(self):
        """Prepare processor, import rows and the sample CSV file."""
        super(CourseProcessorTests, self).setUp()
        self.proc = CourseProcessor()
        self.row1 = dict(
            department_code='DEP2', faculty_code='FAC1', code="CRS1")
        self.row2 = dict(
            department_code='DEP1', faculty_code='FAC2', code="CRS1")
        self.row3 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRS2")
        self.row4 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRS1")

        # Copy the sample data into the per-test working directory.
        self.csv_file_course = os.path.join(
            self.workdir, 'sample_course_data.csv')
        with open(self.csv_file_course, 'wb') as sample:
            sample.write(COURSE_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, CourseProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        result1 = self.proc.parentsExist(self.row1, self.app)
        result2 = self.proc.parentsExist(self.row2, self.app)
        result3 = self.proc.parentsExist(self.row3, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        result1 = self.proc.entryExists(self.row1, self.app)
        result2 = self.proc.entryExists(self.row2, self.app)
        result3 = self.proc.entryExists(self.row3, self.app)
        result4 = self.proc.entryExists(self.row4, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is False)
        self.assertTrue(result4 is True)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.course)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        courses = self.app['faculties']['FAC1']['DEP1'].courses
        self.assertTrue('CRS1' in courses.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue(
            'CRS1' not in
            self.app['faculties']['FAC1']['DEP1'].courses.keys())
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_course, COURSE_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns, 6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRS2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.Lecturer\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRS2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRS2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRS2,"[{\'user\':\'anne\',\'local\':\'waeup.local.Lecturer\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Lecturer\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in course CRS2.
        course = self.app['faculties']['FAC1']['DEP1'].courses['CRS2']
        role_map = IPrincipalRoleMap(course)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.Lecturer')
        shutil.rmtree(os.path.dirname(fin_file))
        return
518
519
class CertificateProcessorTests(UniversityProcessorSetup):
    """Tests for ``CertificateProcessor`` against the base fixture's site.

    ``row1`` and ``row2`` address a department / faculty that the fixture
    does not create, ``row3`` a certificate not created in the existing
    department ``DEP1`` and ``row4`` the fixture's certificate ``CRT1``.
    """

    def setUp(self):
        """Prepare processor, import rows and the sample CSV file."""
        super(CertificateProcessorTests, self).setUp()
        self.proc = CertificateProcessor()
        self.row1 = dict(
            department_code='DEP2', faculty_code='FAC1', code="CRT1")
        self.row2 = dict(
            department_code='DEP1', faculty_code='FAC2', code="CRT1")
        self.row3 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRT2")
        self.row4 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRT1")

        # Copy the sample data into the per-test working directory.
        self.csv_file_certificate = os.path.join(
            self.workdir, 'sample_certificate_data.csv')
        with open(self.csv_file_certificate, 'wb') as sample:
            sample.write(CERTIFICATE_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts.  The class under
        # test here is CertificateProcessor, not CourseProcessor.
        verifyClass(IBatchProcessor, CertificateProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        result1 = self.proc.parentsExist(self.row1, self.app)
        result2 = self.proc.parentsExist(self.row2, self.app)
        result3 = self.proc.parentsExist(self.row3, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        result1 = self.proc.entryExists(self.row1, self.app)
        result2 = self.proc.entryExists(self.row2, self.app)
        result3 = self.proc.entryExists(self.row3, self.app)
        result4 = self.proc.entryExists(self.row4, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is False)
        self.assertTrue(result4 is True)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.certificate)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        certificates = self.app['faculties']['FAC1']['DEP1'].certificates
        self.assertTrue('CRT1' in certificates.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue(
            'CRT1' not in
            self.app['faculties']['FAC1']['DEP1'].certificates.keys())
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns, 6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRT2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in certificate CRT2.
        cert = self.app['faculties']['FAC1']['DEP1'].certificates['CRT2']
        role_map = IPrincipalRoleMap(cert)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.CourseAdviser100')
        shutil.rmtree(os.path.dirname(fin_file))

        with open(self.logfile) as log:
            logcontent = log.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - Certificate Processor - CRT2 - Record updated: '
            'local_roles=anne|waeup.local.CourseAdviser100, code=CRT2'
            in logcontent)
        return
612
613
class CertCourseProcessorTests(FunctionalTestCase):
    """Tests for ``CertificateCourseProcessor`` using pseudo sites.

    ``site0`` to ``site4`` are plain dicts with an increasing amount of
    structure.  Only ``site4`` holds a department whose certificate
    ``CRT`` contains the certificate course looked up as ``CRS_100``.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Build the processor, the pseudo sites and the import row."""
        super(CertCourseProcessorTests, self).setUp()
        self.proc = CertificateCourseProcessor()

        # Pseudo sites without any department.
        self.site0 = {}
        self.site1 = {'faculties': {}}
        self.site2 = {'faculties': {'FAC': {}}}

        # department1 stays empty, department2 gets certificate + course.
        self.department1 = Department(code='DPT')
        self.department2 = Department(code='DPT')
        self.certificate = Certificate(code='CRT')
        self.department2.certificates['CRT'] = self.certificate
        self.course = Course(code='CRS')
        self.department2.certificates['CRT'].addCertCourse(self.course)
        self.cert_course = self.certificate['CRS_100']
        self.department2.courses['CRS'] = self.course

        self.site3 = {'faculties': {'FAC': {'DPT': self.department1}}}
        self.site4 = {'faculties': {'FAC': {'DPT': self.department2}}}
        self.row = {
            'department_code': 'DPT',
            'faculty_code': 'FAC',
            'certificate_code': 'CRT',
            'course': self.course,
            'level': '100',
            'code': 'CRS_100',
            }
        return

    def _all_sites(self):
        """Return the pseudo sites ordered from ``site0`` to ``site4``."""
        return (self.site0, self.site1, self.site2, self.site3, self.site4)

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, CertificateCourseProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we can find all certificate parents
        found = [
            self.proc.parentsExist(self.row, site)
            for site in self._all_sites()]
        wanted = [False, False, False, False, True]
        for got, expected in zip(found, wanted):
            self.assertTrue(got is expected)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        found = [
            self.proc.entryExists(self.row, site)
            for site in self._all_sites()]
        wanted = [False, False, False, False, True]
        for got, expected in zip(found, wanted):
            self.assertTrue(got is expected)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        missing = self.proc.getEntry(self.row, self.site3)
        existing = self.proc.getEntry(self.row, self.site4)
        self.assertTrue(missing is None)
        self.assertTrue(existing is self.cert_course)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        code = 'CRS_100'
        self.assertTrue(code in self.certificate.keys())
        self.proc.delEntry(self.row, self.site4)
        self.assertTrue(code not in self.certificate.keys())
        return
Note: See TracBrowser for help on using the repository browser.