source: main/waeup.kofa/trunk/src/waeup/kofa/university/tests/test_batching.py @ 8997

Last change on this file since 8997 was 8997, checked in by Henrik Bettermann, 12 years ago

Use the UniversityProcessorSetup base class for setting up a site. Use this site in CourseProcessorTests. (work in progress)

  • Property svn:keywords set to Id
File size: 18.4 KB
Line 
1## $Id: test_batching.py 8997 2012-07-13 10:54:40Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for university related batching
20import unittest
21import tempfile
22import shutil
23import os
24from zope.component.hooks import setSite, clearSite
25from zope.component import createObject
26from zope.securitypolicy.interfaces import IPrincipalRoleMap
27from zope.interface.verify import verifyClass, verifyObject
28from waeup.kofa.interfaces import IBatchProcessor
29from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
30from waeup.kofa.app import University
31from waeup.kofa.university.batching import (
32    FacultyProcessor, DepartmentProcessor, CourseProcessor,
33    CertificateProcessor, CertificateCourseProcessor)
34from waeup.kofa.university.certificate import Certificate, CertificateCourse
35from waeup.kofa.university.course import Course
36from waeup.kofa.university import Faculty, Department
37from waeup.kofa.university.batching import FacultyProcessor
38
# Load the sample CSV once at import time.  The ``with`` block closes the
# file handle right after reading instead of leaving it to the garbage
# collector.
with open(
    os.path.join(os.path.dirname(__file__), 'sample_faculty_data.csv'),
    'rb') as _sample_file:
    FACULTY_SAMPLE_DATA = _sample_file.read()

# Column names are taken from the first line of the sample data.
FACULTY_HEADER_FIELDS = FACULTY_SAMPLE_DATA.split(
    '\n')[0].split(',')
45
class UniversityProcessorSetup(FunctionalTestCase):
    """Common fixture for processor tests that need a populated site.

    ``setUp`` stores a ``University`` as ``app`` in the root folder, makes it
    the current site, adds the user ``bob`` and creates faculty ``FAC1``
    holding department ``DEP1`` with certificate ``CERT1`` and course
    ``CRS1``.
    """

    layer = FunctionalLayer

    def setUp(self):
        super(UniversityProcessorSetup, self).setUp()
        # Two scratch directories; both are removed again in tearDown.
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()

        # Install the university in the root folder and activate it as site.
        university = University()
        self.getRootFolder()['app'] = university
        self.app = self.getRootFolder()['app']
        setSite(university)

        self.app['users'].addUser('bob', 'bobssecret')

        # Sample content: FAC1 > DEP1 > (certificate CERT1, course CRS1).
        self.certificate = createObject('waeup.Certificate')
        self.certificate.code = 'CERT1'
        faculties = self.app['faculties']
        faculties['FAC1'] = Faculty(code='FAC1')
        faculties['FAC1']['DEP1'] = Department(code='DEP1')
        department = faculties['FAC1']['DEP1']
        department.certificates.addCertificate(self.certificate)
        self.course = createObject('waeup.Course')
        self.course.code = 'CRS1'
        department.courses.addCourse(self.course)
        # The course is not (yet) attached to CERT1 as a certificate course.

    def tearDown(self):
        super(UniversityProcessorSetup, self).tearDown()
        # Remove the scratch directories in the order workdir, dc_root.
        for scratch_dir in (self.workdir, self.dc_root):
            shutil.rmtree(scratch_dir)
        clearSite()
82
class TestFacultyProcessor(UniversityProcessorSetup):
    """Tests for ``FacultyProcessor``: lookups, conversion checks, import.

    The lookup tests use plain dicts as pseudo sites; ``test_import`` runs
    against the real site created by ``UniversityProcessorSetup``.
    """

    def setUp(self):
        super(TestFacultyProcessor, self).setUp()

        self.proc = FacultyProcessor()
        # site1 has no faculties, site2 holds a placeholder under code 'FAC'.
        self.site1 = dict(faculties=dict())
        self.site2 = dict(faculties=dict(FAC='pseudo faculty'))
        self.row = dict(code='FAC')

        self.csv_file_faculty = os.path.join(
            self.workdir, 'sample_faculty_data.csv')
        # Write via a context manager so the handle is closed (and the data
        # flushed) before the processor reads the file.
        with open(self.csv_file_faculty, 'wb') as csv_file:
            csv_file.write(FACULTY_SAMPLE_DATA)
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, FacultyProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_get_entry(self):
        # if a faculty exists already, we will get it
        result1 = self.proc.getEntry(self.row, self.site1)
        result2 = self.proc.getEntry(self.row, self.site2)
        self.assertTrue(result1 is None)
        self.assertEqual(result2, 'pseudo faculty')
        return

    def test_del_entry(self):
        # make sure we can del entries.
        self.assertTrue('FAC' in self.site2['faculties'].keys())
        self.proc.delEntry(self.row, self.site2)
        # Faculties live under site['faculties']; looking at the top-level
        # keys of the site would pass even if nothing had been deleted.
        self.assertTrue('FAC' not in self.site2['faculties'].keys())
        return

    def test_checkConversion(self):
        # Make sure we can check conversions.
        # Each case pairs a raw ``local_roles`` string with the error list
        # the processor is expected to report for it.
        cases = [
            ('[]', []),
            ("['nonsense'", [('local_roles', 'Error')]),
            ("('abc')", [('local_roles', 'no list')]),
            ("[('ABC')]", [('local_roles', 'no dicts')]),
            ("[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]",
             [('local_roles', 'user_name or local_role missing')]),
            ("[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]",
             [('local_roles', 'user_name or local_role missing')]),
            ("[{'user_name':'bob','local_role':'waeup.local.Boss'},]",
             [('local_roles', 'waeup.local.Boss not allowed')]),
            ("[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]",
             [('local_roles', 'john does not exist')]),
            ("[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]",
             []),
        ]
        for local_roles, expected in cases:
            errs, inv_errs, conv_dict = self.proc.checkConversion(
                dict(faculty_code='ABC', local_roles=local_roles))
            # The message names the offending input, since a loop would
            # otherwise hide which case failed.
            self.assertEqual(
                len(errs), len(expected),
                '%r gave %r' % (local_roles, errs))
            if expected:
                self.assertEqual(errs, expected)
        return

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.proc.doImport(
            self.csv_file_faculty, FACULTY_HEADER_FIELDS)
        try:
            with open(fail_file) as failed:
                content = failed.read()
            self.assertEqual(num_warns,5)
            self.assertEqual(
                content,
                'code,local_roles,--ERRORS--\r\n'
                'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",'
                'local_roles: alice does not exist\r\n'
                'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",'
                'local_roles: waeup.local.Boss not allowed\r\n'
                'EFG,[(\'anything\')],local_roles: no dicts\r\n'
                'FGH,[,local_roles: Error\r\n'
                'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",'
                'local_roles: user_name or local_role missing\r\n'
                )
            # Bob got a local role in faculty ABC.
            abc = self.app['faculties']['ABC']
            role_map = IPrincipalRoleMap(abc)
            local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0]
            self.assertEqual(user_name, 'bob')
            self.assertEqual(local_role, 'waeup.local.DepartmentManager')
        finally:
            # Remove the directory holding the finished file even when an
            # assertion above fails, so no temporary data is left behind.
            shutil.rmtree(os.path.dirname(fin_file))
202
class TestDepartmentProcessor(UniversityProcessorSetup):
    """Tests for ``DepartmentProcessor`` lookups using dict pseudo sites."""

    def setUp(self):
        super(TestDepartmentProcessor, self).setUp()
        self.proc = DepartmentProcessor()
        # Pseudo sites of increasing completeness:
        # nothing / no faculty / faculty without dept. / faculty with dept.
        self.site0 = {}
        self.site1 = {'faculties': {}}
        self.site2 = {'faculties': {'FAC': {}}}
        self.site3 = {'faculties': {'FAC': {'DPT': 'pseudo department'}}}
        self.row = {'code': 'DPT', 'faculty_code': 'FAC'}

    def test_ifaces(self):
        # The class and an instance must both satisfy IBatchProcessor
        verifyClass(IBatchProcessor, DepartmentProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # the parent faculty is only found in site2 and site3
        sites = (self.site0, self.site1, self.site2, self.site3)
        found = [self.proc.parentsExist(self.row, site) for site in sites]
        for outcome, wanted in zip(found, (False, False, True, True)):
            self.assertTrue(outcome is wanted)

    def test_entry_exists(self):
        # the department itself is only found in site3
        sites = ({}, self.site1, self.site2, self.site3)
        found = [self.proc.entryExists(self.row, site) for site in sites]
        for outcome, wanted in zip(found, (False, False, False, True)):
            self.assertTrue(outcome is wanted)

    def test_get_entry(self):
        # a missing dept. yields None, an existing one is returned
        missing = self.proc.getEntry(self.row, self.site2)
        present = self.proc.getEntry(self.row, self.site3)
        self.assertTrue(missing is None)
        self.assertEqual(present, 'pseudo department')

    def test_del_entry(self):
        # deleting removes the department from its faculty
        self.proc.delEntry(self.row, self.site3)
        remaining = self.site3['faculties']['FAC'].keys()
        self.assertTrue('DPT' not in remaining)
258
class CourseProcessorTests(UniversityProcessorSetup):
    """Tests for ``CourseProcessor`` run against the populated sample site.

    The site set up by ``UniversityProcessorSetup`` contains faculty
    ``FAC1`` with department ``DEP1`` and course ``CRS1``.
    """

    def setUp(self):
        super(CourseProcessorTests, self).setUp()
        self.proc = CourseProcessor()
        # row1: department not in the site, row2: faculty not in the site,
        # row3: parents exist but course does not, row4: everything exists.
        self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRS1")
        self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRS1")
        self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS2")
        self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS1")
        return

    def test_ifaces(self):
        # Make sure we fulfill all interface contracts
        verifyClass(IBatchProcessor, CourseProcessor)
        verifyObject(IBatchProcessor, self.proc)
        return

    def test_parents_exist(self):
        # make sure we lookup parents correctly
        result1 = self.proc.parentsExist(self.row1, self.app)
        result2 = self.proc.parentsExist(self.row2, self.app)
        result3 = self.proc.parentsExist(self.row3, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is True)
        return

    def test_entry_exists(self):
        # make sure we find an entry if it exists
        result1 = self.proc.entryExists(self.row1, self.app)
        result2 = self.proc.entryExists(self.row2, self.app)
        result3 = self.proc.entryExists(self.row3, self.app)
        result4 = self.proc.entryExists(self.row4, self.app)
        self.assertTrue(result1 is False)
        self.assertTrue(result2 is False)
        self.assertTrue(result3 is False)
        self.assertTrue(result4 is True)
        return

    def test_get_entry(self):
        # make sure we can get an entry if it exists
        result1 = self.proc.getEntry(self.row1, self.app)
        result2 = self.proc.getEntry(self.row4, self.app)
        self.assertTrue(result1 is None)
        self.assertTrue(result2 is self.course)
        return

    def test_del_entry(self):
        # make sure we can delete entries
        courses = self.app['faculties']['FAC1']['DEP1'].courses
        self.assertTrue('CRS1' in courses.keys())
        self.proc.delEntry(self.row4, self.app)
        # Check the same code that was present before; a different code
        # would be absent regardless of whether the deletion worked.
        self.assertTrue('CRS1' not in courses.keys())
        return
314
class CertificateProcessorTests(FunctionalTestCase):
    """Tests for ``CertificateProcessor`` lookups using dict pseudo sites."""

    layer = FunctionalLayer

    def setUp(self):
        super(CertificateProcessorTests, self).setUp()
        self.proc = CertificateProcessor()
        # Pseudo sites without any department.
        self.site0 = {}
        self.site1 = {'faculties': {}}
        self.site2 = {'faculties': {'FAC': {}}}
        # department1 stays empty, department2 holds certificate 'CRT'.
        self.department1 = Department(code='DPT')
        self.department2 = Department(code='DPT')
        self.certificate = Certificate(code='CRT')
        self.department2.certificates['CRT'] = self.certificate
        self.site3 = {'faculties': {'FAC': {'DPT': self.department1}}}
        self.site4 = {'faculties': {'FAC': {'DPT': self.department2}}}
        self.row = {
            'department_code': 'DPT',
            'faculty_code': 'FAC',
            'code': "CRT",
            }

    def _all_sites(self):
        # The five pseudo sites in order of increasing completeness.
        return (self.site0, self.site1, self.site2, self.site3, self.site4)

    def test_ifaces(self):
        # The class and an instance must both satisfy IBatchProcessor
        verifyClass(IBatchProcessor, CertificateProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # parents are found as soon as the department is present
        found = [self.proc.parentsExist(self.row, site)
                 for site in self._all_sites()]
        for outcome, wanted in zip(found, (False, False, False, True, True)):
            self.assertTrue(outcome is wanted)

    def test_entry_exists(self):
        # the certificate is only found in site4
        found = [self.proc.entryExists(self.row, site)
                 for site in self._all_sites()]
        for outcome, wanted in zip(found, (False, False, False, False, True)):
            self.assertTrue(outcome is wanted)

    def test_get_entry(self):
        # a missing certificate yields None, an existing one is returned
        missing = self.proc.getEntry(self.row, self.site3)
        present = self.proc.getEntry(self.row, self.site4)
        self.assertTrue(missing is None)
        self.assertTrue(present is self.certificate)

    def test_del_entry(self):
        # the certificate is gone from its department after deletion
        container = self.department2.certificates
        self.assertTrue('CRT' in container.keys())
        self.proc.delEntry(self.row, self.site4)
        self.assertTrue('CRT' not in container.keys())
382
class CertCourseProcessorTests(FunctionalTestCase):
    """Tests for ``CertificateCourseProcessor`` using dict pseudo sites."""

    layer = FunctionalLayer

    def setUp(self):
        super(CertCourseProcessorTests, self).setUp()
        self.proc = CertificateCourseProcessor()
        # Pseudo sites without any department.
        self.site0 = {}
        self.site1 = {'faculties': {}}
        self.site2 = {'faculties': {'FAC': {}}}
        # department1 stays empty; department2 gets certificate 'CRT', which
        # in turn gets course 'CRS' added as a certificate course.
        self.department1 = Department(code='DPT')
        self.department2 = Department(code='DPT')
        self.certificate = Certificate(code='CRT')
        self.department2.certificates['CRT'] = self.certificate
        self.course = Course(code='CRS')
        self.department2.certificates['CRT'].addCertCourse(self.course)
        self.cert_course = self.certificate['CRS_100']
        self.department2.courses['CRS'] = self.course
        self.site3 = {'faculties': {'FAC': {'DPT': self.department1}}}
        self.site4 = {'faculties': {'FAC': {'DPT': self.department2}}}
        self.row = {
            'department_code': 'DPT',
            'faculty_code': 'FAC',
            'certificate_code': 'CRT',
            'course': self.course,
            'level': '100',
            'code': 'CRS_100',
            }

    def _all_sites(self):
        # The five pseudo sites in order of increasing completeness.
        return (self.site0, self.site1, self.site2, self.site3, self.site4)

    def test_ifaces(self):
        # The class and an instance must both satisfy IBatchProcessor
        verifyClass(IBatchProcessor, CertificateCourseProcessor)
        verifyObject(IBatchProcessor, self.proc)

    def test_parents_exist(self):
        # parents are only complete in site4, where the certificate exists
        found = [self.proc.parentsExist(self.row, site)
                 for site in self._all_sites()]
        for outcome, wanted in zip(found, (False, False, False, False, True)):
            self.assertTrue(outcome is wanted)

    def test_entry_exists(self):
        # the certificate course is only found in site4
        found = [self.proc.entryExists(self.row, site)
                 for site in self._all_sites()]
        for outcome, wanted in zip(found, (False, False, False, False, True)):
            self.assertTrue(outcome is wanted)

    def test_get_entry(self):
        # a missing entry yields None, an existing one is returned
        missing = self.proc.getEntry(self.row, self.site3)
        present = self.proc.getEntry(self.row, self.site4)
        self.assertTrue(missing is None)
        self.assertTrue(present is self.cert_course)

    def test_del_entry(self):
        # the certificate course is gone from the certificate after deletion
        self.assertTrue('CRS_100' in self.certificate.keys())
        self.proc.delEntry(self.row, self.site4)
        self.assertTrue('CRS_100' not in self.certificate.keys())
Note: See TracBrowser for help on using the repository browser.