source: main/waeup.kofa/trunk/src/waeup/kofa/university/tests/test_batching.py @ 9001

Last change on this file since 9001 was 9001, checked in by Henrik Bettermann, 12 years ago

Update local roles by import. Test whether the ignore and deletion markers do what they are supposed to do.

  • Property svn:keywords set to Id
File size: 24.9 KB
Line 
1## $Id: test_batching.py 9001 2012-07-13 16:17:44Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import (
27    IPrincipalRoleMap, IPrincipalRoleManager)
28from zope.interface.verify import verifyClass, verifyObject
29from waeup.kofa.interfaces import IBatchProcessor
30from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
31from waeup.kofa.app import University
32from waeup.kofa.university.batching import (
33    FacultyProcessor, DepartmentProcessor, CourseProcessor,
34    CertificateProcessor, CertificateCourseProcessor)
35from waeup.kofa.university.certificate import Certificate, CertificateCourse
36from waeup.kofa.university.course import Course
37from waeup.kofa.university import Faculty, Department
38from waeup.kofa.university.batching import FacultyProcessor
39
def _read_sample_file(filename):
    """Return the raw content of a sample file stored next to this module.

    The file is read in binary mode inside a ``with`` block, so the
    file handle is closed before the function returns.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as sample_file:
        return sample_file.read()


def _header_fields(csv_data):
    """Return the comma-separated items of the first line of `csv_data`."""
    return csv_data.split('\n')[0].split(',')


# Raw CSV sample data and the header fields taken from their first
# lines. Both are used by the import tests in this module.
FACULTY_SAMPLE_DATA = _read_sample_file('sample_faculty_data.csv')

FACULTY_HEADER_FIELDS = _header_fields(FACULTY_SAMPLE_DATA)

FACULTY_SAMPLE_DATA_UPDATE = _read_sample_file(
    'sample_faculty_data_update.csv')

FACULTY_HEADER_FIELDS_UPDATE = _header_fields(FACULTY_SAMPLE_DATA_UPDATE)

DEPARTMENT_SAMPLE_DATA = _read_sample_file('sample_department_data.csv')

DEPARTMENT_HEADER_FIELDS = _header_fields(DEPARTMENT_SAMPLE_DATA)

CERTIFICATE_SAMPLE_DATA = _read_sample_file('sample_certificate_data.csv')

CERTIFICATE_HEADER_FIELDS = _header_fields(CERTIFICATE_SAMPLE_DATA)
67
class UniversityProcessorSetup(FunctionalTestCase):
    """Common fixture for the university batch processor tests.

    Sets up a `University` site with two users and a small academic
    structure: faculty ``FAC1`` containing department ``DEP1``, which
    holds certificate ``CRT1`` and course ``CRS1``.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create temporary directories, the site, users and sample data."""
        super(UniversityProcessorSetup, self).setUp()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()

        # Store the application in the root folder and make it the
        # current site.
        university = University()
        self.getRootFolder()['app'] = university
        self.app = self.getRootFolder()['app']
        setSite(university)

        # Users that the local role tests refer to.
        for name, password in (
                ('bob', 'bobssecret'), ('anne', 'annessecret')):
            self.app['users'].addUser(name, password)

        # Populate university
        self.certificate = createObject('waeup.Certificate')
        self.certificate.code = 'CRT1'
        faculties = self.app['faculties']
        faculties['FAC1'] = Faculty(code='FAC1')
        faculties['FAC1']['DEP1'] = Department(code='DEP1')
        department = faculties['FAC1']['DEP1']
        department.certificates.addCertificate(self.certificate)
        self.course = createObject('waeup.Course')
        self.course.code = 'CRS1'
        department.courses.addCourse(self.course)
        # Note: no certificate course is added to the certificate here.

    def tearDown(self):
        """Tear down the base fixture, then remove the temp directories."""
        super(UniversityProcessorSetup, self).tearDown()
        for directory in (self.workdir, self.dc_root):
            shutil.rmtree(directory)
        clearSite()
105
class TestFacultyProcessor(UniversityProcessorSetup):
    """Tests for `FacultyProcessor`."""

    def setUp(self):
        """Create the processor, pseudo sites and the CSV files to import."""
        super(TestFacultyProcessor, self).setUp()

        self.proc = FacultyProcessor()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
        self.row = dict(code='FAC')

        # Write the sample data into the working directory; the files
        # are closed explicitly before the import tests read them.
        self.csv_file_faculty = os.path.join(
            self.workdir, 'sample_faculty_data.csv')
        with open(self.csv_file_faculty, 'wb') as csv_file:
            csv_file.write(FACULTY_SAMPLE_DATA)
        self.csv_file_faculty_update = os.path.join(
            self.workdir, 'sample_faculty_data_update.csv')
        with open(self.csv_file_faculty_update, 'wb') as csv_file:
            csv_file.write(FACULTY_SAMPLE_DATA_UPDATE)
        return

    def _assert_single_role(self, role_map, user_name, local_role):
        """Check that `role_map` holds exactly one given role assignment."""
        assignments = role_map.getPrincipalsAndRoles()
        self.assertEqual(len(assignments), 1)
        role, principal, setting = assignments[0]
        self.assertEqual(principal, user_name)
        self.assertEqual(role, local_role)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, FacultyProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_get_entry(self):
        # if a faculty exists already, we will get it
        result1 = self.proc.getEntry(self.row, self.site1)
        result2 = self.proc.getEntry(self.row, self.site2)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo faculty')
        return

    def test_del_entry(self):
        # make sure we can del entries.
        # The faculty lives in the 'faculties' container of the pseudo
        # site, so that is where it must have disappeared from.
        self.assertTrue('FAC' in self.site2['faculties'])
        self.proc.delEntry(self.row, self.site2)
        self.assertTrue('FAC' not in self.site2['faculties'])
        return

    def test_checkConversion(self):
        # Make sure we can check conversions.
        # Each case is a `local_roles` cell value together with the
        # expected error message (None if no error is expected).
        cases = (
            ('[]', None),
            ("['nonsense'", 'Error'),
            ("('abc')", 'no list'),
            ("[('ABC')]", 'no dicts'),
            ("[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]",
             'user_name or local_role missing'),
            ("[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]",
             'user_name or local_role missing'),
            ("[{'user_name':'bob','local_role':'waeup.local.Boss'},]",
             'waeup.local.Boss not allowed'),
            ("[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]",
             'john does not exist'),
            ("[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]",
             None),
            )
        for local_roles, message in cases:
            errs, inv_errs, conv_dict = self.proc.checkConversion(
                dict(faculty_code='ABC', local_roles=local_roles))
            if message is None:
                self.assertEqual(len(errs), 0)
            else:
                self.assertEqual(errs, [('local_roles', message)])
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns, 5)
        self.assertEqual(
            content,
            'code,local_roles,--ERRORS--\r\n'
            'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'EFG,[(\'anything\')],local_roles: no dicts\r\n'
            'FGH,[,local_roles: Error\r\n'
            'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            )
        # Bob got a local role in faculty ABC.
        abc = self.app['faculties']['ABC']
        role_map = IPrincipalRoleMap(abc)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'bob')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
        return

    def test_import_update(self):
        faculties = self.app['faculties']
        for code in ('FAC2', 'FAC3', 'FAC4'):
            faculties[code] = Faculty(code=code)

        # Assign the same local role to 'alfons' in all four faculties
        # so that the effect of the update import can be compared.
        role_maps = dict()
        for code in ('FAC1', 'FAC2', 'FAC3', 'FAC4'):
            role_manager = IPrincipalRoleManager(faculties[code])
            role_manager.assignRoleToPrincipal('alfonsrole', 'alfons')
            role_maps[code] = IPrincipalRoleMap(faculties[code])
            self._assert_single_role(
                role_maps[code], 'alfons', 'alfonsrole')

        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE,
            'update')
        self.assertEqual(num_warns, 0)
        # Local roles have been removed in FAC1 due to deletion marker.
        self.assertEqual(
            len(role_maps['FAC1'].getPrincipalsAndRoles()), 0)
        # Old local roles have been removed and new roles have been
        # added in FAC2.
        self._assert_single_role(
            role_maps['FAC2'], 'bob', 'waeup.local.DepartmentManager')
        # Local roles are not touched in FAC3 due to ignore marker.
        self._assert_single_role(role_maps['FAC3'], 'alfons', 'alfonsrole')
        # Local roles are not touched in FAC4 due to empty cell.
        self._assert_single_role(role_maps['FAC4'], 'alfons', 'alfonsrole')
        shutil.rmtree(os.path.dirname(fin_file))
        return
280
class TestDepartmentProcessor(UniversityProcessorSetup):
    """Tests for `DepartmentProcessor`."""

    def setUp(self):
        """Create the processor, pseudo sites and the CSV file to import."""
        super(TestDepartmentProcessor, self).setUp()
        self.proc = DepartmentProcessor()
        # Pseudo sites with increasing completeness: no faculties
        # container, no faculty, no department, department present.
        self.site0 = dict()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC=dict()))
        self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department')))
        self.row = dict(code='DPT', faculty_code='FAC')

        self.csv_file_department = os.path.join(
            self.workdir, 'sample_department_data.csv')
        # Close the file explicitly so the data is on disk before import.
        with open(self.csv_file_department, 'wb') as csv_file:
            csv_file.write(DEPARTMENT_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, DepartmentProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly.
        result0 = self.proc.parentsExist(self.row, self.site0)
        result1 = self.proc.parentsExist(self.row, self.site1)
        result2 = self.proc.parentsExist(self.row, self.site2)
        result3 = self.proc.parentsExist(self.row, self.site3)
        self.assertTrue(result0 is False)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is True)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we lookup entries correctly.
        result0 = self.proc.entryExists(self.row, dict())
        result1 = self.proc.entryExists(self.row, self.site1)
        result2 = self.proc.entryExists(self.row, self.site2)
        result3 = self.proc.entryExists(self.row, self.site3)
        self.assertTrue(result0 is False)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_get_entry(self):
        # we can get a dept. if it exists
        result1 = self.proc.getEntry(self.row, self.site2)
        result2 = self.proc.getEntry(self.row, self.site3)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo department')
        return

    def test_del_entry(self):
        # we can delete departments
        self.proc.delEntry(self.row, self.site3)
        self.assertTrue('DPT' not in self.site3['faculties']['FAC'].keys())
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_department, DEPARTMENT_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns, 6)
        self.assertEqual(
            content,
            'faculty_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP2,[,local_roles: Error\r\n'
            'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in department DEP2 of faculty FAC1.
        dep = self.app['faculties']['FAC1']['DEP2']
        role_map = IPrincipalRoleMap(dep)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
        return
367
class CourseProcessorTests(UniversityProcessorSetup):
    """Tests for `CourseProcessor` run against the populated sample site."""

    def setUp(self):
        """Create the processor and four rows used for the lookups."""
        super(CourseProcessorTests, self).setUp()
        self.proc = CourseProcessor()
        # Only row4 uses the codes FAC1/DEP1/CRS1 created by the base
        # fixture; rows 1-3 each differ from it in exactly one code.
        self.row1 = {
            'department_code': 'DEP2', 'faculty_code': 'FAC1',
            'code': 'CRS1'}
        self.row2 = {
            'department_code': 'DEP1', 'faculty_code': 'FAC2',
            'code': 'CRS1'}
        self.row3 = {
            'department_code': 'DEP1', 'faculty_code': 'FAC1',
            'code': 'CRS2'}
        self.row4 = {
            'department_code': 'DEP1', 'faculty_code': 'FAC1',
            'code': 'CRS1'}

    def test_ifaces(self):
        # The processor class and its instance must both provide
        # IBatchProcessor.
        verifyClass(IBatchProcessor, CourseProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # Parents are reported only for rows whose faculty and
        # department codes exist in the site.
        rows = (self.row1, self.row2, self.row3)
        found = [self.proc.parentsExist(row, self.app) for row in rows]
        for actual, expected in zip(found, (False, False, True)):
            self.assertTrue(actual is expected)

    def test_entry_exists(self):
        # Only the row pointing to the existing course is reported.
        rows = (self.row1, self.row2, self.row3, self.row4)
        found = [self.proc.entryExists(row, self.app) for row in rows]
        for actual, expected in zip(found, (False, False, False, True)):
            self.assertTrue(actual is expected)

    def test_get_entry(self):
        # An existing course is returned, a missing one yields None.
        missing = self.proc.getEntry(self.row1, self.app)
        existing = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(missing is None)
        self.assertTrue(existing is self.course)

    def test_del_entry(self):
        # Deleting removes the course from the department's courses.
        def course_codes():
            return self.app['faculties']['FAC1']['DEP1'].courses.keys()

        self.assertTrue('CRS1' in course_codes())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue('CRS1' not in course_codes())
421
class CertificateProcessorTests(UniversityProcessorSetup):
    """Tests for `CertificateProcessor`."""

    def setUp(self):
        """Create the processor, lookup rows and the CSV file to import."""
        super(CertificateProcessorTests, self).setUp()
        self.proc = CertificateProcessor()
        # Only row4 uses the codes FAC1/DEP1/CRT1 created by the base
        # fixture; rows 1-3 each differ from it in exactly one code.
        self.row1 = dict(
            department_code='DEP2', faculty_code='FAC1', code="CRT1")
        self.row2 = dict(
            department_code='DEP1', faculty_code='FAC2', code="CRT1")
        self.row3 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRT2")
        self.row4 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRT1")

        self.csv_file_certificate = os.path.join(
            self.workdir, 'sample_certificate_data.csv')
        # Close the file explicitly so the data is on disk before import.
        with open(self.csv_file_certificate, 'wb') as csv_file:
            csv_file.write(CERTIFICATE_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts.
        # Verify the processor under test (not CourseProcessor).
        verifyClass(IBatchProcessor, CertificateProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        result1 = self.proc.parentsExist(self.row1, self.app)
        result2 = self.proc.parentsExist(self.row2, self.app)
        result3 = self.proc.parentsExist(self.row3, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        result1 = self.proc.entryExists(self.row1, self.app)
        result2 = self.proc.entryExists(self.row2, self.app)
        result3 = self.proc.entryExists(self.row3, self.app)
        result4 = self.proc.entryExists(self.row4, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is False)
        self.assertTrue(result4 is True)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.certificate)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        self.assertTrue(
            'CRT1' in
            self.app['faculties']['FAC1']['DEP1'].certificates.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue(
            'CRT1' not in
            self.app['faculties']['FAC1']['DEP1'].certificates.keys())
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns, 6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRT2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in certificate CRT2 of department DEP1.
        cert = self.app['faculties']['FAC1']['DEP1'].certificates['CRT2']
        role_map = IPrincipalRoleMap(cert)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.CourseAdviser100')
        shutil.rmtree(os.path.dirname(fin_file))
        return
506
507
class CertCourseProcessorTests(FunctionalTestCase):
    """Tests for `CertificateCourseProcessor` using dict based pseudo sites."""

    layer = FunctionalLayer

    def setUp(self):
        """Create the processor, five pseudo sites and the lookup row."""
        super(CertCourseProcessorTests, self).setUp()
        self.proc = CertificateCourseProcessor()

        # Pseudo sites without a department.
        self.site0 = {}
        self.site1 = {'faculties': {}}
        self.site2 = {'faculties': {'FAC': {}}}

        # department1 stays empty, department2 gets a certificate with
        # one certificate course plus the referenced course.
        self.department1 = Department(code='DPT')
        self.department2 = Department(code='DPT')
        self.certificate = Certificate(code='CRT')
        self.department2.certificates['CRT'] = self.certificate
        self.course = Course(code='CRS')
        self.department2.certificates['CRT'].addCertCourse(self.course)
        self.cert_course = self.certificate['CRS_100']
        self.department2.courses['CRS'] = self.course

        self.site3 = {'faculties': {'FAC': {'DPT': self.department1}}}
        self.site4 = {'faculties': {'FAC': {'DPT': self.department2}}}
        self.row = {
            'department_code': 'DPT',
            'faculty_code': 'FAC',
            'certificate_code': 'CRT',
            'course': self.course,
            'level': '100',
            'code': 'CRS_100',
            }

    def _all_sites(self):
        """Return the pseudo sites ordered from `site0` to `site4`."""
        return (self.site0, self.site1, self.site2, self.site3, self.site4)

    def test_ifaces(self):
        # The processor class and its instance must both provide
        # IBatchProcessor.
        verifyClass(IBatchProcessor, CertificateCourseProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # Only site4 contains the certificate addressed by the row.
        found = [
            self.proc.parentsExist(self.row, site)
            for site in self._all_sites()]
        expected = (False, False, False, False, True)
        for actual, wanted in zip(found, expected):
            self.assertTrue(actual is wanted)

    def test_entry_exists(self):
        # Only site4 contains the certificate course addressed by the row.
        found = [
            self.proc.entryExists(self.row, site)
            for site in self._all_sites()]
        expected = (False, False, False, False, True)
        for actual, wanted in zip(found, expected):
            self.assertTrue(actual is wanted)

    def test_get_entry(self):
        # An existing certificate course is returned, otherwise None.
        missing = self.proc.getEntry(self.row, self.site3)
        existing = self.proc.getEntry(self.row, self.site4)
        self.assertTrue(missing is None)
        self.assertTrue(existing is self.cert_course)

    def test_del_entry(self):
        # Deleting removes the certificate course from its certificate.
        self.assertTrue('CRS_100' in self.certificate.keys())
        self.proc.delEntry(self.row, self.site4)
        self.assertTrue('CRS_100' not in self.certificate.keys())
Note: See TracBrowser for help on using the repository browser.