source: main/waeup.kofa/trunk/src/waeup/kofa/university/tests/test_batching.py @ 9798

Last change on this file since 9798 was 9754, checked in by Henrik Bettermann, 12 years ago

Fix test.

  • Property svn:keywords set to Id
File size: 30.9 KB
Line 
1## $Id: test_batching.py 9754 2012-12-01 08:21:23Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import (
27    IPrincipalRoleMap, IPrincipalRoleManager)
28from zope.testbrowser.testing import Browser
29from zope.interface.verify import verifyClass, verifyObject
30from waeup.kofa.interfaces import IBatchProcessor
31from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
32from waeup.kofa.app import University
33from waeup.kofa.university.batching import (
34    FacultyProcessor, DepartmentProcessor, CourseProcessor,
35    CertificateProcessor, CertificateCourseProcessor)
36from waeup.kofa.university.certificate import Certificate, CertificateCourse
37from waeup.kofa.university.course import Course
38from waeup.kofa.university import Faculty, Department
39from waeup.kofa.university.batching import FacultyProcessor
40
def _read_sample(filename):
    """Return the raw content of the sample file `filename`.

    The file is looked up in the directory of this test module. It is
    opened in binary mode and closed again as soon as it has been read,
    so no file handle is left open at import time.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fd:
        return fd.read()


def _header_fields(csv_data):
    """Return the comma-separated fields of the first line of `csv_data`."""
    return csv_data.split('\n')[0].split(',')


# Sample CSV data (and the header fields of each file) used by the
# processor tests below.
FACULTY_SAMPLE_DATA = _read_sample('sample_faculty_data.csv')

FACULTY_HEADER_FIELDS = _header_fields(FACULTY_SAMPLE_DATA)

FACULTY_SAMPLE_DATA_UPDATE = _read_sample('sample_faculty_data_update.csv')

FACULTY_HEADER_FIELDS_UPDATE = _header_fields(FACULTY_SAMPLE_DATA_UPDATE)

DEPARTMENT_SAMPLE_DATA = _read_sample('sample_department_data.csv')

DEPARTMENT_HEADER_FIELDS = _header_fields(DEPARTMENT_SAMPLE_DATA)

CERTIFICATE_SAMPLE_DATA = _read_sample('sample_certificate_data.csv')

CERTIFICATE_HEADER_FIELDS = _header_fields(CERTIFICATE_SAMPLE_DATA)

COURSE_SAMPLE_DATA = _read_sample('sample_course_data.csv')

COURSE_HEADER_FIELDS = _header_fields(COURSE_SAMPLE_DATA)

COURSE_SAMPLE_DATA_UPDATE = _read_sample('sample_course_data_update.csv')

COURSE_HEADER_FIELDS_UPDATE = _header_fields(COURSE_SAMPLE_DATA_UPDATE)

82
class UniversityProcessorSetup(FunctionalTestCase):
    """Common fixture for the university batch processor tests.

    Creates a `University` site with two users ('bob' and 'anne'), one
    faculty 'FAC1' holding one department 'DEP1', and in that department
    one certificate 'CRT1' and one course 'CRS1'.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create temporary directories and a populated university site."""
        super(UniversityProcessorSetup, self).setUp()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()

        # Install the application in the root folder and make it the
        # current site.
        university = University()
        root = self.getRootFolder()
        root['app'] = university
        self.app = self.getRootFolder()['app']
        setSite(university)

        users = self.app['users']
        users.addUser('bob', 'bobssecret')
        users.addUser('anne', 'annessecret')

        # Populate university: FAC1 > DEP1 > (CRT1, CRS1).
        # No certificate course is added to CRT1 here.
        self.certificate = createObject('waeup.Certificate')
        self.certificate.code = 'CRT1'
        self.app['faculties']['FAC1'] = Faculty(code='FAC1')
        self.app['faculties']['FAC1']['DEP1'] = Department(code='DEP1')
        department = self.app['faculties']['FAC1']['DEP1']
        department.certificates.addCertificate(self.certificate)
        self.course = createObject('waeup.Course')
        self.course.code = 'CRS1'
        self.app['faculties']['FAC1']['DEP1'].courses.addCourse(self.course)

        # Path of the main log file below the datacenter storage as it
        # is configured at this point.
        storage = self.app['datacenter'].storage
        self.logfile = os.path.join(storage, 'logs', 'main.log')

    def tearDown(self):
        """Remove the temporary directories and unset the current site."""
        super(UniversityProcessorSetup, self).tearDown()
        for tmpdir in (self.workdir, self.dc_root):
            shutil.rmtree(tmpdir)
        clearSite()
123
class TestFacultyProcessor(UniversityProcessorSetup):
    """Tests for `FacultyProcessor`.

    Besides the real site from the base fixture, two plain dict based
    pseudo sites are used: `site1` has an empty faculties container,
    `site2` contains a single entry 'FAC'.
    """

    def setUp(self):
        """Prepare browser, processor, pseudo sites and CSV work files."""
        super(TestFacultyProcessor, self).setUp()

        self.browser = Browser()
        self.browser.handleErrors = False
        self.datacenter_path = 'http://localhost/app/datacenter'
        self.app['datacenter'].setStoragePath(self.dc_root)

        self.proc = FacultyProcessor()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
        self.row = dict(code='FAC')

        self.csv_file_faculty = self._write_workfile(
            'sample_faculty_data.csv', FACULTY_SAMPLE_DATA)
        self.csv_file_faculty_update = self._write_workfile(
            'sample_faculty_data_update.csv', FACULTY_SAMPLE_DATA_UPDATE)

    def _write_workfile(self, filename, data):
        """Write `data` to `filename` in the work dir and return its path."""
        path = os.path.join(self.workdir, filename)
        with open(path, 'wb') as fd:
            fd.write(data)
        return path

    def _upload_faculty_file(self):
        """Upload the faculty sample file via the 'Upload data' page.

        The browser must already show a page offering the 'Upload data'
        link. The file is kept open until the form has been submitted
        and is closed afterwards.
        """
        self.browser.getLink("Upload data").click()
        with open(self.csv_file_faculty) as upload:
            ctrl = self.browser.getControl(name='uploadfile:file')
            ctrl.mech_control.add_file(
                upload, filename='sample_faculty_data.csv')
            self.browser.getControl('Upload').click()

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, FacultyProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_get_entry(self):
        # if a faculty exists already, we will get it
        result1 = self.proc.getEntry(self.row, self.site1)
        result2 = self.proc.getEntry(self.row, self.site2)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo faculty')

    def test_del_entry(self):
        # make sure we can del entries.
        self.assertTrue('FAC' in self.site2['faculties'])
        self.proc.delEntry(self.row, self.site2)
        # The entry lives in the faculties container, not in the site
        # itself; checking the site's own keys could never fail.
        self.assertTrue('FAC' not in self.site2['faculties'])

    def test_checkConversion(self):
        # Make sure we can check conversions.
        missing = [('local_roles', 'user_name or local_role missing')]
        # Pairs of (local_roles cell, expected conversion errors).
        cases = (
            ('[]', []),
            ("['nonsense'", [('local_roles', 'Error')]),
            ("('abc')", [('local_roles', 'no list')]),
            ("[('ABC')]", [('local_roles', 'no dicts')]),
            ("[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]",
             missing),
            ("[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]",
             missing),
            ("[{'user_name':'bob','local_role':'waeup.local.Boss'},]",
             [('local_roles', 'waeup.local.Boss not allowed')]),
            ("[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]",
             [('local_roles', 'john does not exist')]),
            ("[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]",
             []),
            )
        for local_roles, expected in cases:
            errs, inv_errs, conv_dict = self.proc.checkConversion(
                dict(faculty_code='ABC', local_roles=local_roles))
            # Name the failing input, as all cases share one test method.
            msg = 'local_roles=%r: got %r, expected %r' % (
                local_roles, errs, expected)
            self.assertEqual(len(errs), len(expected), msg)
            self.assertEqual(list(errs), expected, msg)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns, 5)
        self.assertEqual(
            content,
            'code,local_roles,--ERRORS--\r\n'
            'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'EFG,[(\'anything\')],local_roles: no dicts\r\n'
            'FGH,[,local_roles: Error\r\n'
            'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            )
        # Bob got a local role in faculty ABC.
        abc = self.app['faculties']['ABC']
        role_map = IPrincipalRoleMap(abc)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'bob')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))

    def _assert_single_role(self, role_map, user_name, local_role):
        """Check that `role_map` holds exactly the given assignment."""
        entries = role_map.getPrincipalsAndRoles()
        self.assertEqual(len(entries), 1)
        found_role, found_user, setting = entries[0]
        self.assertEqual(found_user, user_name)
        self.assertEqual(found_role, local_role)

    def test_import_update(self):
        faculties = self.app['faculties']
        for code in ('FAC2', 'FAC3', 'FAC4'):
            faculties[code] = Faculty(code=code)

        # Give 'alfons' a local role in each of the four faculties and
        # keep the role maps to inspect them after the import.
        role_maps = dict()
        for code in ('FAC1', 'FAC2', 'FAC3', 'FAC4'):
            role_manager = IPrincipalRoleManager(faculties[code])
            role_manager.assignRoleToPrincipal('alfonsrole', 'alfons')
            role_maps[code] = IPrincipalRoleMap(faculties[code])
            self.assertEqual(
                len(role_maps[code].getPrincipalsAndRoles()), 1)

        self._assert_single_role(role_maps['FAC2'], 'alfons', 'alfonsrole')

        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE,
            'update')
        self.assertEqual(num_warns, 0)
        # Local roles have been removed in FAC1 due to deletion marker.
        self.assertEqual(len(role_maps['FAC1'].getPrincipalsAndRoles()), 0)
        # Old local roles have been removed and new roles have been added
        # in FAC2.
        self._assert_single_role(
            role_maps['FAC2'], 'bob', 'waeup.local.DepartmentManager')
        # Local roles are not touched in FAC3 due to ignore marker.
        self._assert_single_role(role_maps['FAC3'], 'alfons', 'alfonsrole')
        # Local roles are not touched in FAC4 due to empty cell.
        self._assert_single_role(role_maps['FAC4'], 'alfons', 'alfonsrole')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update_logging(self):
        for code in ('FAC2', 'FAC3', 'FAC4'):
            self.app['faculties'][code] = Faculty(code=code)
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE,
            'update')
        self.assertEqual(num_warns, 0)
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        # NOTE(review): this test runs the *faculty* processor on the
        # faculty update file, yet the message looked up below is a
        # 'Department Processor' entry for DEP2. Presumably it was copied
        # from a department test; it looks like it can only be found if
        # the log file already contains that line from somewhere else.
        # TODO confirm and replace by the expected Faculty Processor
        # message.
        self.assertTrue(
            'INFO - system - Department Processor - sample_department_data '
            '- DEP2 - updated: local_roles=anne|waeup.local.DepartmentManager, '
            'code=DEP2\n'
            in logcontent)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_upload_import_reupload(self):
        self.browser.addHeader('Authorization', 'Basic mgr:mgrpw')
        self.browser.open(self.datacenter_path)
        self.assertEqual(self.browser.headers['Status'], '200 Ok')
        self.assertEqual(self.browser.url, self.datacenter_path)
        self._upload_faculty_file()
        self.browser.getLink('Process data').click()
        self.browser.getControl(name="select").click()
        importerselect = self.browser.getControl(name='importer')
        importerselect.getControl('Faculty Processor').selected = True
        modeselect = self.browser.getControl(name='mode')
        modeselect.getControl(value='create').selected = True
        self.browser.getControl('Proceed to step 3').click()
        self.assertTrue('Header fields OK' in self.browser.contents)
        self.browser.getControl('Perform import').click()
        self.assertTrue(
            'Successfully processed 1 rows' in self.browser.contents)
        # We import the same file a second time.
        self.browser.open(self.datacenter_path)
        self._upload_faculty_file()
        self.assertTrue(
            'File with same name was uploaded earlier'
            in self.browser.contents)
352
class TestDepartmentProcessor(UniversityProcessorSetup):
    """Tests for `DepartmentProcessor`.

    Four dict based pseudo sites of increasing completeness are used
    for the lookup tests: `site0` is empty, `site1` has no faculty,
    `site2` has an empty faculty 'FAC' and `site3` additionally holds
    an entry 'DPT' in that faculty.
    """

    def setUp(self):
        """Prepare processor, pseudo sites and the CSV work file."""
        super(TestDepartmentProcessor, self).setUp()
        self.proc = DepartmentProcessor()
        self.site0 = dict()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC=dict()))
        self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department')))
        self.row = dict(code='DPT', faculty_code='FAC')

        self.csv_file_department = os.path.join(
            self.workdir, 'sample_department_data.csv')
        # Close the work file right after writing it.
        with open(self.csv_file_department, 'wb') as fd:
            fd.write(DEPARTMENT_SAMPLE_DATA)

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, DepartmentProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # make sure we lookup parents correctly.
        expectations = (
            (self.site0, False),
            (self.site1, False),
            (self.site2, True),
            (self.site3, True),
            )
        for site, expected in expectations:
            result = self.proc.parentsExist(self.row, site)
            self.assertTrue(result is expected)

    def test_entry_exists(self):
        # make sure we lookup entries correctly.
        expectations = (
            (dict(), False),
            (self.site1, False),
            (self.site2, False),
            (self.site3, True),
            )
        for site, expected in expectations:
            result = self.proc.entryExists(self.row, site)
            self.assertTrue(result is expected)

    def test_get_entry(self):
        # we can get a dept. if it exists
        result1 = self.proc.getEntry(self.row, self.site2)
        result2 = self.proc.getEntry(self.row, self.site3)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo department')

    def test_del_entry(self):
        # we can delete departments
        self.proc.delEntry(self.row, self.site3)
        self.assertTrue('DPT' not in self.site3['faculties']['FAC'].keys())

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_department, DEPARTMENT_HEADER_FIELDS)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns, 6)
        self.assertEqual(
            content,
            'faculty_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP2,[,local_roles: Error\r\n'
            'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in department DEP2.
        department = self.app['faculties']['FAC1']['DEP2']
        role_map = IPrincipalRoleMap(department)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
439
class CourseProcessorTests(UniversityProcessorSetup):
    """Tests for `CourseProcessor`.

    The rows refer to the site created by the base fixture, which holds
    course CRS1 in department DEP1 of faculty FAC1: `row1` names a
    department and `row2` a faculty that do not exist, `row3` names a
    course that does not exist, `row4` names the existing course.
    """

    def setUp(self):
        """Prepare processor, lookup rows and the CSV work files."""
        super(CourseProcessorTests, self).setUp()
        self.proc = CourseProcessor()
        self.row1 = dict(
            department_code='DEP2', faculty_code='FAC1', code="CRS1")
        self.row2 = dict(
            department_code='DEP1', faculty_code='FAC2', code="CRS1")
        self.row3 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRS2")
        self.row4 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRS1")

        self.csv_file_course = os.path.join(
            self.workdir, 'sample_course_data.csv')
        # Close the work files right after writing them.
        with open(self.csv_file_course, 'wb') as fd:
            fd.write(COURSE_SAMPLE_DATA)
        self.csv_file_course_update = os.path.join(
            self.workdir, 'sample_course_data_update.csv')
        with open(self.csv_file_course_update, 'wb') as fd:
            fd.write(COURSE_SAMPLE_DATA_UPDATE)

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, CourseProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        expectations = (
            (self.row1, False),
            (self.row2, False),
            (self.row3, True),
            )
        for row, expected in expectations:
            result = self.proc.parentsExist(row, self.app)
            self.assertTrue(result is expected)

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        expectations = (
            (self.row1, False),
            (self.row2, False),
            (self.row3, False),
            (self.row4, True),
            )
        for row, expected in expectations:
            result = self.proc.entryExists(row, self.app)
            self.assertTrue(result is expected)

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.course)

    def test_del_entry(self):
        # make sure we can delete entries
        courses = self.app['faculties']['FAC1']['DEP1'].courses
        self.assertTrue('CRS1' in courses.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue(
            'CRS1' not in
            self.app['faculties']['FAC1']['DEP1'].courses.keys())

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_course, COURSE_HEADER_FIELDS)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns, 6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRS2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.Lecturer\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRS2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRS2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRS2,"[{\'user\':\'anne\',\'local\':\'waeup.local.Lecturer\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Lecturer\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in course CRS2.
        course = self.app['faculties']['FAC1']['DEP1'].courses['CRS2']
        role_map = IPrincipalRoleMap(course)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.Lecturer')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        # Import the sample data first, then apply the update file.
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_course, COURSE_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_course_update, COURSE_HEADER_FIELDS_UPDATE,
            'update')
        self.assertEqual(num_warns, 0)
        self.assertEqual(
            self.app['faculties']['FAC1']['DEP1'].courses['CRS2'].title,
            'New Title')
        shutil.rmtree(os.path.dirname(fin_file))

541
542
class CertificateProcessorTests(UniversityProcessorSetup):
    """Tests for `CertificateProcessor`.

    The rows refer to the site created by the base fixture, which holds
    certificate CRT1 in department DEP1 of faculty FAC1: `row1` names a
    department and `row2` a faculty that do not exist, `row3` names a
    certificate that does not exist, `row4` names the existing one.
    """

    def setUp(self):
        """Prepare processor, lookup rows and the CSV work file."""
        super(CertificateProcessorTests, self).setUp()
        self.proc = CertificateProcessor()
        self.row1 = dict(
            department_code='DEP2', faculty_code='FAC1', code="CRT1")
        self.row2 = dict(
            department_code='DEP1', faculty_code='FAC2', code="CRT1")
        self.row3 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRT2")
        self.row4 = dict(
            department_code='DEP1', faculty_code='FAC1', code="CRT1")

        self.csv_file_certificate = os.path.join(
            self.workdir, 'sample_certificate_data.csv')
        # Close the work file right after writing it.
        with open(self.csv_file_certificate, 'wb') as fd:
            fd.write(CERTIFICATE_SAMPLE_DATA)

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts.
        # The class under test here is CertificateProcessor (the class
        # of self.proc), not CourseProcessor.
        verifyClass(IBatchProcessor, CertificateProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        expectations = (
            (self.row1, False),
            (self.row2, False),
            (self.row3, True),
            )
        for row, expected in expectations:
            result = self.proc.parentsExist(row, self.app)
            self.assertTrue(result is expected)

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        expectations = (
            (self.row1, False),
            (self.row2, False),
            (self.row3, False),
            (self.row4, True),
            )
        for row, expected in expectations:
            result = self.proc.entryExists(row, self.app)
            self.assertTrue(result is expected)

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.certificate)

    def test_del_entry(self):
        # make sure we can delete entries
        certificates = self.app['faculties']['FAC1']['DEP1'].certificates
        self.assertTrue('CRT1' in certificates.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue(
            'CRT1' not in
            self.app['faculties']['FAC1']['DEP1'].certificates.keys())

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns, 6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRT2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in certificate CRT2.
        certificate = self.app['faculties']['FAC1']['DEP1'].certificates[
            'CRT2']
        role_map = IPrincipalRoleMap(certificate)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.CourseAdviser100')
        shutil.rmtree(os.path.dirname(fin_file))

        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - Certificate Processor - '
            'sample_certificate_data - CRT2 - '
            'updated: local_roles=anne|waeup.local.CourseAdviser100, code=CRT2'
            in logcontent)

636
637
class CertCourseProcessorTests(FunctionalTestCase):
    """Tests for `CertificateCourseProcessor`.

    Five dict based pseudo sites of increasing completeness are used:
    `site0` is empty, `site1` has no faculty, `site2` has an empty
    faculty 'FAC', `site3` holds a department 'DPT' without any
    certificate and `site4` holds a department 'DPT' with certificate
    'CRT' to which course 'CRS' has been added.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Build the processor, the pseudo sites and the lookup row."""
        super(CertCourseProcessorTests, self).setUp()
        self.proc = CertificateCourseProcessor()

        # Incomplete sites.
        self.site0 = dict()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC=dict()))

        # department1 stays empty, department2 gets certificate, course
        # and the certificate course connecting both.
        self.department1 = Department(code='DPT')
        self.department2 = Department(code='DPT')
        self.certificate = Certificate(code='CRT')
        self.department2.certificates['CRT'] = self.certificate
        self.course = Course(code='CRS')
        stored_certificate = self.department2.certificates['CRT']
        stored_certificate.addCertCourse(self.course)
        self.cert_course = self.certificate['CRS_100']
        self.department2.courses['CRS'] = self.course

        self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1)))
        self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2)))
        self.row = dict(
            department_code='DPT',
            faculty_code='FAC',
            certificate_code='CRT',
            course=self.course, level='100',
            code='CRS_100')

    def _sites_with_expectation(self):
        """Return pairs of (site, whether the row is resolvable in it)."""
        return (
            (self.site0, False),
            (self.site1, False),
            (self.site2, False),
            (self.site3, False),
            (self.site4, True),
            )

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, CertificateCourseProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # make sure we can find all certificate parents
        for site, expected in self._sites_with_expectation():
            found = self.proc.parentsExist(self.row, site)
            self.assertTrue(found is expected)

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        for site, expected in self._sites_with_expectation():
            found = self.proc.entryExists(self.row, site)
            self.assertTrue(found is expected)

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        missing = self.proc.getEntry(self.row, self.site3)
        existing = self.proc.getEntry(self.row, self.site4)
        self.assertTrue(missing is None)
        self.assertTrue(existing is self.cert_course)

    def test_del_entry(self):
        # make sure we can delete entries
        self.assertTrue('CRS_100' in self.certificate.keys())
        self.proc.delEntry(self.row, self.site4)
        self.assertTrue('CRS_100' not in self.certificate.keys())
Note: See TracBrowser for help on using the repository browser.