source: main/waeup.kofa/trunk/src/waeup/kofa/university/tests/test_batching.py @ 9031

Last change on this file since 9031 was 9002, checked in by Henrik Bettermann, 13 years ago

Add lecturer local role which has no dynamic role assignment yet.

Add test_import to CourseProcessorTests?.

  • Property svn:keywords set to Id
File size: 26.7 KB
Line 
1## $Id: test_batching.py 9002 2012-07-13 16:36:21Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import (
27    IPrincipalRoleMap, IPrincipalRoleManager)
28from zope.interface.verify import verifyClass, verifyObject
29from waeup.kofa.interfaces import IBatchProcessor
30from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
31from waeup.kofa.app import University
32from waeup.kofa.university.batching import (
33    FacultyProcessor, DepartmentProcessor, CourseProcessor,
34    CertificateProcessor, CertificateCourseProcessor)
35from waeup.kofa.university.certificate import Certificate, CertificateCourse
36from waeup.kofa.university.course import Course
37from waeup.kofa.university import Faculty, Department
38from waeup.kofa.university.batching import FacultyProcessor
39
40FACULTY_SAMPLE_DATA = open(
41    os.path.join(os.path.dirname(__file__), 'sample_faculty_data.csv'),
42    'rb').read()
43
44FACULTY_HEADER_FIELDS = FACULTY_SAMPLE_DATA.split(
45    '\n')[0].split(',')
46
47FACULTY_SAMPLE_DATA_UPDATE = open(
48    os.path.join(os.path.dirname(__file__), 'sample_faculty_data_update.csv'),
49    'rb').read()
50
51FACULTY_HEADER_FIELDS_UPDATE = FACULTY_SAMPLE_DATA_UPDATE.split(
52    '\n')[0].split(',')
53
54DEPARTMENT_SAMPLE_DATA = open(
55    os.path.join(os.path.dirname(__file__), 'sample_department_data.csv'),
56    'rb').read()
57
58DEPARTMENT_HEADER_FIELDS = DEPARTMENT_SAMPLE_DATA.split(
59    '\n')[0].split(',')
60
61CERTIFICATE_SAMPLE_DATA = open(
62    os.path.join(os.path.dirname(__file__), 'sample_certificate_data.csv'),
63    'rb').read()
64
65CERTIFICATE_HEADER_FIELDS = CERTIFICATE_SAMPLE_DATA.split(
66    '\n')[0].split(',')
67
68COURSE_SAMPLE_DATA = open(
69    os.path.join(os.path.dirname(__file__), 'sample_course_data.csv'),
70    'rb').read()
71
72COURSE_HEADER_FIELDS = COURSE_SAMPLE_DATA.split(
73    '\n')[0].split(',')
74
75class UniversityProcessorSetup(FunctionalTestCase):
76
77    layer = FunctionalLayer
78
79    def setUp(self):
80        super(UniversityProcessorSetup, self).setUp()
81        self.dc_root = tempfile.mkdtemp()
82        self.workdir = tempfile.mkdtemp()
83        app = University()
84        self.getRootFolder()['app'] = app
85        self.app = self.getRootFolder()['app']
86        setSite(app)
87
88        self.app['users'].addUser('bob', 'bobssecret')
89        self.app['users'].addUser('anne', 'annessecret')
90
91        # Populate university
92        self.certificate = createObject('waeup.Certificate')
93        self.certificate.code = 'CRT1'
94        self.app['faculties']['FAC1'] = Faculty(code='FAC1')
95        self.app['faculties']['FAC1']['DEP1'] = Department(code='DEP1')
96        self.app['faculties']['FAC1']['DEP1'].certificates.addCertificate(
97            self.certificate)
98        self.course = createObject('waeup.Course')
99        self.course.code = 'CRS1'
100        self.app['faculties']['FAC1']['DEP1'].courses.addCourse(
101            self.course)
102        #self.app['faculties']['fac1']['dep1'].certificates['CERT1'].addCertCourse(
103        #    self.course, level=100)
104        return
105
106    def tearDown(self):
107        super(UniversityProcessorSetup, self).tearDown()
108        shutil.rmtree(self.workdir)
109        shutil.rmtree(self.dc_root)
110        clearSite()
111        return
112
113class TestFacultyProcessor(UniversityProcessorSetup):
114
115    def setUp(self):
116        super(TestFacultyProcessor, self).setUp()
117
118        self.proc = FacultyProcessor()
119        self.site1 = dict(faculties=dict())
120        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
121        self.row = dict(code='FAC')
122
123        self.csv_file_faculty = os.path.join(self.workdir, 'sample_faculty_data.csv')
124        open(self.csv_file_faculty, 'wb').write(FACULTY_SAMPLE_DATA)
125        self.csv_file_faculty_update = os.path.join(self.workdir, 'sample_faculty_data_update.csv')
126        open(self.csv_file_faculty_update, 'wb').write(FACULTY_SAMPLE_DATA_UPDATE)
127        return
128
129    def test_ifaces(self):
130        # Make sure we fullfill all interface contracts
131        verifyClass(IBatchProcessor, FacultyProcessor)
132        verifyObject(IBatchProcessor, self.proc)
133        return
134
135    def test_get_entry(self):
136        # if a faculty exists already, we will get it
137        result1 = self.proc.getEntry(self.row, self.site1)
138        result2 = self.proc.getEntry(self.row, self.site2)
139        self.assertTrue(result1 is None)
140        self.assertEqual(result2, 'pseudo faculty')
141        return
142
143    def test_del_entry(self):
144        # make sure we can del entries.
145        self.proc.delEntry(self.row, self.site2)
146        self.assertTrue('FAC' not in self.site2.keys())
147        return
148
149    def test_checkConversion(self):
150        # Make sure we can check conversions.
151        errs, inv_errs, conv_dict = self.proc.checkConversion(
152            dict(faculty_code='ABC', local_roles='[]'))
153        self.assertEqual(len(errs),0)
154
155        errs, inv_errs, conv_dict = self.proc.checkConversion(
156            dict(faculty_code='ABC',
157            local_roles="['nonsense'"
158            ))
159        self.assertEqual(len(errs),1)
160        self.assertEqual(errs, [('local_roles', 'Error')])
161        errs, inv_errs, conv_dict = self.proc.checkConversion(
162            dict(faculty_code='ABC',
163            local_roles="('abc')"
164            ))
165        self.assertEqual(len(errs),1)
166        self.assertEqual(errs, [('local_roles', 'no list')])
167        errs, inv_errs, conv_dict = self.proc.checkConversion(
168            dict(faculty_code='ABC',
169            local_roles="[('ABC')]"
170            ))
171        self.assertEqual(len(errs),1)
172        self.assertEqual(errs, [('local_roles', 'no dicts')])
173        errs, inv_errs, conv_dict = self.proc.checkConversion(
174            dict(faculty_code='ABC',
175            local_roles="('abc')"
176            ))
177        self.assertEqual(len(errs),1)
178        self.assertEqual(errs, [('local_roles', 'no list')])
179        errs, inv_errs, conv_dict = self.proc.checkConversion(
180            dict(faculty_code='ABC', local_roles=
181            "[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]"
182            ))
183        self.assertEqual(len(errs),1)
184        self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')])
185        errs, inv_errs, conv_dict = self.proc.checkConversion(
186            dict(faculty_code='ABC', local_roles=
187            "[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]"
188            ))
189        self.assertEqual(len(errs),1)
190        self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')])
191        errs, inv_errs, conv_dict = self.proc.checkConversion(
192            dict(faculty_code='ABC', local_roles=
193            "[{'user_name':'bob','local_role':'waeup.local.Boss'},]"
194            ))
195        self.assertEqual(len(errs),1)
196        self.assertEqual(errs, [('local_roles', 'waeup.local.Boss not allowed')])
197        errs, inv_errs, conv_dict = self.proc.checkConversion(
198            dict(faculty_code='ABC', local_roles=
199            "[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]"
200            ))
201        self.assertEqual(len(errs),1)
202        self.assertEqual(errs, [('local_roles', 'john does not exist')])
203        errs, inv_errs, conv_dict = self.proc.checkConversion(
204            dict(faculty_code='ABC', local_roles=
205            "[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]"
206            ))
207        self.assertEqual(len(errs),0)
208        return
209
210    def test_import(self):
211        num, num_warns, fin_file, fail_file = self.proc.doImport(
212            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
213        content = open(fail_file).read()
214        self.assertEqual(num_warns,5)
215        self.assertEqual(
216            content,
217            'code,local_roles,--ERRORS--\r\n'
218            'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
219            'local_roles: alice does not exist\r\n'
220            'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
221            'local_roles: waeup.local.Boss not allowed\r\n'
222            'EFG,[(\'anything\')],local_roles: no dicts\r\n'
223            'FGH,[,local_roles: Error\r\n'
224            'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
225            'local_roles: user_name or local_role missing\r\n'
226            )
227        # Bob got a local role in faculty ABC.
228        abc = self.app['faculties']['ABC']
229        role_map = IPrincipalRoleMap(abc)
230        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
231        self.assertEqual(user_name, 'bob')
232        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
233        shutil.rmtree(os.path.dirname(fin_file))
234        return
235
236    def test_import_update(self):
237        self.app['faculties']['FAC2'] = Faculty(code='FAC2')
238        self.app['faculties']['FAC3'] = Faculty(code='FAC3')
239        self.app['faculties']['FAC4'] = Faculty(code='FAC4')
240
241        role_manager1 = IPrincipalRoleManager(self.app['faculties']['FAC1'])
242        role_manager1.assignRoleToPrincipal('alfonsrole', 'alfons')
243        role_map1 = IPrincipalRoleMap(self.app['faculties']['FAC1'])
244        self.assertEqual(len(role_map1.getPrincipalsAndRoles()), 1)
245
246        role_manager2 = IPrincipalRoleManager(self.app['faculties']['FAC2'])
247        role_manager2.assignRoleToPrincipal('alfonsrole', 'alfons')
248        role_map2 = IPrincipalRoleMap(self.app['faculties']['FAC2'])
249        self.assertEqual(len(role_map2.getPrincipalsAndRoles()), 1)
250
251        role_manager3 = IPrincipalRoleManager(self.app['faculties']['FAC3'])
252        role_manager3.assignRoleToPrincipal('alfonsrole', 'alfons')
253        role_map3 = IPrincipalRoleMap(self.app['faculties']['FAC3'])
254        self.assertEqual(len(role_map3.getPrincipalsAndRoles()), 1)
255
256        role_manager4 = IPrincipalRoleManager(self.app['faculties']['FAC4'])
257        role_manager4.assignRoleToPrincipal('alfonsrole', 'alfons')
258        role_map4 = IPrincipalRoleMap(self.app['faculties']['FAC4'])
259        self.assertEqual(len(role_map4.getPrincipalsAndRoles()), 1)
260
261        local_role, user_name, setting = role_map2.getPrincipalsAndRoles()[0]
262        self.assertEqual(user_name, 'alfons')
263        self.assertEqual(local_role, 'alfonsrole')
264
265        num, num_warns, fin_file, fail_file = self.proc.doImport(
266            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update')
267        self.assertEqual(num_warns,0)
268        # Local roles have been removed in FAC1 due to deletion marker.
269        self.assertEqual(len(role_map1.getPrincipalsAndRoles()), 0)
270        # Old local roles have been removed and new roles have been added in FAC2.
271        self.assertEqual(len(role_map2.getPrincipalsAndRoles()), 1)
272        local_role, user_name, setting = role_map2.getPrincipalsAndRoles()[0]
273        self.assertEqual(user_name, 'bob')
274        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
275        # Local roles are not touched in FAC3 due to ignore marker.
276        self.assertEqual(len(role_map3.getPrincipalsAndRoles()), 1)
277        local_role, user_name, setting = role_map3.getPrincipalsAndRoles()[0]
278        self.assertEqual(user_name, 'alfons')
279        self.assertEqual(local_role, 'alfonsrole')
280        # Local roles are not touched in FAC4 due to empty cell.
281        self.assertEqual(len(role_map4.getPrincipalsAndRoles()), 1)
282        local_role, user_name, setting = role_map4.getPrincipalsAndRoles()[0]
283        self.assertEqual(user_name, 'alfons')
284        self.assertEqual(local_role, 'alfonsrole')
285        shutil.rmtree(os.path.dirname(fin_file))
286        return
287
288class TestDepartmentProcessor(UniversityProcessorSetup):
289
290    def setUp(self):
291        super(TestDepartmentProcessor, self).setUp()
292        self.proc = DepartmentProcessor()
293        self.site0 = dict()
294        self.site1 = dict(faculties=dict())
295        self.site2 = dict(faculties=dict(FAC=dict()))
296        self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department')))
297        self.row = dict(code='DPT', faculty_code='FAC')
298
299        self.csv_file_department = os.path.join(self.workdir, 'sample_department_data.csv')
300        open(self.csv_file_department, 'wb').write(DEPARTMENT_SAMPLE_DATA)
301        return
302
303    def test_ifaces(self):
304        # Make sure we fullfill all interface contracts
305        verifyClass(IBatchProcessor, DepartmentProcessor)
306        verifyObject(IBatchProcessor, self.proc)
307        return
308
309    def test_parents_exist(self):
310        # make sure we lookup parents correctly.
311        result0 = self.proc.parentsExist(self.row, self.site0)
312        result1 = self.proc.parentsExist(self.row, self.site1)
313        result2 = self.proc.parentsExist(self.row, self.site2)
314        result3 = self.proc.parentsExist(self.row, self.site3)
315        self.assertTrue(result0 is False)
316        self.assertTrue(result1 is False)
317        self.assertTrue(result2 is True)
318        self.assertTrue(result3 is True)
319        return
320
321    def test_entry_exists(self):
322        # make sure we lookup entries correctly.
323        result0 = self.proc.entryExists(self.row, dict())
324        result1 = self.proc.entryExists(self.row, self.site1)
325        result2 = self.proc.entryExists(self.row, self.site2)
326        result3 = self.proc.entryExists(self.row, self.site3)
327        self.assertTrue(result0 is False)
328        self.assertTrue(result1 is False)
329        self.assertTrue(result2 is False)
330        self.assertTrue(result3 is True)
331        return
332
333    def test_get_entry(self):
334        # we can get a dept. if it exists
335        result1 = self.proc.getEntry(self.row, self.site2)
336        result2 = self.proc.getEntry(self.row, self.site3)
337        self.assertTrue(result1 is None)
338        self.assertEqual(result2, 'pseudo department')
339        return
340
341    def test_del_entry(self):
342        # we can delete departments
343        self.proc.delEntry(self.row, self.site3)
344        self.assertTrue('DPT' not in self.site3['faculties']['FAC'].keys())
345        return
346
347    def test_import(self):
348        num, num_warns, fin_file, fail_file = self.proc.doImport(
349            self.csv_file_department, DEPARTMENT_HEADER_FIELDS)
350        content = open(fail_file).read()
351        self.assertEqual(num_warns,6)
352        self.assertEqual(
353            content,
354            'faculty_code,code,local_roles,--ERRORS--\r\n'
355            'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
356            'local_roles: alice does not exist\r\n'
357            'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
358            'local_roles: waeup.local.Boss not allowed\r\n'
359            'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n'
360            'FAC1,DEP2,[,local_roles: Error\r\n'
361            'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",'
362            'local_roles: user_name or local_role missing\r\n'
363            'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
364            'Not all parents do exist yet. Skipping\r\n'
365            )
366        # Anne got a local role in department ABC.
367        dep = self.app['faculties']['FAC1']['DEP2']
368        role_map = IPrincipalRoleMap(dep)
369        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
370        self.assertEqual(user_name, 'anne')
371        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
372        shutil.rmtree(os.path.dirname(fin_file))
373        return
374
375class CourseProcessorTests(UniversityProcessorSetup):
376
377    def setUp(self):
378        super(CourseProcessorTests, self).setUp()
379        self.proc = CourseProcessor()
380        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRS1")
381        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRS1")
382        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS2")
383        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS1")
384
385        self.csv_file_course = os.path.join(self.workdir, 'sample_course_data.csv')
386        open(self.csv_file_course, 'wb').write(COURSE_SAMPLE_DATA)
387        return
388
389    def test_ifaces(self):
390        # Make sure we fullfill all interface contracts
391        verifyClass(IBatchProcessor, CourseProcessor)
392        verifyObject(IBatchProcessor, self.proc)
393        return
394
395    def test_parents_exist(self):
396        # make sure we lookup parents correctly
397        result1 = self.proc.parentsExist(self.row1, self.app)
398        result2 = self.proc.parentsExist(self.row2, self.app)
399        result3 = self.proc.parentsExist(self.row3, self.app)
400        self.assertTrue(result1 is False)
401        self.assertTrue(result2 is False)
402        self.assertTrue(result3 is True)
403        return
404
405    def test_entry_exists(self):
406        # make sure we find an entry if it exists
407        result1 = self.proc.entryExists(self.row1, self.app)
408        result2 = self.proc.entryExists(self.row2, self.app)
409        result3 = self.proc.entryExists(self.row3, self.app)
410        result4 = self.proc.entryExists(self.row4, self.app)
411        self.assertTrue(result1 is False)
412        self.assertTrue(result2 is False)
413        self.assertTrue(result3 is False)
414        self.assertTrue(result4 is True)
415        return
416
417    def test_get_entry(self):
418        # make sure we can get an entry if it exists
419        result1 = self.proc.getEntry(self.row1, self.app)
420        result2 = self.proc.getEntry(self.row4, self.app)
421        self.assertTrue(result1 is None)
422        self.assertTrue(result2 is self.course)
423        return
424
425    def test_del_entry(self):
426        # make sure we can delete entries
427        self.assertTrue('CRS1' in self.app['faculties']['FAC1']['DEP1'].courses.keys())
428        self.proc.delEntry(self.row4, self.app)
429        self.assertTrue('CRS1' not in self.app['faculties']['FAC1']['DEP1'].courses.keys())
430        return
431
432    def test_import(self):
433        num, num_warns, fin_file, fail_file = self.proc.doImport(
434            self.csv_file_course, COURSE_HEADER_FIELDS)
435        content = open(fail_file).read()
436        self.assertEqual(num_warns,6)
437        self.assertEqual(
438            content,
439            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
440            'FAC1,DEP1,CRS2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.Lecturer\'}]",'
441            'local_roles: alice does not exist\r\n'
442            'FAC1,DEP1,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
443            'local_roles: waeup.local.Boss not allowed\r\n'
444            'FAC1,DEP1,CRS2,[(\'anything\')],local_roles: no dicts\r\n'
445            'FAC1,DEP1,CRS2,[,local_roles: Error\r\n'
446            'FAC1,DEP1,CRS2,"[{\'user\':\'anne\',\'local\':\'waeup.local.Lecturer\'}]",'
447            'local_roles: user_name or local_role missing\r\n'
448            'FAC11,DEP2,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Lecturer\'}]",'
449            'Not all parents do exist yet. Skipping\r\n'
450            )
451        # Anne got a local role in course ABC.
452        dep = self.app['faculties']['FAC1']['DEP1'].courses['CRS2']
453        role_map = IPrincipalRoleMap(dep)
454        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
455        self.assertEqual(user_name, 'anne')
456        self.assertEqual(local_role, 'waeup.local.Lecturer')
457        shutil.rmtree(os.path.dirname(fin_file))
458        return
459
460
461class CertificateProcessorTests(UniversityProcessorSetup):
462
463    def setUp(self):
464        super(CertificateProcessorTests, self).setUp()
465        self.proc = CertificateProcessor()
466        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRT1")
467        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRT1")
468        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT2")
469        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT1")
470
471        self.csv_file_certificate = os.path.join(self.workdir, 'sample_certificate_data.csv')
472        open(self.csv_file_certificate, 'wb').write(CERTIFICATE_SAMPLE_DATA)
473        return
474
475    def test_ifaces(self):
476        # Make sure we fullfill all interface contracts
477        verifyClass(IBatchProcessor, CourseProcessor)
478        verifyObject(IBatchProcessor, self.proc)
479        return
480
481    def test_parents_exist(self):
482        # make sure we lookup parents correctly
483        result1 = self.proc.parentsExist(self.row1, self.app)
484        result2 = self.proc.parentsExist(self.row2, self.app)
485        result3 = self.proc.parentsExist(self.row3, self.app)
486        self.assertTrue(result1 is False)
487        self.assertTrue(result2 is False)
488        self.assertTrue(result3 is True)
489        return
490
491    def test_entry_exists(self):
492        # make sure we find an entry if it exists
493        result1 = self.proc.entryExists(self.row1, self.app)
494        result2 = self.proc.entryExists(self.row2, self.app)
495        result3 = self.proc.entryExists(self.row3, self.app)
496        result4 = self.proc.entryExists(self.row4, self.app)
497        self.assertTrue(result1 is False)
498        self.assertTrue(result2 is False)
499        self.assertTrue(result3 is False)
500        self.assertTrue(result4 is True)
501        return
502
503    def test_get_entry(self):
504        # make sure we can get an entry if it exists
505        result1 = self.proc.getEntry(self.row1, self.app)
506        result2 = self.proc.getEntry(self.row4, self.app)
507        self.assertTrue(result1 is None)
508        self.assertTrue(result2 is self.certificate)
509        return
510
511    def test_del_entry(self):
512        # make sure we can delete entries
513        self.assertTrue('CRT1' in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
514        self.proc.delEntry(self.row4, self.app)
515        self.assertTrue('CRT1' not in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
516        return
517
518    def test_import(self):
519        num, num_warns, fin_file, fail_file = self.proc.doImport(
520            self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS)
521        content = open(fail_file).read()
522        self.assertEqual(num_warns,6)
523        self.assertEqual(
524            content,
525            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
526            'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
527            'local_roles: alice does not exist\r\n'
528            'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
529            'local_roles: waeup.local.Boss not allowed\r\n'
530            'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n'
531            'FAC1,DEP1,CRT2,[,local_roles: Error\r\n'
532            'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",'
533            'local_roles: user_name or local_role missing\r\n'
534            'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
535            'Not all parents do exist yet. Skipping\r\n'
536            )
537        # Anne got a local role in certificate ABC.
538        dep = self.app['faculties']['FAC1']['DEP1'].certificates['CRT2']
539        role_map = IPrincipalRoleMap(dep)
540        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
541        self.assertEqual(user_name, 'anne')
542        self.assertEqual(local_role, 'waeup.local.CourseAdviser100')
543        shutil.rmtree(os.path.dirname(fin_file))
544        return
545
546
547class CertCourseProcessorTests(FunctionalTestCase):
548
549    layer = FunctionalLayer
550
551    def setUp(self):
552        super(CertCourseProcessorTests, self).setUp()
553        self.proc = CertificateCourseProcessor()
554        self.site0 = dict()
555        self.site1 = dict(faculties=dict())
556        self.site2 = dict(faculties=dict(FAC=dict()))
557        self.department1 = Department(code='DPT')
558        self.department2 = Department(code='DPT')
559        self.certificate = Certificate(code='CRT')
560        self.department2.certificates['CRT'] = self.certificate
561        self.course = Course(code='CRS')
562        self.department2.certificates['CRT'].addCertCourse(self.course)
563        self.cert_course = self.certificate['CRS_100']
564        self.department2.courses['CRS'] = self.course
565        self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1)))
566        self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2)))
567        self.row = dict(
568            department_code='DPT',
569            faculty_code='FAC',
570            certificate_code='CRT',
571            course=self.course, level='100',
572            code='CRS_100')
573        return
574
575    def test_ifaces(self):
576        # Make sure we fullfill all interface contracts
577        verifyClass(IBatchProcessor, CertificateCourseProcessor)
578        verifyObject(IBatchProcessor, self.proc)
579        return
580
581    def test_parents_exist(self):
582        # make sure we can find all certificate parents
583        result0 = self.proc.parentsExist(self.row, self.site0)
584        result1 = self.proc.parentsExist(self.row, self.site1)
585        result2 = self.proc.parentsExist(self.row, self.site2)
586        result3 = self.proc.parentsExist(self.row, self.site3)
587        result4 = self.proc.parentsExist(self.row, self.site4)
588        self.assertTrue(result0 is False)
589        self.assertTrue(result1 is False)
590        self.assertTrue(result2 is False)
591        self.assertTrue(result3 is False)
592        self.assertTrue(result4 is True)
593        return
594
595    def test_entry_exists(self):
596        # make sure we find an entry if it exists
597        result0 = self.proc.entryExists(self.row, self.site0)
598        result1 = self.proc.entryExists(self.row, self.site1)
599        result2 = self.proc.entryExists(self.row, self.site2)
600        result3 = self.proc.entryExists(self.row, self.site3)
601        result4 = self.proc.entryExists(self.row, self.site4)
602        self.assertTrue(result0 is False)
603        self.assertTrue(result1 is False)
604        self.assertTrue(result2 is False)
605        self.assertTrue(result3 is False)
606        self.assertTrue(result4 is True)
607        return
608
609    def test_get_entry(self):
610        # make sure we can get an entry if it exists
611        result1 = self.proc.getEntry(self.row, self.site3)
612        result2 = self.proc.getEntry(self.row, self.site4)
613        self.assertTrue(result1 is None)
614        self.assertTrue(result2 is self.cert_course)
615        return
616
617    def test_del_entry(self):
618        # make sure we can delete entries
619        self.assertTrue('CRS_100' in self.certificate.keys())
620        self.proc.delEntry(self.row, self.site4)
621        self.assertTrue('CRS_100' not in self.certificate.keys())
622        return
Note: See TracBrowser for help on using the repository browser.