source: main/waeup.kofa/trunk/src/waeup/kofa/university/tests/test_batching.py @ 9070

Last change on this file since 9070 was 9038, checked in by Henrik Bettermann, 12 years ago

Do not allow uploading a file whose filename already exists in the finished or unfinished folder.

  • Property svn:keywords set to Id
File size: 28.7 KB
Line 
1## $Id: test_batching.py 9038 2012-07-21 16:41:08Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import (
27    IPrincipalRoleMap, IPrincipalRoleManager)
28from zope.testbrowser.testing import Browser
29from zope.interface.verify import verifyClass, verifyObject
30from waeup.kofa.interfaces import IBatchProcessor
31from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
32from waeup.kofa.app import University
33from waeup.kofa.university.batching import (
34    FacultyProcessor, DepartmentProcessor, CourseProcessor,
35    CertificateProcessor, CertificateCourseProcessor)
36from waeup.kofa.university.certificate import Certificate, CertificateCourse
37from waeup.kofa.university.course import Course
38from waeup.kofa.university import Faculty, Department
39from waeup.kofa.university.batching import FacultyProcessor
40
def _read_sample(filename):
    """Return the raw content of a sample file located next to this module.

    The file is read in binary mode and closed as soon as it has been
    read, instead of leaving the handle to the garbage collector.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as sample:
        return sample.read()


def _header_fields(data):
    """Return the comma-separated field names from the first line of `data`."""
    return data.split('\n')[0].split(',')


# Sample CSV contents and the header fields taken from their first line.
FACULTY_SAMPLE_DATA = _read_sample('sample_faculty_data.csv')

FACULTY_HEADER_FIELDS = _header_fields(FACULTY_SAMPLE_DATA)

FACULTY_SAMPLE_DATA_UPDATE = _read_sample('sample_faculty_data_update.csv')

FACULTY_HEADER_FIELDS_UPDATE = _header_fields(FACULTY_SAMPLE_DATA_UPDATE)

DEPARTMENT_SAMPLE_DATA = _read_sample('sample_department_data.csv')

DEPARTMENT_HEADER_FIELDS = _header_fields(DEPARTMENT_SAMPLE_DATA)

CERTIFICATE_SAMPLE_DATA = _read_sample('sample_certificate_data.csv')

CERTIFICATE_HEADER_FIELDS = _header_fields(CERTIFICATE_SAMPLE_DATA)

COURSE_SAMPLE_DATA = _read_sample('sample_course_data.csv')

COURSE_HEADER_FIELDS = _header_fields(COURSE_SAMPLE_DATA)
75
class UniversityProcessorSetup(FunctionalTestCase):
    """Shared fixture for the university batch processor tests.

    Creates a `University` site with two users and a minimal academic
    structure (FAC1 / DEP1 holding certificate CRT1 and course CRS1),
    plus two temporary directories that are removed on tear down.
    """

    layer = FunctionalLayer

    def setUp(self):
        super(UniversityProcessorSetup, self).setUp()
        # Scratch directories: datacenter storage and CSV work dir.
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()

        # Install the application object and make it the active site.
        site = University()
        self.getRootFolder()['app'] = site
        self.app = self.getRootFolder()['app']
        setSite(site)

        for name, password in (('bob', 'bobssecret'),
                               ('anne', 'annessecret')):
            self.app['users'].addUser(name, password)

        # Populate university: FAC1 -> DEP1 -> (CRT1, CRS1).
        # Note: the course is not attached to the certificate here.
        self.certificate = createObject('waeup.Certificate')
        self.certificate.code = 'CRT1'
        faculties = self.app['faculties']
        faculties['FAC1'] = Faculty(code='FAC1')
        faculties['FAC1']['DEP1'] = Department(code='DEP1')
        faculties['FAC1']['DEP1'].certificates.addCertificate(
            self.certificate)
        self.course = createObject('waeup.Course')
        self.course.code = 'CRS1'
        faculties['FAC1']['DEP1'].courses.addCourse(self.course)

    def tearDown(self):
        super(UniversityProcessorSetup, self).tearDown()
        # Remove the scratch directories created in setUp.
        for tmp_dir in (self.workdir, self.dc_root):
            shutil.rmtree(tmp_dir)
        clearSite()
113
class TestFacultyProcessor(UniversityProcessorSetup):
    """Tests for `FacultyProcessor`.

    Covers entry lookup and deletion on plain dict sites, conversion
    checks of the `local_roles` column, CSV imports in create and update
    mode, and the browser-driven upload workflow of the datacenter.
    """

    def setUp(self):
        super(TestFacultyProcessor, self).setUp()

        self.browser = Browser()
        self.browser.handleErrors = False
        self.datacenter_path = 'http://localhost/app/datacenter'
        self.app['datacenter'].setStoragePath(self.dc_root)

        self.proc = FacultyProcessor()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
        self.row = dict(code='FAC')

        # Copy the sample data into the work dir. The files are closed
        # explicitly so nothing is left open when the tests read them.
        self.csv_file_faculty = os.path.join(
            self.workdir, 'sample_faculty_data.csv')
        with open(self.csv_file_faculty, 'wb') as csv_out:
            csv_out.write(FACULTY_SAMPLE_DATA)
        self.csv_file_faculty_update = os.path.join(
            self.workdir, 'sample_faculty_data_update.csv')
        with open(self.csv_file_faculty_update, 'wb') as csv_out:
            csv_out.write(FACULTY_SAMPLE_DATA_UPDATE)
        return

    def _upload_faculty_file(self):
        # Upload the faculty sample file through the datacenter upload
        # form. The browser must already show a datacenter page.
        self.browser.getLink("Upload data").click()
        with open(self.csv_file_faculty) as csv_in:
            ctrl = self.browser.getControl(name='uploadfile:file')
            file_ctrl = ctrl.mech_control
            file_ctrl.add_file(csv_in, filename='sample_faculty_data.csv')
            # Submit while the file is still open.
            self.browser.getControl('Upload').click()
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, FacultyProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_get_entry(self):
        # if a faculty exists already, we will get it
        result1 = self.proc.getEntry(self.row, self.site1)
        result2 = self.proc.getEntry(self.row, self.site2)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo faculty')
        return

    def test_del_entry(self):
        # make sure we can del entries.
        # The faculty lives in the 'faculties' container, so that is the
        # mapping to inspect; the top-level site never has a 'FAC' key.
        self.assertTrue('FAC' in self.site2['faculties'].keys())
        self.proc.delEntry(self.row, self.site2)
        self.assertTrue('FAC' not in self.site2['faculties'].keys())
        return

    def test_checkConversion(self):
        # Make sure we can check conversions.
        # Each case pairs a raw `local_roles` cell with the error list
        # expected from checkConversion.
        dep_manager = 'waeup.local.DepartmentManager'
        cases = (
            ('[]', []),
            ("['nonsense'", [('local_roles', 'Error')]),
            ("('abc')", [('local_roles', 'no list')]),
            ("[('ABC')]", [('local_roles', 'no dicts')]),
            ("[{'name':'bob','local_role':'%s'},]" % dep_manager,
             [('local_roles', 'user_name or local_role missing')]),
            ("[{'user_name':'bob','localrole':'%s'},]" % dep_manager,
             [('local_roles', 'user_name or local_role missing')]),
            ("[{'user_name':'bob','local_role':'waeup.local.Boss'},]",
             [('local_roles', 'waeup.local.Boss not allowed')]),
            ("[{'user_name':'john','local_role':'%s'},]" % dep_manager,
             [('local_roles', 'john does not exist')]),
            ("[{'user_name':'bob','local_role':'%s'},]" % dep_manager, []),
            )
        for local_roles, expected in cases:
            errs, inv_errs, conv_dict = self.proc.checkConversion(
                dict(faculty_code='ABC', local_roles=local_roles))
            self.assertEqual(len(errs), len(expected))
            if expected:
                self.assertEqual(errs, expected)
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns,5)
        self.assertEqual(
            content,
            'code,local_roles,--ERRORS--\r\n'
            'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'EFG,[(\'anything\')],local_roles: no dicts\r\n'
            'FGH,[,local_roles: Error\r\n'
            'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            )
        # Bob got a local role in faculty ABC.
        abc = self.app['faculties']['ABC']
        role_map = IPrincipalRoleMap(abc)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'bob')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
        return

    def test_import_update(self):
        faculties = self.app['faculties']
        for code in ('FAC2', 'FAC3', 'FAC4'):
            faculties[code] = Faculty(code=code)

        # Give every faculty the same pre-existing local role.
        role_maps = dict()
        for code in ('FAC1', 'FAC2', 'FAC3', 'FAC4'):
            role_manager = IPrincipalRoleManager(faculties[code])
            role_manager.assignRoleToPrincipal('alfonsrole', 'alfons')
            role_maps[code] = IPrincipalRoleMap(faculties[code])
            self.assertEqual(
                len(role_maps[code].getPrincipalsAndRoles()), 1)

        local_role, user_name, setting = role_maps[
            'FAC2'].getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'alfons')
        self.assertEqual(local_role, 'alfonsrole')

        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns,0)
        # Local roles have been removed in FAC1 due to deletion marker.
        self.assertEqual(len(role_maps['FAC1'].getPrincipalsAndRoles()), 0)
        # Old local roles have been removed and new roles have been added in FAC2.
        self.assertEqual(len(role_maps['FAC2'].getPrincipalsAndRoles()), 1)
        local_role, user_name, setting = role_maps[
            'FAC2'].getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'bob')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        # Local roles are not touched in FAC3 due to ignore marker.
        self.assertEqual(len(role_maps['FAC3'].getPrincipalsAndRoles()), 1)
        local_role, user_name, setting = role_maps[
            'FAC3'].getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'alfons')
        self.assertEqual(local_role, 'alfonsrole')
        # Local roles are not touched in FAC4 due to empty cell.
        self.assertEqual(len(role_maps['FAC4'].getPrincipalsAndRoles()), 1)
        local_role, user_name, setting = role_maps[
            'FAC4'].getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'alfons')
        self.assertEqual(local_role, 'alfonsrole')
        shutil.rmtree(os.path.dirname(fin_file))
        return

    def test_upload_import_reupload(self):
        self.browser.addHeader('Authorization', 'Basic mgr:mgrpw')
        self.browser.open(self.datacenter_path)
        self.assertEqual(self.browser.headers['Status'], '200 Ok')
        self.assertEqual(self.browser.url, self.datacenter_path)
        self._upload_faculty_file()
        self.browser.getLink('Process data').click()
        self.browser.getControl(name="select").click()
        importerselect = self.browser.getControl(name='importer')
        importerselect.getControl('Faculty Processor').selected = True
        modeselect = self.browser.getControl(name='mode')
        modeselect.getControl(value='create').selected = True
        self.browser.getControl('Proceed to step 3').click()
        self.assertTrue('Header fields OK' in self.browser.contents)
        self.browser.getControl('Perform import').click()
        self.assertTrue('Successfully processed 1 rows' in self.browser.contents)
        # We upload the same file a second time; this must be refused.
        self.browser.open(self.datacenter_path)
        self._upload_faculty_file()
        self.assertTrue(
            'File with same name was uploaded earlier' in self.browser.contents)
        return
326
class TestDepartmentProcessor(UniversityProcessorSetup):
    """Tests for `DepartmentProcessor`.

    The lookup tests run against nested plain dicts (`site0`..`site3`)
    of increasing completeness; the import test runs against the real
    application set up by the base class.
    """

    def setUp(self):
        super(TestDepartmentProcessor, self).setUp()
        self.proc = DepartmentProcessor()
        self.site0 = dict()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC=dict()))
        self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department')))
        self.row = dict(code='DPT', faculty_code='FAC')

        # Copy the sample data into the work dir and close the file
        # explicitly instead of leaking the handle.
        self.csv_file_department = os.path.join(
            self.workdir, 'sample_department_data.csv')
        with open(self.csv_file_department, 'wb') as csv_out:
            csv_out.write(DEPARTMENT_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, DepartmentProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly.
        result0 = self.proc.parentsExist(self.row, self.site0)
        result1 = self.proc.parentsExist(self.row, self.site1)
        result2 = self.proc.parentsExist(self.row, self.site2)
        result3 = self.proc.parentsExist(self.row, self.site3)
        self.assertTrue(result0 is False)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is True)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we lookup entries correctly.
        result0 = self.proc.entryExists(self.row, self.site0)
        result1 = self.proc.entryExists(self.row, self.site1)
        result2 = self.proc.entryExists(self.row, self.site2)
        result3 = self.proc.entryExists(self.row, self.site3)
        self.assertTrue(result0 is False)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_get_entry(self):
        # we can get a dept. if it exists
        result1 = self.proc.getEntry(self.row, self.site2)
        result2 = self.proc.getEntry(self.row, self.site3)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo department')
        return

    def test_del_entry(self):
        # we can delete departments
        self.proc.delEntry(self.row, self.site3)
        self.assertTrue('DPT' not in self.site3['faculties']['FAC'].keys())
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_department, DEPARTMENT_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns,6)
        self.assertEqual(
            content,
            'faculty_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP2,[,local_roles: Error\r\n'
            'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in department DEP2.
        dep = self.app['faculties']['FAC1']['DEP2']
        role_map = IPrincipalRoleMap(dep)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        shutil.rmtree(os.path.dirname(fin_file))
        return
413
class CourseProcessorTests(UniversityProcessorSetup):
    """Tests for `CourseProcessor` against the application fixture.

    `row1` and `row2` name a department or faculty that does not exist,
    `row3` names a course that does not exist yet, and `row4` names the
    course CRS1 created by the base class.
    """

    def setUp(self):
        super(CourseProcessorTests, self).setUp()
        self.proc = CourseProcessor()
        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRS1")
        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRS1")
        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS2")
        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS1")

        # Copy the sample data into the work dir and close the file
        # explicitly instead of leaking the handle.
        self.csv_file_course = os.path.join(
            self.workdir, 'sample_course_data.csv')
        with open(self.csv_file_course, 'wb') as csv_out:
            csv_out.write(COURSE_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, CourseProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        result1 = self.proc.parentsExist(self.row1, self.app)
        result2 = self.proc.parentsExist(self.row2, self.app)
        result3 = self.proc.parentsExist(self.row3, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        result1 = self.proc.entryExists(self.row1, self.app)
        result2 = self.proc.entryExists(self.row2, self.app)
        result3 = self.proc.entryExists(self.row3, self.app)
        result4 = self.proc.entryExists(self.row4, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is False)
        self.assertTrue(result4 is True)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.course)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        self.assertTrue('CRS1' in self.app['faculties']['FAC1']['DEP1'].courses.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue('CRS1' not in self.app['faculties']['FAC1']['DEP1'].courses.keys())
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_course, COURSE_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns,6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRS2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.Lecturer\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRS2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRS2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRS2,"[{\'user\':\'anne\',\'local\':\'waeup.local.Lecturer\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Lecturer\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in course CRS2.
        course = self.app['faculties']['FAC1']['DEP1'].courses['CRS2']
        role_map = IPrincipalRoleMap(course)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.Lecturer')
        shutil.rmtree(os.path.dirname(fin_file))
        return
498
499
class CertificateProcessorTests(UniversityProcessorSetup):
    """Tests for `CertificateProcessor` against the application fixture.

    `row1` and `row2` name a department or faculty that does not exist,
    `row3` names a certificate that does not exist yet, and `row4` names
    the certificate CRT1 created by the base class.
    """

    def setUp(self):
        super(CertificateProcessorTests, self).setUp()
        self.proc = CertificateProcessor()
        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRT1")
        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRT1")
        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT2")
        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT1")

        # Copy the sample data into the work dir and close the file
        # explicitly instead of leaking the handle.
        self.csv_file_certificate = os.path.join(
            self.workdir, 'sample_certificate_data.csv')
        with open(self.csv_file_certificate, 'wb') as csv_out:
            csv_out.write(CERTIFICATE_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts.
        # The class under test here is CertificateProcessor, so that is
        # the class whose interface declaration must be verified.
        verifyClass(IBatchProcessor, CertificateProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        result1 = self.proc.parentsExist(self.row1, self.app)
        result2 = self.proc.parentsExist(self.row2, self.app)
        result3 = self.proc.parentsExist(self.row3, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        result1 = self.proc.entryExists(self.row1, self.app)
        result2 = self.proc.entryExists(self.row2, self.app)
        result3 = self.proc.entryExists(self.row3, self.app)
        result4 = self.proc.entryExists(self.row4, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is False)
        self.assertTrue(result4 is True)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.certificate)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        self.assertTrue('CRT1' in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
        self.proc.delEntry(self.row4, self.app)
        self.assertTrue('CRT1' not in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS)
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns,6)
        self.assertEqual(
            content,
            'faculty_code,department_code,code,local_roles,--ERRORS--\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: alice does not exist\r\n'
            'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
            'local_roles: waeup.local.Boss not allowed\r\n'
            'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n'
            'FAC1,DEP1,CRT2,[,local_roles: Error\r\n'
            'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",'
            'local_roles: user_name or local_role missing\r\n'
            'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",'
            'Not all parents do exist yet. Skipping\r\n'
            )
        # Anne got a local role in certificate CRT2.
        cert = self.app['faculties']['FAC1']['DEP1'].certificates['CRT2']
        role_map = IPrincipalRoleMap(cert)
        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
        self.assertEqual(user_name, 'anne')
        self.assertEqual(local_role, 'waeup.local.CourseAdviser100')
        shutil.rmtree(os.path.dirname(fin_file))
        return
584
585
class CertCourseProcessorTests(FunctionalTestCase):
    """Tests for `CertificateCourseProcessor` on plain dict sites.

    `site0`..`site3` lack one or more of the containers the row refers
    to; only `site4` holds a department with certificate CRT, course CRS
    and the certificate course CRS_100.
    """

    layer = FunctionalLayer

    def setUp(self):
        super(CertCourseProcessorTests, self).setUp()
        self.proc = CertificateCourseProcessor()

        # Incomplete sites, from empty to "faculty without department".
        self.site0 = dict()
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC=dict()))

        # department1 stays empty; department2 gets the full structure.
        self.department1 = Department(code='DPT')
        self.department2 = Department(code='DPT')
        self.certificate = Certificate(code='CRT')
        certificates = self.department2.certificates
        certificates['CRT'] = self.certificate
        self.course = Course(code='CRS')
        certificates['CRT'].addCertCourse(self.course)
        self.cert_course = self.certificate['CRS_100']
        self.department2.courses['CRS'] = self.course

        self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1)))
        self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2)))
        self.row = dict(
            department_code='DPT',
            faculty_code='FAC',
            certificate_code='CRT',
            course=self.course, level='100',
            code='CRS_100')

    def _sites_with_expectation(self):
        # Pair every fixture site with the boolean the lookup methods
        # are expected to return for `self.row`.
        return (
            (self.site0, False),
            (self.site1, False),
            (self.site2, False),
            (self.site3, False),
            (self.site4, True),
            )

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, CertificateCourseProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # make sure we can find all certificate parents
        for site, expected in self._sites_with_expectation():
            found = self.proc.parentsExist(self.row, site)
            self.assertTrue(found is expected)

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        for site, expected in self._sites_with_expectation():
            found = self.proc.entryExists(self.row, site)
            self.assertTrue(found is expected)

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        missing = self.proc.getEntry(self.row, self.site3)
        present = self.proc.getEntry(self.row, self.site4)
        self.assertTrue(missing is None)
        self.assertTrue(present is self.cert_course)

    def test_del_entry(self):
        # make sure we can delete entries
        self.assertTrue('CRS_100' in self.certificate.keys())
        self.proc.delEntry(self.row, self.site4)
        self.assertTrue('CRS_100' not in self.certificate.keys())
Note: See TracBrowser for help on using the repository browser.