source: main/waeup.kofa/trunk/src/waeup/kofa/university/tests/test_batching.py @ 8999

Last change on this file since 8999 was 8999, checked in by Henrik Bettermann, 12 years ago

Add test_import to TestDepartmentProcessor?.

  • Property svn:keywords set to Id
File size: 19.7 KB
Line 
1## $Id: test_batching.py 8999 2012-07-13 14:42:44Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import IPrincipalRoleMap
27from zope.interface.verify import verifyClass, verifyObject
28from waeup.kofa.interfaces import IBatchProcessor
29from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
30from waeup.kofa.app import University
31from waeup.kofa.university.batching import (
32    FacultyProcessor, DepartmentProcessor, CourseProcessor,
33    CertificateProcessor, CertificateCourseProcessor)
34from waeup.kofa.university.certificate import Certificate, CertificateCourse
35from waeup.kofa.university.course import Course
36from waeup.kofa.university import Faculty, Department
37from waeup.kofa.university.batching import FacultyProcessor
38
39FACULTY_SAMPLE_DATA = open(
40    os.path.join(os.path.dirname(__file__), 'sample_faculty_data.csv'),
41    'rb').read()
42
43FACULTY_HEADER_FIELDS = FACULTY_SAMPLE_DATA.split(
44    '\n')[0].split(',')
45
46DEPARTMENT_SAMPLE_DATA = open(
47    os.path.join(os.path.dirname(__file__), 'sample_department_data.csv'),
48    'rb').read()
49
50DEPARTMENT_HEADER_FIELDS = DEPARTMENT_SAMPLE_DATA.split(
51    '\n')[0].split(',')
52
53class UniversityProcessorSetup(FunctionalTestCase):
54
55    layer = FunctionalLayer
56
57    def setUp(self):
58        super(UniversityProcessorSetup, self).setUp()
59        self.dc_root = tempfile.mkdtemp()
60        self.workdir = tempfile.mkdtemp()
61        app = University()
62        self.getRootFolder()['app'] = app
63        self.app = self.getRootFolder()['app']
64        setSite(app)
65
66        self.app['users'].addUser('bob', 'bobssecret')
67        self.app['users'].addUser('anne', 'annessecret')
68
69        # Populate university
70        self.certificate = createObject('waeup.Certificate')
71        self.certificate.code = 'CRT1'
72        self.app['faculties']['FAC1'] = Faculty(code='FAC1')
73        self.app['faculties']['FAC1']['DEP1'] = Department(code='DEP1')
74        self.app['faculties']['FAC1']['DEP1'].certificates.addCertificate(
75            self.certificate)
76        self.course = createObject('waeup.Course')
77        self.course.code = 'CRS1'
78        self.app['faculties']['FAC1']['DEP1'].courses.addCourse(
79            self.course)
80        #self.app['faculties']['fac1']['dep1'].certificates['CERT1'].addCertCourse(
81        #    self.course, level=100)
82        return
83
84    def tearDown(self):
85        super(UniversityProcessorSetup, self).tearDown()
86        shutil.rmtree(self.workdir)
87        shutil.rmtree(self.dc_root)
88        clearSite()
89        return
90
91class TestFacultyProcessor(UniversityProcessorSetup):
92
93    def setUp(self):
94        super(TestFacultyProcessor, self).setUp()
95
96        self.proc = FacultyProcessor()
97        self.site1 = dict(faculties=dict())
98        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
99        self.row = dict(code='FAC')
100
101        self.csv_file_faculty = os.path.join(self.workdir, 'sample_faculty_data.csv')
102        open(self.csv_file_faculty, 'wb').write(FACULTY_SAMPLE_DATA)
103        return
104
105    def test_ifaces(self):
106        # Make sure we fullfill all interface contracts
107        verifyClass(IBatchProcessor, FacultyProcessor)
108        verifyObject(IBatchProcessor, self.proc)
109        return
110
111    def test_get_entry(self):
112        # if a faculty exists already, we will get it
113        result1 = self.proc.getEntry(self.row, self.site1)
114        result2 = self.proc.getEntry(self.row, self.site2)
115        self.assertTrue(result1 is None)
116        self.assertEqual(result2, 'pseudo faculty')
117        return
118
119    def test_del_entry(self):
120        # make sure we can del entries.
121        self.proc.delEntry(self.row, self.site2)
122        self.assertTrue('FAC' not in self.site2.keys())
123        return
124
125    def test_checkConversion(self):
126        # Make sure we can check conversions.
127        errs, inv_errs, conv_dict = self.proc.checkConversion(
128            dict(faculty_code='ABC', local_roles='[]'))
129        self.assertEqual(len(errs),0)
130
131        errs, inv_errs, conv_dict = self.proc.checkConversion(
132            dict(faculty_code='ABC',
133            local_roles="['nonsense'"
134            ))
135        self.assertEqual(len(errs),1)
136        self.assertEqual(errs, [('local_roles', 'Error')])
137        errs, inv_errs, conv_dict = self.proc.checkConversion(
138            dict(faculty_code='ABC',
139            local_roles="('abc')"
140            ))
141        self.assertEqual(len(errs),1)
142        self.assertEqual(errs, [('local_roles', 'no list')])
143        errs, inv_errs, conv_dict = self.proc.checkConversion(
144            dict(faculty_code='ABC',
145            local_roles="[('ABC')]"
146            ))
147        self.assertEqual(len(errs),1)
148        self.assertEqual(errs, [('local_roles', 'no dicts')])
149        errs, inv_errs, conv_dict = self.proc.checkConversion(
150            dict(faculty_code='ABC',
151            local_roles="('abc')"
152            ))
153        self.assertEqual(len(errs),1)
154        self.assertEqual(errs, [('local_roles', 'no list')])
155        errs, inv_errs, conv_dict = self.proc.checkConversion(
156            dict(faculty_code='ABC', local_roles=
157            "[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]"
158            ))
159        self.assertEqual(len(errs),1)
160        self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')])
161        errs, inv_errs, conv_dict = self.proc.checkConversion(
162            dict(faculty_code='ABC', local_roles=
163            "[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]"
164            ))
165        self.assertEqual(len(errs),1)
166        self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')])
167        errs, inv_errs, conv_dict = self.proc.checkConversion(
168            dict(faculty_code='ABC', local_roles=
169            "[{'user_name':'bob','local_role':'waeup.local.Boss'},]"
170            ))
171        self.assertEqual(len(errs),1)
172        self.assertEqual(errs, [('local_roles', 'waeup.local.Boss not allowed')])
173        errs, inv_errs, conv_dict = self.proc.checkConversion(
174            dict(faculty_code='ABC', local_roles=
175            "[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]"
176            ))
177        self.assertEqual(len(errs),1)
178        self.assertEqual(errs, [('local_roles', 'john does not exist')])
179        errs, inv_errs, conv_dict = self.proc.checkConversion(
180            dict(faculty_code='ABC', local_roles=
181            "[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]"
182            ))
183        self.assertEqual(len(errs),0)
184        return
185
186    def test_import(self):
187        num, num_warns, fin_file, fail_file = self.proc.doImport(
188            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
189        content = open(fail_file).read()
190        self.assertEqual(num_warns,5)
191        self.assertEqual(
192            content,
193            'code,local_roles,--ERRORS--\r\n'
194            'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
195            'local_roles: alice does not exist\r\n'
196            'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
197            'local_roles: waeup.local.Boss not allowed\r\n'
198            'EFG,[(\'anything\')],local_roles: no dicts\r\n'
199            'FGH,[,local_roles: Error\r\n'
200            'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
201            'local_roles: user_name or local_role missing\r\n'
202            )
203        # Bob got a local role in faculty ABC.
204        abc = self.app['faculties']['ABC']
205        role_map = IPrincipalRoleMap(abc)
206        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
207        self.assertEqual(user_name, 'bob')
208        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
209        shutil.rmtree(os.path.dirname(fin_file))
210
211class TestDepartmentProcessor(UniversityProcessorSetup):
212
213    def setUp(self):
214        super(TestDepartmentProcessor, self).setUp()
215        self.proc = DepartmentProcessor()
216        self.site0 = dict()
217        self.site1 = dict(faculties=dict())
218        self.site2 = dict(faculties=dict(FAC=dict()))
219        self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department')))
220        self.row = dict(code='DPT', faculty_code='FAC')
221
222        self.csv_file_department = os.path.join(self.workdir, 'sample_department_data.csv')
223        open(self.csv_file_department, 'wb').write(DEPARTMENT_SAMPLE_DATA)
224        return
225
226    def test_ifaces(self):
227        # Make sure we fullfill all interface contracts
228        verifyClass(IBatchProcessor, DepartmentProcessor)
229        verifyObject(IBatchProcessor, self.proc)
230        return
231
232    def test_parents_exist(self):
233        # make sure we lookup parents correctly.
234        result0 = self.proc.parentsExist(self.row, self.site0)
235        result1 = self.proc.parentsExist(self.row, self.site1)
236        result2 = self.proc.parentsExist(self.row, self.site2)
237        result3 = self.proc.parentsExist(self.row, self.site3)
238        self.assertTrue(result0 is False)
239        self.assertTrue(result1 is False)
240        self.assertTrue(result2 is True)
241        self.assertTrue(result3 is True)
242        return
243
244    def test_entry_exists(self):
245        # make sure we lookup entries correctly.
246        result0 = self.proc.entryExists(self.row, dict())
247        result1 = self.proc.entryExists(self.row, self.site1)
248        result2 = self.proc.entryExists(self.row, self.site2)
249        result3 = self.proc.entryExists(self.row, self.site3)
250        self.assertTrue(result0 is False)
251        self.assertTrue(result1 is False)
252        self.assertTrue(result2 is False)
253        self.assertTrue(result3 is True)
254        return
255
256    def test_get_entry(self):
257        # we can get a dept. if it exists
258        result1 = self.proc.getEntry(self.row, self.site2)
259        result2 = self.proc.getEntry(self.row, self.site3)
260        self.assertTrue(result1 is None)
261        self.assertEqual(result2, 'pseudo department')
262        return
263
264    def test_del_entry(self):
265        # we can delete departments
266        self.proc.delEntry(self.row, self.site3)
267        self.assertTrue('DPT' not in self.site3['faculties']['FAC'].keys())
268        return
269
270    def test_import(self):
271        num, num_warns, fin_file, fail_file = self.proc.doImport(
272            self.csv_file_department, DEPARTMENT_HEADER_FIELDS)
273        content = open(fail_file).read()
274        self.assertEqual(num_warns,6)
275        self.assertEqual(
276            content,
277            'faculty_code,code,local_roles,--ERRORS--\r\n'
278            'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
279            'local_roles: alice does not exist\r\n'
280            'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",'
281            'local_roles: waeup.local.Boss not allowed\r\n'
282            'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n'
283            'FAC1,DEP2,[,local_roles: Error\r\n'
284            'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",'
285            'local_roles: user_name or local_role missing\r\n'
286            'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
287            'Not all parents do exist yet. Skipping\r\n'
288            )
289        # Bob got a local role in department ABC.
290        dep = self.app['faculties']['FAC1']['DEP2']
291        role_map = IPrincipalRoleMap(dep)
292        local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
293        self.assertEqual(user_name, 'anne')
294        self.assertEqual(local_role, 'waeup.local.DepartmentManager')
295        shutil.rmtree(os.path.dirname(fin_file))
296
297class CourseProcessorTests(UniversityProcessorSetup):
298
299    def setUp(self):
300        super(CourseProcessorTests, self).setUp()
301        self.proc = CourseProcessor()
302        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRS1")
303        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRS1")
304        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS2")
305        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS1")
306        return
307
308    def test_ifaces(self):
309        # Make sure we fullfill all interface contracts
310        verifyClass(IBatchProcessor, CourseProcessor)
311        verifyObject(IBatchProcessor, self.proc)
312        return
313
314    def test_parents_exist(self):
315        # make sure we lookup parents correctly
316        result1 = self.proc.parentsExist(self.row1, self.app)
317        result2 = self.proc.parentsExist(self.row2, self.app)
318        result3 = self.proc.parentsExist(self.row3, self.app)
319        self.assertTrue(result1 is False)
320        self.assertTrue(result2 is False)
321        self.assertTrue(result3 is True)
322        return
323
324    def test_entry_exists(self):
325        # make sure we find an entry if it exists
326        result1 = self.proc.entryExists(self.row1, self.app)
327        result2 = self.proc.entryExists(self.row2, self.app)
328        result3 = self.proc.entryExists(self.row3, self.app)
329        result4 = self.proc.entryExists(self.row4, self.app)
330        self.assertTrue(result1 is False)
331        self.assertTrue(result2 is False)
332        self.assertTrue(result3 is False)
333        self.assertTrue(result4 is True)
334        return
335
336    def test_get_entry(self):
337        # make sure we can get an entry if it exists
338        result1 = self.proc.getEntry(self.row1, self.app)
339        result2 = self.proc.getEntry(self.row4, self.app)
340        self.assertTrue(result1 is None)
341        self.assertTrue(result2 is self.course)
342        return
343
344    def test_del_entry(self):
345        # make sure we can delete entries
346        self.assertTrue('CRS1' in self.app['faculties']['FAC1']['DEP1'].courses.keys())
347        self.proc.delEntry(self.row4, self.app)
348        self.assertTrue('CRS1' not in self.app['faculties']['FAC1']['DEP1'].courses.keys())
349        return
350
351class CertificateProcessorTests(UniversityProcessorSetup):
352
353    def setUp(self):
354        super(CertificateProcessorTests, self).setUp()
355        self.proc = CertificateProcessor()
356        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRT1")
357        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRT1")
358        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT2")
359        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT1")
360        return
361
362    def test_ifaces(self):
363        # Make sure we fullfill all interface contracts
364        verifyClass(IBatchProcessor, CourseProcessor)
365        verifyObject(IBatchProcessor, self.proc)
366        return
367
368    def test_parents_exist(self):
369        # make sure we lookup parents correctly
370        result1 = self.proc.parentsExist(self.row1, self.app)
371        result2 = self.proc.parentsExist(self.row2, self.app)
372        result3 = self.proc.parentsExist(self.row3, self.app)
373        self.assertTrue(result1 is False)
374        self.assertTrue(result2 is False)
375        self.assertTrue(result3 is True)
376        return
377
378    def test_entry_exists(self):
379        # make sure we find an entry if it exists
380        result1 = self.proc.entryExists(self.row1, self.app)
381        result2 = self.proc.entryExists(self.row2, self.app)
382        result3 = self.proc.entryExists(self.row3, self.app)
383        result4 = self.proc.entryExists(self.row4, self.app)
384        self.assertTrue(result1 is False)
385        self.assertTrue(result2 is False)
386        self.assertTrue(result3 is False)
387        self.assertTrue(result4 is True)
388        return
389
390    def test_get_entry(self):
391        # make sure we can get an entry if it exists
392        result1 = self.proc.getEntry(self.row1, self.app)
393        result2 = self.proc.getEntry(self.row4, self.app)
394        self.assertTrue(result1 is None)
395        self.assertTrue(result2 is self.certificate)
396        return
397
398    def test_del_entry(self):
399        # make sure we can delete entries
400        self.assertTrue('CRT1' in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
401        self.proc.delEntry(self.row4, self.app)
402        self.assertTrue('CRT1' not in self.app['faculties']['FAC1']['DEP1'].certificates.keys())
403        return
404
405
406class CertCourseProcessorTests(FunctionalTestCase):
407
408    layer = FunctionalLayer
409
410    def setUp(self):
411        super(CertCourseProcessorTests, self).setUp()
412        self.proc = CertificateCourseProcessor()
413        self.site0 = dict()
414        self.site1 = dict(faculties=dict())
415        self.site2 = dict(faculties=dict(FAC=dict()))
416        self.department1 = Department(code='DPT')
417        self.department2 = Department(code='DPT')
418        self.certificate = Certificate(code='CRT')
419        self.department2.certificates['CRT'] = self.certificate
420        self.course = Course(code='CRS')
421        self.department2.certificates['CRT'].addCertCourse(self.course)
422        self.cert_course = self.certificate['CRS_100']
423        self.department2.courses['CRS'] = self.course
424        self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1)))
425        self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2)))
426        self.row = dict(
427            department_code='DPT',
428            faculty_code='FAC',
429            certificate_code='CRT',
430            course=self.course, level='100',
431            code='CRS_100')
432        return
433
434    def test_ifaces(self):
435        # Make sure we fullfill all interface contracts
436        verifyClass(IBatchProcessor, CertificateCourseProcessor)
437        verifyObject(IBatchProcessor, self.proc)
438        return
439
440    def test_parents_exist(self):
441        # make sure we can find all certificate parents
442        result0 = self.proc.parentsExist(self.row, self.site0)
443        result1 = self.proc.parentsExist(self.row, self.site1)
444        result2 = self.proc.parentsExist(self.row, self.site2)
445        result3 = self.proc.parentsExist(self.row, self.site3)
446        result4 = self.proc.parentsExist(self.row, self.site4)
447        self.assertTrue(result0 is False)
448        self.assertTrue(result1 is False)
449        self.assertTrue(result2 is False)
450        self.assertTrue(result3 is False)
451        self.assertTrue(result4 is True)
452        return
453
454    def test_entry_exists(self):
455        # make sure we find an entry if it exists
456        result0 = self.proc.entryExists(self.row, self.site0)
457        result1 = self.proc.entryExists(self.row, self.site1)
458        result2 = self.proc.entryExists(self.row, self.site2)
459        result3 = self.proc.entryExists(self.row, self.site3)
460        result4 = self.proc.entryExists(self.row, self.site4)
461        self.assertTrue(result0 is False)
462        self.assertTrue(result1 is False)
463        self.assertTrue(result2 is False)
464        self.assertTrue(result3 is False)
465        self.assertTrue(result4 is True)
466        return
467
468    def test_get_entry(self):
469        # make sure we can get an entry if it exists
470        result1 = self.proc.getEntry(self.row, self.site3)
471        result2 = self.proc.getEntry(self.row, self.site4)
472        self.assertTrue(result1 is None)
473        self.assertTrue(result2 is self.cert_course)
474        return
475
476    def test_del_entry(self):
477        # make sure we can delete entries
478        self.assertTrue('CRS_100' in self.certificate.keys())
479        self.proc.delEntry(self.row, self.site4)
480        self.assertTrue('CRS_100' not in self.certificate.keys())
481        return
Note: See TracBrowser for help on using the repository browser.