- Timestamp:
- 21 Sep 2012, 08:19:35 (12 years ago)
- Location:
- main/waeup.kofa/branches/uli-zc-async
- Files:
-
- 6 deleted
- 14 edited
- 1 copied
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/branches/uli-zc-async
- Property svn:mergeinfo changed
/main/waeup.kofa/branches/uli-async-update removed /main/waeup.kofa/branches/uli-autoinclude-less removed /main/waeup.kofa/trunk removed
- Property svn:mergeinfo changed
-
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/api.txt
r9209 r9211 28 28 :maxdepth: 1 29 29 30 c ertcourses.txt30 courserefs.txt -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/batching.py
r9209 r9211 29 29 from zope.schema import getFields 30 30 from zope.catalog.interfaces import ICatalog 31 from zope.event import notify 32 from zope.securitypolicy.interfaces import ( 33 IPrincipalRoleManager, IPrincipalRoleMap) 34 from waeup.kofa.authentication import LocalRoleSetEvent 35 from waeup.kofa.interfaces import ( 36 IBatchProcessor, IGNORE_MARKER, DELETION_MARKER, FatalCSVError) 31 from waeup.kofa.interfaces import IBatchProcessor, IGNORE_MARKER, FatalCSVError 37 32 from waeup.kofa.university.interfaces import ( 38 33 IFacultiesContainer, IFaculty, ICourse, IDepartment, ICertificate, 39 34 ICertificateCourse) 40 from waeup.kofa.university import (41 Faculty, Department, Course, Certificate)42 35 from waeup.kofa.utils.batching import BatchProcessor 43 36 … … 53 46 name = u'Faculty Processor' 54 47 iface = IFaculty 55 allowed_roles = Faculty.local_roles56 48 57 49 location_fields = ['code',] … … 62 54 def parentsExist(self, row, site): 63 55 return 'faculties' in site.keys() 56 57 def entryExists(self, row, site): 58 return row['code'] in site['faculties'].keys() 59 60 def getParent(self, row, site): 61 return site['faculties'] 62 63 def getEntry(self, row, site): 64 if not self.entryExists(row, site): 65 return None 66 parent = self.getParent(row, site) 67 return parent.get(row['code']) 68 69 def addEntry(self, obj, row, site): 70 parent = self.getParent(row, site) 71 parent.addFaculty(obj) 72 return 73 74 def delEntry(self, row, site): 75 parent = self.getParent(row, site) 76 del parent[row['code']] 77 pass 78 79 class DepartmentProcessor(BatchProcessor): 80 """A batch processor for IDepartment objects. 81 """ 82 grok.implements(IBatchProcessor) 83 grok.provides(IBatchProcessor) 84 grok.context(Interface) 85 util_name = 'departmentprocessor' 86 grok.name(util_name) 87 88 name = u'Department Processor' 89 iface = IDepartment 90 91 location_fields = ['code', 'faculty_code'] 92 factory_name = 'waeup.Department' 93 94 mode = None 95 96 def parentsExist(self, row, site): 97 if not 'faculties' in site.keys(): 98 return False 99 return row['faculty_code'] in site['faculties'] 100 101 def entryExists(self, row, site): 102 if not self.parentsExist(row, site): 103 return False 104 parent = self.getParent(row, site) 105 return row['code'] in parent.keys() 106 107 def getParent(self, row, site): 108 return site['faculties'][row['faculty_code']] 109 110 def getEntry(self, row, site): 111 if not self.entryExists(row, site): 112 return None 113 parent = self.getParent(row, site) 114 return parent.get(row['code']) 115 116 def addEntry(self, obj, row, site): 117 parent = self.getParent(row, site) 118 parent.addDepartment(obj) 119 return 120 121 def delEntry(self, row, site): 122 parent = self.getParent(row, site) 123 del parent[row['code']] 124 return 125 126 class CourseProcessor(BatchProcessor): 127 """A batch processor for ICourse objects. 128 """ 129 grok.implements(IBatchProcessor) 130 grok.provides(IBatchProcessor) 131 grok.context(Interface) 132 util_name = 'courseprocessor' 133 grok.name(util_name) 134 135 name = u'Course Processor' 136 iface = ICourse 137 138 location_fields = ['code', 'faculty_code', 'department_code'] 139 factory_name = 'waeup.Course' 140 141 mode = None 142 143 def parentsExist(self, row, site): 144 if not 'faculties' in site.keys(): 145 return False 146 if not row['faculty_code'] in site['faculties'].keys(): 147 return False 148 faculty = site['faculties'][row['faculty_code']] 149 return row['department_code'] in faculty.keys() 150 151 def entryExists(self, row, site): 152 if not self.parentsExist(row, site): 153 return False 154 parent = self.getParent(row, site) 155 return row['code'] in parent.keys() 156 157 def getParent(self, row, site): 158 dept = site['faculties'][row['faculty_code']][row['department_code']] 159 return dept.courses 160 161 def getEntry(self, row, site): 162 if not self.entryExists(row, site): 163 return None 164 parent = self.getParent(row, site) 165 return parent.get(row['code']) 166 167 def addEntry(self, obj, row, site): 168 parent = self.getParent(row, site) 169 parent.addCourse(obj) 170 return 171 172 def delEntry(self, row, site): 173 parent = self.getParent(row, site) 174 del parent[row['code']] 175 return 176 177 class CertificateProcessor(BatchProcessor): 178 """A batch processor for ICertificate objects. 179 """ 180 grok.implements(IBatchProcessor) 181 grok.provides(IBatchProcessor) 182 grok.context(Interface) 183 util_name = 'certificateprocessor' 184 grok.name(util_name) 185 186 name = u'Certificate Processor' 187 iface = ICertificate 188 189 location_fields = ['code'] 190 factory_name = 'waeup.Certificate' 191 192 mode = None 64 193 65 194 @property … … 67 196 fields = getFields(self.iface) 68 197 return sorted(list(set( 69 self.location_fields + fields.keys() + ['local_roles'] 70 ))) 71 72 def entryExists(self, row, site): 73 return row['code'] in site['faculties'].keys() 74 75 def getParent(self, row, site): 76 return site['faculties'] 77 78 def getEntry(self, row, site): 79 if not self.entryExists(row, site): 80 return None 81 parent = self.getParent(row, site) 82 return parent.get(row['code']) 83 84 def addEntry(self, obj, row, site): 85 parent = self.getParent(row, site) 86 parent.addFaculty(obj) 87 return 88 89 def delEntry(self, row, site): 90 parent = self.getParent(row, site) 91 del parent[row['code']] 92 pass 93 94 def updateEntry(self, obj, row, site): 95 """Update obj to the values given in row. 96 """ 97 items_changed = '' 98 99 if row.has_key('local_roles') and row['local_roles'] not in ( 100 None, IGNORE_MARKER): 101 role_manager = IPrincipalRoleManager(obj) 102 role_map = IPrincipalRoleMap(obj) 103 # Remove all existing local roles. 104 for local_role, user_name, setting in role_map.getPrincipalsAndRoles(): 105 role_manager.unsetRoleForPrincipal(local_role, user_name) 106 notify(LocalRoleSetEvent( 107 obj, local_role, user_name, granted=False)) 108 # Add new local roles. 109 if row['local_roles'] != DELETION_MARKER: 110 local_roles = eval(row['local_roles']) 111 for rolemap in local_roles: 112 user = rolemap['user_name'] 113 local_role = rolemap['local_role'] 114 role_manager.assignRoleToPrincipal(local_role, user) 115 notify(LocalRoleSetEvent(obj, local_role, user, granted=True)) 116 items_changed += ( 117 '%s=%s, ' % ('local_roles', '%s|%s' % (user,local_role))) 118 row.pop('local_roles') 119 120 # apply other values... 121 items_changed += super(FacultyProcessor, self).updateEntry( 122 obj, row, site) 123 124 # Log actions... 125 location_field = self.location_fields[0] 126 grok.getSite().logger.info('%s - %s - Record updated: %s' 127 % (self.name, row[location_field], items_changed)) 128 return items_changed 129 130 def checkConversion(self, row, mode='create'): 131 """Validates all values in row. 132 """ 133 errs, inv_errs, conv_dict = super( 134 FacultyProcessor, self).checkConversion(row, mode=mode) 135 if row.has_key('local_roles'): 136 if row['local_roles'] in (None, DELETION_MARKER, IGNORE_MARKER): 137 return errs, inv_errs, conv_dict 138 try: 139 local_roles = eval(row['local_roles']) 140 except: 141 errs.append(('local_roles','Error')) 142 return errs, inv_errs, conv_dict 143 if not isinstance(local_roles, list): 144 errs.append(('local_roles','no list')) 145 return errs, inv_errs, conv_dict 146 for rolemap in local_roles: 147 if not isinstance(rolemap, dict): 148 errs.append(('local_roles','no dicts')) 149 return errs, inv_errs, conv_dict 150 if not 'user_name' in rolemap.keys() or not \ 151 'local_role' in rolemap.keys(): 152 errs.append(('local_roles','user_name or local_role missing')) 153 return errs, inv_errs, conv_dict 154 local_role = rolemap['local_role'] 155 if not local_role in self.allowed_roles: 156 errs.append(('local_roles','%s not allowed' % local_role)) 157 return errs, inv_errs, conv_dict 158 user = rolemap['user_name'] 159 users = grok.getSite()['users'] 160 if not user in users.keys(): 161 errs.append(('local_roles','%s does not exist' % user)) 162 return errs, inv_errs, conv_dict 163 return errs, inv_errs, conv_dict 164 165 class DepartmentProcessor(FacultyProcessor): 166 """A batch processor for IDepartment objects. 167 """ 168 grok.implements(IBatchProcessor) 169 grok.provides(IBatchProcessor) 170 grok.context(Interface) 171 util_name = 'departmentprocessor' 172 grok.name(util_name) 173 174 name = u'Department Processor' 175 iface = IDepartment 176 allowed_roles = Department.local_roles 177 178 location_fields = ['code', 'faculty_code'] 179 factory_name = 'waeup.Department' 180 181 mode = None 182 183 def parentsExist(self, row, site): 184 if not 'faculties' in site.keys(): 185 return False 186 return row['faculty_code'] in site['faculties'] 187 188 def entryExists(self, row, site): 189 if not self.parentsExist(row, site): 190 return False 191 parent = self.getParent(row, site) 192 return row['code'] in parent.keys() 193 194 def getParent(self, row, site): 195 return site['faculties'][row['faculty_code']] 196 197 def getEntry(self, row, site): 198 if not self.entryExists(row, site): 199 return None 200 parent = self.getParent(row, site) 201 return parent.get(row['code']) 202 203 def addEntry(self, obj, row, site): 204 parent = self.getParent(row, site) 205 parent.addDepartment(obj) 206 return 207 208 def delEntry(self, row, site): 209 parent = self.getParent(row, site) 210 del parent[row['code']] 211 return 212 213 class CourseProcessor(FacultyProcessor): 214 """A batch processor for ICourse objects. 215 """ 216 grok.implements(IBatchProcessor) 217 grok.provides(IBatchProcessor) 218 grok.context(Interface) 219 util_name = 'courseprocessor' 220 grok.name(util_name) 221 222 name = u'Course Processor' 223 iface = ICourse 224 allowed_roles = Course.local_roles 225 226 location_fields = ['code', 'faculty_code', 'department_code'] 227 factory_name = 'waeup.Course' 228 229 mode = None 230 231 def parentsExist(self, row, site): 232 if not 'faculties' in site.keys(): 233 return False 234 if not row['faculty_code'] in site['faculties'].keys(): 235 return False 236 faculty = site['faculties'][row['faculty_code']] 237 return row['department_code'] in faculty.keys() 238 239 def entryExists(self, row, site): 240 if not self.parentsExist(row, site): 241 return False 242 parent = self.getParent(row, site) 243 return row['code'] in parent.keys() 244 245 def getParent(self, row, site): 246 dept = site['faculties'][row['faculty_code']][row['department_code']] 247 return dept.courses 248 249 def getEntry(self, row, site): 250 if not self.entryExists(row, site): 251 return None 252 parent = self.getParent(row, site) 253 return parent.get(row['code']) 254 255 def addEntry(self, obj, row, site): 256 parent = self.getParent(row, site) 257 parent.addCourse(obj) 258 return 259 260 def delEntry(self, row, site): 261 parent = self.getParent(row, site) 262 del parent[row['code']] 263 return 264 265 class CertificateProcessor(FacultyProcessor): 266 """A batch processor for ICertificate objects. 267 """ 268 grok.implements(IBatchProcessor) 269 grok.provides(IBatchProcessor) 270 grok.context(Interface) 271 util_name = 'certificateprocessor' 272 grok.name(util_name) 273 274 name = u'Certificate Processor' 275 iface = ICertificate 276 allowed_roles = Certificate.local_roles 277 278 location_fields = ['code'] 279 factory_name = 'waeup.Certificate' 280 281 mode = None 282 283 @property 284 def available_fields(self): 285 fields = getFields(self.iface) 286 return sorted(list(set( 287 ['faculty_code','department_code'] + fields.keys() 288 + ['local_roles']))) 198 ['faculty_code','department_code'] + fields.keys()))) 289 199 290 200 def checkHeaders(self, headerfields, mode='create'): … … 359 269 return 360 270 361 class CertificateCourseProcessor( FacultyProcessor):271 class CertificateCourseProcessor(BatchProcessor): 362 272 """A batch processor for ICertificateCourse objects. 363 273 """ … … 407 317 def addEntry(self, obj, row, site): 408 318 parent = self.getParent(row, site) 409 parent.addC ertCourse(row['course'],319 parent.addCourseRef(row['course'], 410 320 row['level'], row['mandatory']) 411 321 return … … 413 323 def delEntry(self, row, site): 414 324 parent = self.getParent(row, site) 415 parent.delC ertCourse(row['course'].code, row['level'])416 return 325 parent.delCourseRef(row['course'].code, row['level']) 326 return -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/catalog.py
r9209 r9211 159 159 self.title = context.course.title 160 160 self.code = context.getCourseCode 161 self.type = 'C ertificate Course'161 self.type = 'Course Referrer' 162 162 163 163 def search(query=None, view=None): -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/certificate.py
r9209 r9211 36 36 grok.implements(ICertificate, ICertificateAdd) 37 37 38 local_roles = [ 39 'waeup.local.CourseAdviser100', 40 'waeup.local.CourseAdviser200', 41 'waeup.local.CourseAdviser300', 42 'waeup.local.CourseAdviser400', 43 'waeup.local.CourseAdviser500', 44 'waeup.local.CourseAdviser600', 45 ] 38 @property # Make this method read_only and looking like an attr. 39 def local_roles(self): 40 return ['waeup.local.CourseAdviser100', 41 'waeup.local.CourseAdviser200', 42 'waeup.local.CourseAdviser300', 43 'waeup.local.CourseAdviser400', 44 'waeup.local.CourseAdviser500', 45 'waeup.local.CourseAdviser600', 46 ] 46 47 47 48 def __init__(self, code=u'NA', title=u'Unnamed Certificate', 48 49 study_mode=None, start_level=None, 49 50 end_level=None, application_category=None, 50 school_fee_1=None, school_fee_2=None, 51 school_fee_3=None, school_fee_4=None): 51 school_fee_1=None, school_fee_2=None): 52 52 super(Certificate, self).__init__() 53 53 self.code = code … … 59 59 self.school_fee_1 = school_fee_1 60 60 self.school_fee_2 = school_fee_2 61 self.school_fee_3 = school_fee_362 self.school_fee_4 = school_fee_463 61 64 62 def longtitle(self): 65 63 return "%s (%s)" % (self.title,self.code) 66 64 67 def addC ertCourse(self, course, level=100, mandatory=True):68 """Add a c ertificate course.65 def addCourseRef(self, course, level=100, mandatory=True): 66 """Add a course referrer. 69 67 """ 70 68 code = "%s_%s" % (course.code, level) … … 74 72 self._p_changed = True 75 73 76 def delC ertCourse(self, code, level=None):77 """Delete a c ertificate coursedenoted by its code.74 def delCourseRef(self, code, level=None): 75 """Delete a course referrer denoted by its code. 78 76 """ 79 77 keys = list(self.keys()) # create list copy -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/certificate.txt
r9209 r9211 96 96 .. XXX: This is not a proper description 97 97 98 .. method:: addC ertCourse(course[, level=100,[ mandatory=True]])99 100 Add a certificatecourse. A course is an object implementing98 .. method:: addCourseRef(course[, level=100,[ mandatory=True]]) 99 100 Add a referrer of a course. A course is an object implementing 101 101 :class:`waeup.kofa.interfaces.ICourse`. 102 102 103 Please don't be confused by the term ' certificate course'.104 As course objectswill normaly be stored105 in a department, the c ertificate course refers to103 Please don't be confused by the term 'referrer'. As course objects 104 will normaly be stored 105 in a department, the course referrer refers to 106 106 the 'real' course in the department container. 107 107 108 .. method:: delC ertCourse(code)109 110 Remove a c ertificate coursefrom a certificate.111 112 The certificate coursemust be given by its code number.108 .. method:: delCourseRef(code) 109 110 Remove a course referrer from a certificate. 111 112 The referrer must be given by its code number. 113 113 114 114 :class:`CertificateCourse` … … 117 117 .. class:: CertificateCourse(course[, level=100[, mandatory=True]]) 118 118 119 Create a c ertificate course.120 121 A certificate course are held in119 Create a course referrer. 120 121 A certificate course (old term) or course referrer are held in 122 122 certificates and refer to an existing :class:`Course` instance held 123 123 elsewhere. 124 124 125 A certificate can require several c ertificate courses and one126 c ertificate coursecan be required by several certificates.125 A certificate can require several course referrers and one 126 course referrer can be required by several certificates. 127 127 128 128 .. attribute:: course … … 225 225 226 226 To accomplish that, the parents of the removed course are looked up 227 for a certifcate container which contains a c ertificate coursethat227 for a certifcate container which contains a course referrer that 228 228 refers to the deleted course. 229 229 … … 341 341 >>> mycert.code = 'MYCERT' 342 342 >>> mydept.certificates.addCertificate(mycert) 343 >>> mycert.addC ertCourse(mycourse)343 >>> mycert.addCourseRef(mycourse) 344 344 345 345 >>> transaction.commit() … … 354 354 ['mydept'] 355 355 356 The c ertificate coursewe stored in the certificate is indeed a356 The course referrer we stored in the certificate is indeed a 357 357 referrer of the course, not a copy of it: 358 358 -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/course.py
r9209 r9211 29 29 """ 30 30 grok.implements(ICourse, ICourseAdd) 31 32 local_roles = ['waeup.local.Lecturer']33 31 34 32 def __init__(self, … … 80 78 # Remove that referrer... 81 79 cert = certcourse.__parent__ 82 cert.delC ertCourse(code)80 cert.delCourseRef(code) 83 81 cert._p_changed = True 84 82 return -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/department.py
r9209 r9211 32 32 grok.implements(IDepartment, IDepartmentAdd) 33 33 34 local_roles = [ 35 'waeup.local.DepartmentManager', 36 'waeup.local.ClearanceOfficer', 37 'waeup.local.UGClearanceOfficer', 38 'waeup.local.PGClearanceOfficer', 39 'waeup.local.CourseAdviser100', 40 'waeup.local.CourseAdviser200', 41 'waeup.local.CourseAdviser300', 42 'waeup.local.CourseAdviser400', 43 'waeup.local.CourseAdviser500', 44 'waeup.local.CourseAdviser600', 45 ] 34 @property # Make this method read_only and looking like an attr. 35 def local_roles(self): 36 return ['waeup.local.DepartmentManager', 37 'waeup.local.ClearanceOfficer', 38 'waeup.local.CourseAdviser100', 39 'waeup.local.CourseAdviser200', 40 'waeup.local.CourseAdviser300', 41 'waeup.local.CourseAdviser400', 42 'waeup.local.CourseAdviser500', 43 'waeup.local.CourseAdviser600', 44 ] 46 45 47 46 # A simple counter for ids. -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/export.py
r9209 r9211 19 19 """ 20 20 import grok 21 from zope.securitypolicy.interfaces import IPrincipalRoleMap22 21 from waeup.kofa.interfaces import ICSVExporter 23 22 from waeup.kofa.interfaces import MessageFactory as _ … … 31 30 32 31 #: Fieldnames considered by this exporter 33 fields = ('code', 'title', 'title_prefix' , 'users_with_local_roles')32 fields = ('code', 'title', 'title_prefix') 34 33 35 34 #: The title under which this exporter will be displayed 36 35 title = _(u'Faculties') 37 38 def mangle_value(self, value, name, context=None):39 """Hook for mangling values in derived classes40 """41 if name == 'users_with_local_roles':42 value = []43 role_map = IPrincipalRoleMap(context)44 for local_role, user_name, setting in role_map.getPrincipalsAndRoles():45 value.append({'user_name':user_name,'local_role':local_role})46 return super(FacultyExporter, self).mangle_value(47 value, name, context)48 36 49 37 def export(self, faculties, filepath=None): … … 62 50 If `filepath` is ``None``, a raw string with CSV data is returned. 63 51 """ 52 writer, outfile = self.get_csv_writer(filepath) 64 53 faculties = site.get('faculties', {}) 65 54 return self.export(faculties.values(), filepath) … … 72 61 73 62 #: Fieldnames considered by this exporter 74 fields = ('code', 'faculty_code', 'title', 'title_prefix', 75 'users_with_local_roles') 63 fields = ('code', 'faculty_code', 'title', 'title_prefix') 76 64 77 65 #: The title under which this exporter will be displayed … … 89 77 90 78 def export_all(self, site, filepath=None): 91 """Export departments in facultyinto filepath as CSV data.79 """Export faculties in facultycontainer into filepath as CSV data. 92 80 93 81 If `filepath` is ``None``, a raw string with CSV data is returned. … … 109 97 #: Fieldnames considered by this exporter 110 98 fields = ('code', 'faculty_code', 'department_code', 'title', 'credits', 111 'passmark', 'semester' , 'users_with_local_roles')99 'passmark', 'semester') 112 100 113 101 #: The title under which this exporter will be displayed … … 117 105 """Hook for mangling values in derived classes 118 106 """ 119 if name == 'users_with_local_roles': 120 value = [] 121 role_map = IPrincipalRoleMap(context) 122 for local_role, user_name, setting in role_map.getPrincipalsAndRoles(): 123 value.append({'user_name':user_name,'local_role':local_role}) 124 elif name == 'faculty_code': 107 if name == 'faculty_code': 125 108 try: 126 109 value = context.__parent__.__parent__.__parent__.code … … 136 119 137 120 def export_all(self, site, filepath=None): 138 """Export coursesinto filepath as CSV data.121 """Export faculties in facultycontainer into filepath as CSV data. 139 122 140 123 If `filepath` is ``None``, a raw string with CSV data is returned. … … 157 140 fields = ('code', 'faculty_code', 'department_code', 'title', 'study_mode', 158 141 'start_level', 'end_level', 'application_category', 159 'school_fee_1', 'school_fee_2', 'school_fee_3', 'school_fee_4', 160 'users_with_local_roles') 142 'school_fee_1', 'school_fee_2') 161 143 162 144 #: The title under which this exporter will be displayed … … 164 146 165 147 def export_all(self, site, filepath=None): 166 """Export certificatesinto filepath as CSV data.148 """Export faculties in facultycontainer into filepath as CSV data. 167 149 168 150 If `filepath` is ``None``, a raw string with CSV data is returned. … … 211 193 212 194 def export_all(self, site, filepath=None): 213 """Export certificate coursesinto filepath as CSV data.195 """Export faculties in facultycontainer into filepath as CSV data. 214 196 215 197 If `filepath` is ``None``, a raw string with CSV data is returned. -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/faculty.py
r9209 r9211 32 32 grok.implements(IFaculty, IFacultyAdd) 33 33 34 local_roles = [ 35 'waeup.local.DepartmentManager', 36 'waeup.local.ClearanceOfficer', 37 'waeup.local.UGClearanceOfficer', 38 'waeup.local.PGClearanceOfficer', 39 'waeup.local.CourseAdviser100', 40 'waeup.local.CourseAdviser200', 41 'waeup.local.CourseAdviser300', 42 'waeup.local.CourseAdviser400', 43 'waeup.local.CourseAdviser500', 44 'waeup.local.CourseAdviser600', 45 ] 34 @property # Make this method read_only and looking like an attr. 35 def local_roles(self): 36 return ['waeup.local.DepartmentManager', 37 'waeup.local.ClearanceOfficer', 38 'waeup.local.CourseAdviser100', 39 'waeup.local.CourseAdviser200', 40 'waeup.local.CourseAdviser300', 41 'waeup.local.CourseAdviser400', 42 'waeup.local.CourseAdviser500', 43 'waeup.local.CourseAdviser600', 44 ] 46 45 47 46 def __init__(self, -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/interfaces.py
r9209 r9211 240 240 ) 241 241 242 school_fee_3 = schema.Float(243 title = _(u'Foreigner Initial School Fee'),244 required = False,245 )246 247 school_fee_4 = schema.Float(248 title = _(u'Foreigner Returning School Fee'),249 required = False,250 )251 252 242 def longtitle(): 253 243 """ … … 296 286 """ 297 287 course = schema.Choice( 298 title = _(u'Course '),288 title = _(u'Course Referrer'), 299 289 source = CourseSource(), 300 290 readonly = True, … … 310 300 mandatory = schema.Bool( 311 301 title = _(u'Is mandatory course (not elective)'), 312 required = False,302 required = True, 313 303 default = True, 314 304 ) -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/tests/test_batching.py
r9209 r9211 19 19 # Tests for university related batching 20 20 import unittest 21 import tempfile 22 import shutil 23 import os 24 from zope.component.hooks import setSite, clearSite 25 from zope.component import createObject 26 from zope.securitypolicy.interfaces import ( 27 IPrincipalRoleMap, IPrincipalRoleManager) 28 from zope.testbrowser.testing import Browser 21 29 22 from zope.interface.verify import verifyClass, verifyObject 30 23 from waeup.kofa.interfaces import IBatchProcessor 31 24 from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer 32 from waeup.kofa.app import University33 25 from waeup.kofa.university.batching import ( 34 26 FacultyProcessor, DepartmentProcessor, CourseProcessor, … … 36 28 from waeup.kofa.university.certificate import Certificate, CertificateCourse 37 29 from waeup.kofa.university.course import Course 38 from waeup.kofa.university import Faculty, Department 39 from waeup.kofa.university.batching import FacultyProcessor 40 41 FACULTY_SAMPLE_DATA = open( 42 os.path.join(os.path.dirname(__file__), 'sample_faculty_data.csv'), 43 'rb').read() 44 45 FACULTY_HEADER_FIELDS = FACULTY_SAMPLE_DATA.split( 46 '\n')[0].split(',') 47 48 FACULTY_SAMPLE_DATA_UPDATE = open( 49 os.path.join(os.path.dirname(__file__), 'sample_faculty_data_update.csv'), 50 'rb').read() 51 52 FACULTY_HEADER_FIELDS_UPDATE = FACULTY_SAMPLE_DATA_UPDATE.split( 53 '\n')[0].split(',') 54 55 DEPARTMENT_SAMPLE_DATA = open( 56 os.path.join(os.path.dirname(__file__), 'sample_department_data.csv'), 57 'rb').read() 58 59 DEPARTMENT_HEADER_FIELDS = DEPARTMENT_SAMPLE_DATA.split( 60 '\n')[0].split(',') 61 62 CERTIFICATE_SAMPLE_DATA = open( 63 os.path.join(os.path.dirname(__file__), 'sample_certificate_data.csv'), 64 'rb').read() 65 66 CERTIFICATE_HEADER_FIELDS = CERTIFICATE_SAMPLE_DATA.split( 67 '\n')[0].split(',') 68 69 COURSE_SAMPLE_DATA = open( 70 os.path.join(os.path.dirname(__file__), 'sample_course_data.csv'), 71 'rb').read() 72 73 COURSE_HEADER_FIELDS = COURSE_SAMPLE_DATA.split( 74 '\n')[0].split(',') 75 76 class UniversityProcessorSetup(FunctionalTestCase): 77 78 layer = FunctionalLayer 79 80 def setUp(self): 81 super(UniversityProcessorSetup, self).setUp() 82 self.dc_root = tempfile.mkdtemp() 83 self.workdir = tempfile.mkdtemp() 84 app = University() 85 self.getRootFolder()['app'] = app 86 self.app = self.getRootFolder()['app'] 87 setSite(app) 88 89 self.app['users'].addUser('bob', 'bobssecret') 90 self.app['users'].addUser('anne', 'annessecret') 91 92 # Populate university 93 self.certificate = createObject('waeup.Certificate') 94 self.certificate.code = 'CRT1' 95 self.app['faculties']['FAC1'] = Faculty(code='FAC1') 96 self.app['faculties']['FAC1']['DEP1'] = Department(code='DEP1') 97 self.app['faculties']['FAC1']['DEP1'].certificates.addCertificate( 98 self.certificate) 99 self.course = createObject('waeup.Course') 100 self.course.code = 'CRS1' 101 self.app['faculties']['FAC1']['DEP1'].courses.addCourse( 102 self.course) 103 #self.app['faculties']['fac1']['dep1'].certificates['CERT1'].addCertCourse( 104 # self.course, level=100) 105 106 self.logfile = os.path.join( 107 self.app['datacenter'].storage, 'logs', 'main.log') 108 return 109 return 110 111 def tearDown(self): 112 super(UniversityProcessorSetup, self).tearDown() 113 shutil.rmtree(self.workdir) 114 shutil.rmtree(self.dc_root) 115 clearSite() 116 return 117 118 class TestFacultyProcessor(UniversityProcessorSetup): 119 120 def setUp(self): 121 super(TestFacultyProcessor, self).setUp() 122 123 self.browser = Browser() 124 self.browser.handleErrors = False 125 self.datacenter_path = 'http://localhost/app/datacenter' 126 self.app['datacenter'].setStoragePath(self.dc_root) 127 30 from waeup.kofa.university.department import Department 31 32 33 class TestFacultyProcessor(unittest.TestCase): 34 35 def setUp(self): 128 36 self.proc = FacultyProcessor() 129 37 self.site1 = dict(faculties=dict()) 130 38 self.site2 = dict(faculties=dict(FAC='pseudo faculty')) 131 39 self.row = dict(code='FAC') 132 133 self.csv_file_faculty = os.path.join(self.workdir, 'sample_faculty_data.csv')134 open(self.csv_file_faculty, 'wb').write(FACULTY_SAMPLE_DATA)135 self.csv_file_faculty_update = os.path.join(self.workdir, 'sample_faculty_data_update.csv')136 open(self.csv_file_faculty_update, 'wb').write(FACULTY_SAMPLE_DATA_UPDATE)137 40 return 138 41 … … 157 60 return 158 61 159 def test_checkConversion(self): 160 # Make sure we can check conversions. 161 errs, inv_errs, conv_dict = self.proc.checkConversion( 162 dict(faculty_code='ABC', local_roles='[]')) 163 self.assertEqual(len(errs),0) 164 165 errs, inv_errs, conv_dict = self.proc.checkConversion( 166 dict(faculty_code='ABC', 167 local_roles="['nonsense'" 168 )) 169 self.assertEqual(len(errs),1) 170 self.assertEqual(errs, [('local_roles', 'Error')]) 171 errs, inv_errs, conv_dict = self.proc.checkConversion( 172 dict(faculty_code='ABC', 173 local_roles="('abc')" 174 )) 175 self.assertEqual(len(errs),1) 176 self.assertEqual(errs, [('local_roles', 'no list')]) 177 errs, inv_errs, conv_dict = self.proc.checkConversion( 178 dict(faculty_code='ABC', 179 local_roles="[('ABC')]" 180 )) 181 self.assertEqual(len(errs),1) 182 self.assertEqual(errs, [('local_roles', 'no dicts')]) 183 errs, inv_errs, conv_dict = self.proc.checkConversion( 184 dict(faculty_code='ABC', 185 local_roles="('abc')" 186 )) 187 self.assertEqual(len(errs),1) 188 self.assertEqual(errs, [('local_roles', 'no list')]) 189 errs, inv_errs, conv_dict = self.proc.checkConversion( 190 dict(faculty_code='ABC', local_roles= 191 "[{'name':'bob','local_role':'waeup.local.DepartmentManager'},]" 192 )) 193 self.assertEqual(len(errs),1) 194 self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')]) 195 errs, inv_errs, conv_dict = self.proc.checkConversion( 196 dict(faculty_code='ABC', local_roles= 197 "[{'user_name':'bob','localrole':'waeup.local.DepartmentManager'},]" 198 )) 199 self.assertEqual(len(errs),1) 200 self.assertEqual(errs, [('local_roles', 'user_name or local_role missing')]) 201 errs, inv_errs, conv_dict = self.proc.checkConversion( 202 dict(faculty_code='ABC', local_roles= 203 "[{'user_name':'bob','local_role':'waeup.local.Boss'},]" 204 )) 205 self.assertEqual(len(errs),1) 206 self.assertEqual(errs, [('local_roles', 'waeup.local.Boss not allowed')]) 207 errs, inv_errs, conv_dict = self.proc.checkConversion( 208 dict(faculty_code='ABC', local_roles= 209 "[{'user_name':'john','local_role':'waeup.local.DepartmentManager'},]" 210 )) 211 self.assertEqual(len(errs),1) 212 self.assertEqual(errs, [('local_roles', 'john does not exist')]) 213 errs, inv_errs, conv_dict = self.proc.checkConversion( 214 dict(faculty_code='ABC', local_roles= 215 "[{'user_name':'bob','local_role':'waeup.local.DepartmentManager'},]" 216 )) 217 self.assertEqual(len(errs),0) 218 return 219 220 def test_import(self): 221 num, num_warns, fin_file, fail_file = self.proc.doImport( 222 self.csv_file_faculty, FACULTY_HEADER_FIELDS) 223 content = open(fail_file).read() 224 self.assertEqual(num_warns,5) 225 self.assertEqual( 226 content, 227 'code,local_roles,--ERRORS--\r\n' 228 'CDE,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",' 229 'local_roles: alice does not exist\r\n' 230 'DEF,"[{\'user_name\':\'bob\',\'local_role\':\'waeup.local.Boss\'}]",' 231 'local_roles: waeup.local.Boss not allowed\r\n' 232 'EFG,[(\'anything\')],local_roles: no dicts\r\n' 233 'FGH,[,local_roles: Error\r\n' 234 'GHI,"[{\'user\':\'bob\',\'local\':\'waeup.local.DepartmentManager\'}]",' 235 'local_roles: user_name or local_role missing\r\n' 236 ) 237 # Bob got a local role in faculty ABC. 238 abc = self.app['faculties']['ABC'] 239 role_map = IPrincipalRoleMap(abc) 240 local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0] 241 self.assertEqual(user_name, 'bob') 242 self.assertEqual(local_role, 'waeup.local.DepartmentManager') 243 shutil.rmtree(os.path.dirname(fin_file)) 244 return 245 246 def test_import_update(self): 247 self.app['faculties']['FAC2'] = Faculty(code='FAC2') 248 self.app['faculties']['FAC3'] = Faculty(code='FAC3') 249 self.app['faculties']['FAC4'] = Faculty(code='FAC4') 250 251 role_manager1 = IPrincipalRoleManager(self.app['faculties']['FAC1']) 252 role_manager1.assignRoleToPrincipal('alfonsrole', 'alfons') 253 role_map1 = IPrincipalRoleMap(self.app['faculties']['FAC1']) 254 self.assertEqual(len(role_map1.getPrincipalsAndRoles()), 1) 255 256 role_manager2 = IPrincipalRoleManager(self.app['faculties']['FAC2']) 257 role_manager2.assignRoleToPrincipal('alfonsrole', 'alfons') 258 role_map2 = IPrincipalRoleMap(self.app['faculties']['FAC2']) 259 self.assertEqual(len(role_map2.getPrincipalsAndRoles()), 1) 260 261 role_manager3 = IPrincipalRoleManager(self.app['faculties']['FAC3']) 262 role_manager3.assignRoleToPrincipal('alfonsrole', 'alfons') 263 role_map3 = IPrincipalRoleMap(self.app['faculties']['FAC3']) 264 self.assertEqual(len(role_map3.getPrincipalsAndRoles()), 1) 265 266 role_manager4 = IPrincipalRoleManager(self.app['faculties']['FAC4']) 267 role_manager4.assignRoleToPrincipal('alfonsrole', 'alfons') 268 role_map4 = IPrincipalRoleMap(self.app['faculties']['FAC4']) 269 self.assertEqual(len(role_map4.getPrincipalsAndRoles()), 1) 270 271 local_role, user_name, setting = role_map2.getPrincipalsAndRoles()[0] 272 self.assertEqual(user_name, 'alfons') 273 self.assertEqual(local_role, 'alfonsrole') 274 275 num, num_warns, fin_file, fail_file = self.proc.doImport( 276 self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update') 277 self.assertEqual(num_warns,0) 278 # Local roles have been removed in FAC1 due to deletion marker. 279 self.assertEqual(len(role_map1.getPrincipalsAndRoles()), 0) 280 # Old local roles have been removed and new roles have been added in FAC2. 281 self.assertEqual(len(role_map2.getPrincipalsAndRoles()), 1) 282 local_role, user_name, setting = role_map2.getPrincipalsAndRoles()[0] 283 self.assertEqual(user_name, 'bob') 284 self.assertEqual(local_role, 'waeup.local.DepartmentManager') 285 # Local roles are not touched in FAC3 due to ignore marker. 286 self.assertEqual(len(role_map3.getPrincipalsAndRoles()), 1) 287 local_role, user_name, setting = role_map3.getPrincipalsAndRoles()[0] 288 self.assertEqual(user_name, 'alfons') 289 self.assertEqual(local_role, 'alfonsrole') 290 # Local roles are not touched in FAC4 due to empty cell. 291 self.assertEqual(len(role_map4.getPrincipalsAndRoles()), 1) 292 local_role, user_name, setting = role_map4.getPrincipalsAndRoles()[0] 293 self.assertEqual(user_name, 'alfons') 294 self.assertEqual(local_role, 'alfonsrole') 295 shutil.rmtree(os.path.dirname(fin_file)) 296 return 297 298 def test_import_update_logging(self): 299 self.app['faculties']['FAC2'] = Faculty(code='FAC2') 300 self.app['faculties']['FAC3'] = Faculty(code='FAC3') 301 self.app['faculties']['FAC4'] = Faculty(code='FAC4') 302 num, num_warns, fin_file, fail_file = self.proc.doImport( 303 self.csv_file_faculty_update, FACULTY_HEADER_FIELDS_UPDATE, 'update') 304 self.assertEqual(num_warns,0) 305 logcontent = open(self.logfile).read() 306 # Logging message from updateEntry 307 self.assertTrue( 308 'INFO - system - Department Processor - DEP2 - ' 309 'Record updated: local_roles=anne|waeup.local.DepartmentManager, ' 310 'code=DEP2' 311 in logcontent) 312 313 def test_upload_import_reupload(self): 314 self.browser.addHeader('Authorization', 'Basic mgr:mgrpw') 315 self.browser.open(self.datacenter_path) 316 self.assertEqual(self.browser.headers['Status'], '200 Ok') 317 self.assertEqual(self.browser.url, self.datacenter_path) 318 self.browser.getLink("Upload data").click() 319 file = open(self.csv_file_faculty) 320 ctrl = self.browser.getControl(name='uploadfile:file') 321 file_ctrl = ctrl.mech_control 322 file_ctrl.add_file(file, filename='sample_faculty_data.csv') 323 self.browser.getControl('Upload').click() 324 self.browser.getLink('Process data').click() 325 self.browser.getControl(name="select").click() 326 importerselect = self.browser.getControl(name='importer') 327 importerselect.getControl('Faculty Processor').selected = True 328 modeselect = self.browser.getControl(name='mode') 329 modeselect.getControl(value='create').selected = True 330 self.browser.getControl('Proceed to step 3').click() 331 self.assertTrue('Header fields OK' in self.browser.contents) 332 self.browser.getControl('Perform import').click() 333 self.assertTrue('Successfully processed 1 rows' in self.browser.contents) 334 # We import the same file a second time. 335 self.browser.open(self.datacenter_path) 336 self.browser.getLink("Upload data").click() 337 file = open(self.csv_file_faculty) 338 ctrl = self.browser.getControl(name='uploadfile:file') 339 file_ctrl = ctrl.mech_control 340 file_ctrl.add_file(file, filename='sample_faculty_data.csv') 341 self.browser.getControl('Upload').click() 342 self.assertTrue( 343 'File with same name was uploaded earlier' in self.browser.contents) 344 return 345 346 class TestDepartmentProcessor(UniversityProcessorSetup): 347 348 def setUp(self): 349 super(TestDepartmentProcessor, self).setUp() 62 class TestDepartmentProcessors(unittest.TestCase): 63 64 def setUp(self): 350 65 self.proc = DepartmentProcessor() 351 66 self.site0 = dict() … … 354 69 self.site3 = dict(faculties=dict(FAC=dict(DPT='pseudo department'))) 355 70 self.row = dict(code='DPT', faculty_code='FAC') 356 357 self.csv_file_department = os.path.join(self.workdir, 'sample_department_data.csv')358 open(self.csv_file_department, 'wb').write(DEPARTMENT_SAMPLE_DATA)359 71 return 360 72 … … 403 115 return 404 116 405 def test_import(self): 406 num, num_warns, fin_file, fail_file = self.proc.doImport( 407 self.csv_file_department, DEPARTMENT_HEADER_FIELDS) 408 content = open(fail_file).read() 409 self.assertEqual(num_warns,6) 410 self.assertEqual( 411 content, 412 'faculty_code,code,local_roles,--ERRORS--\r\n' 413 'FAC1,DEP2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.DepartmentManager\'}]",' 414 'local_roles: alice does not exist\r\n' 415 'FAC1,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",' 416 'local_roles: waeup.local.Boss not allowed\r\n' 417 'FAC1,DEP2,[(\'anything\')],local_roles: no dicts\r\n' 418 'FAC1,DEP2,[,local_roles: Error\r\n' 419 'FAC1,DEP2,"[{\'user\':\'anne\',\'local\':\'waeup.local.DepartmentManager\'}]",' 420 'local_roles: user_name or local_role missing\r\n' 421 'FAC11,DEP2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.DepartmentManager\'}]",' 422 'Not all parents do exist yet. Skipping\r\n' 423 ) 424 # Anne got a local role in department DEP2. 425 dep = self.app['faculties']['FAC1']['DEP2'] 426 role_map = IPrincipalRoleMap(dep) 427 local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0] 428 self.assertEqual(user_name, 'anne') 429 self.assertEqual(local_role, 'waeup.local.DepartmentManager') 430 shutil.rmtree(os.path.dirname(fin_file)) 431 return 432 433 class CourseProcessorTests(UniversityProcessorSetup): 117 class CourseProcessorTests(FunctionalTestCase): 118 119 layer = FunctionalLayer 434 120 435 121 def setUp(self): 436 122 super(CourseProcessorTests, self).setUp() 437 123 self.proc = CourseProcessor() 438 self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRS1") 439 self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRS1") 440 self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS2") 441 self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRS1") 442 443 self.csv_file_course = os.path.join(self.workdir, 'sample_course_data.csv') 444 open(self.csv_file_course, 'wb').write(COURSE_SAMPLE_DATA) 124 self.site0 = dict() 125 self.site1 = dict(faculties=dict()) 126 self.site2 = dict(faculties=dict(FAC=dict())) 127 self.department1 = Department(code='DPT') 128 self.department2 = Department(code='DPT') 129 self.course = Course(code='CRS') 130 self.department2.courses['CRS'] = self.course 131 self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1))) 132 self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2))) 133 self.row = dict(department_code='DPT', faculty_code='FAC', code="CRS") 445 134 return 446 135 … … 453 142 def test_parents_exist(self): 454 143 # make sure we lookup parents correctly 455 result1 = self.proc.parentsExist(self.row1, self.app) 456 result2 = self.proc.parentsExist(self.row2, self.app) 457 result3 = self.proc.parentsExist(self.row3, self.app) 144 result0 = self.proc.parentsExist(self.row, self.site0) 145 result1 = self.proc.parentsExist(self.row, self.site1) 146 result2 = self.proc.parentsExist(self.row, self.site2) 147 result3 = self.proc.parentsExist(self.row, self.site3) 148 result4 = self.proc.parentsExist(self.row, self.site4) 149 self.assertTrue(result0 is False) 458 150 self.assertTrue(result1 is False) 459 151 self.assertTrue(result2 is False) 460 152 self.assertTrue(result3 is True) 153 self.assertTrue(result4 is True) 461 154 return 462 155 463 156 def test_entry_exists(self): 464 157 # make sure we find an entry if it exists 465 result1 = self.proc.entryExists(self.row1, self.app) 466 result2 = self.proc.entryExists(self.row2, self.app) 467 result3 = self.proc.entryExists(self.row3, self.app) 468 result4 = self.proc.entryExists(self.row4, self.app) 158 result0 = self.proc.entryExists(self.row, self.site0) 159 result1 = self.proc.entryExists(self.row, self.site1) 160 result2 = self.proc.entryExists(self.row, self.site2) 161 result3 = self.proc.entryExists(self.row, self.site3) 162 result4 = self.proc.entryExists(self.row, self.site4) 163 self.assertTrue(result0 is False) 469 164 self.assertTrue(result1 is False) 470 165 self.assertTrue(result2 is False) … … 475 170 def test_get_entry(self): 476 171 # make sure we can get an entry if it exists 477 result1 = self.proc.getEntry(self.row 1, self.app)478 result2 = self.proc.getEntry(self.row 4, self.app)172 result1 = self.proc.getEntry(self.row, self.site3) 173 result2 = self.proc.getEntry(self.row, self.site4) 479 174 self.assertTrue(result1 is None) 480 175 self.assertTrue(result2 is self.course) … … 483 178 def test_del_entry(self): 484 179 # make sure we can delete entries 485 self.assertTrue('CRS1' in self.app['faculties']['FAC1']['DEP1'].courses.keys()) 486 self.proc.delEntry(self.row4, self.app) 487 self.assertTrue('CRS1' not in self.app['faculties']['FAC1']['DEP1'].courses.keys()) 488 return 489 490 def test_import(self): 491 num, num_warns, fin_file, fail_file = self.proc.doImport( 492 self.csv_file_course, COURSE_HEADER_FIELDS) 493 content = open(fail_file).read() 494 self.assertEqual(num_warns,6) 495 self.assertEqual( 496 content, 497 'faculty_code,department_code,code,local_roles,--ERRORS--\r\n' 498 'FAC1,DEP1,CRS2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.Lecturer\'}]",' 499 'local_roles: alice does not exist\r\n' 500 'FAC1,DEP1,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",' 501 'local_roles: waeup.local.Boss not allowed\r\n' 502 'FAC1,DEP1,CRS2,[(\'anything\')],local_roles: no dicts\r\n' 503 'FAC1,DEP1,CRS2,[,local_roles: Error\r\n' 504 'FAC1,DEP1,CRS2,"[{\'user\':\'anne\',\'local\':\'waeup.local.Lecturer\'}]",' 505 'local_roles: user_name or local_role missing\r\n' 506 'FAC11,DEP2,CRS2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Lecturer\'}]",' 507 'Not all parents do exist yet. Skipping\r\n' 508 ) 509 # Anne got a local role in course CRS2. 510 dep = self.app['faculties']['FAC1']['DEP1'].courses['CRS2'] 511 role_map = IPrincipalRoleMap(dep) 512 local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0] 513 self.assertEqual(user_name, 'anne') 514 self.assertEqual(local_role, 'waeup.local.Lecturer') 515 shutil.rmtree(os.path.dirname(fin_file)) 516 return 517 518 519 class CertificateProcessorTests(UniversityProcessorSetup): 180 self.assertTrue('CRS' in self.department2.courses.keys()) 181 self.proc.delEntry(self.row, self.site4) 182 self.assertTrue('CRS' not in self.department2.courses.keys()) 183 return 184 185 class CertificateProcessorTests(FunctionalTestCase): 186 187 layer = FunctionalLayer 520 188 521 189 def setUp(self): 522 190 super(CertificateProcessorTests, self).setUp() 523 191 self.proc = CertificateProcessor() 524 self.row1 = dict(department_code='DEP2', faculty_code='FAC1', code="CRT1") 525 self.row2 = dict(department_code='DEP1', faculty_code='FAC2', code="CRT1") 526 self.row3 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT2") 527 self.row4 = dict(department_code='DEP1', faculty_code='FAC1', code="CRT1") 528 529 self.csv_file_certificate = os.path.join(self.workdir, 'sample_certificate_data.csv') 530 open(self.csv_file_certificate, 'wb').write(CERTIFICATE_SAMPLE_DATA) 531 return 532 533 def test_ifaces(self): 534 # Make sure we fullfill all interface contracts 535 verifyClass(IBatchProcessor, CourseProcessor) 192 self.site0 = dict() 193 self.site1 = dict(faculties=dict()) 194 self.site2 = dict(faculties=dict(FAC=dict())) 195 self.department1 = Department(code='DPT') 196 self.department2 = Department(code='DPT') 197 self.certificate = Certificate(code='CRT') 198 self.department2.certificates['CRT'] = self.certificate 199 self.site3 = dict(faculties=dict(FAC=dict(DPT=self.department1))) 200 self.site4 = dict(faculties=dict(FAC=dict(DPT=self.department2))) 201 self.row = dict(department_code='DPT', faculty_code='FAC', code="CRT") 202 return 203 204 def test_ifaces(self): 205 # Make sure we fullfill all interface contracts 206 verifyClass(IBatchProcessor, CertificateProcessor) 536 207 verifyObject(IBatchProcessor, self.proc) 537 208 return 538 209 539 210 def test_parents_exist(self): 540 # make sure we lookup parents correctly 541 result1 = self.proc.parentsExist(self.row1, self.app) 542 result2 = self.proc.parentsExist(self.row2, self.app) 543 result3 = self.proc.parentsExist(self.row3, self.app) 211 # make sure we can find all certificate parents 212 result0 = self.proc.parentsExist(self.row, self.site0) 213 result1 = self.proc.parentsExist(self.row, self.site1) 214 result2 = self.proc.parentsExist(self.row, self.site2) 215 result3 = self.proc.parentsExist(self.row, self.site3) 216 result4 = self.proc.parentsExist(self.row, self.site4) 217 self.assertTrue(result0 is False) 544 218 self.assertTrue(result1 is False) 545 219 self.assertTrue(result2 is False) 546 220 self.assertTrue(result3 is True) 221 self.assertTrue(result4 is True) 547 222 return 548 223 549 224 def test_entry_exists(self): 550 225 # make sure we find an entry if it exists 551 result1 = self.proc.entryExists(self.row1, self.app) 552 result2 = self.proc.entryExists(self.row2, self.app) 553 result3 = self.proc.entryExists(self.row3, self.app) 554 result4 = self.proc.entryExists(self.row4, self.app) 226 result0 = self.proc.entryExists(self.row, self.site0) 227 result1 = self.proc.entryExists(self.row, self.site1) 228 result2 = self.proc.entryExists(self.row, self.site2) 229 result3 = self.proc.entryExists(self.row, self.site3) 230 result4 = self.proc.entryExists(self.row, self.site4) 231 self.assertTrue(result0 is False) 555 232 self.assertTrue(result1 is False) 556 233 self.assertTrue(result2 is False) … … 561 238 def test_get_entry(self): 562 239 # make sure we can get an entry if it exists 563 result1 = self.proc.getEntry(self.row 1, self.app)564 result2 = self.proc.getEntry(self.row 4, self.app)240 result1 = self.proc.getEntry(self.row, self.site3) 241 result2 = self.proc.getEntry(self.row, self.site4) 565 242 self.assertTrue(result1 is None) 566 243 self.assertTrue(result2 is self.certificate) … … 569 246 def test_del_entry(self): 570 247 # make sure we can delete entries 571 self.assertTrue('CRT1' in self.app['faculties']['FAC1']['DEP1'].certificates.keys()) 572 self.proc.delEntry(self.row4, self.app) 573 self.assertTrue('CRT1' not in self.app['faculties']['FAC1']['DEP1'].certificates.keys()) 574 return 575 576 def test_import(self): 577 num, num_warns, fin_file, fail_file = self.proc.doImport( 578 self.csv_file_certificate, CERTIFICATE_HEADER_FIELDS) 579 content = open(fail_file).read() 580 self.assertEqual(num_warns,6) 581 self.assertEqual( 582 content, 583 'faculty_code,department_code,code,local_roles,--ERRORS--\r\n' 584 'FAC1,DEP1,CRT2,"[{\'user_name\':\'alice\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",' 585 'local_roles: alice does not exist\r\n' 586 'FAC1,DEP1,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.Boss\'}]",' 587 'local_roles: waeup.local.Boss not allowed\r\n' 588 'FAC1,DEP1,CRT2,[(\'anything\')],local_roles: no dicts\r\n' 589 'FAC1,DEP1,CRT2,[,local_roles: Error\r\n' 590 'FAC1,DEP1,CRT2,"[{\'user\':\'anne\',\'local\':\'waeup.local.CourseAdviser100\'}]",' 591 'local_roles: user_name or local_role missing\r\n' 592 'FAC11,DEP2,CRT2,"[{\'user_name\':\'anne\',\'local_role\':\'waeup.local.CourseAdviser100\'}]",' 593 'Not all parents do exist yet. Skipping\r\n' 594 ) 595 # Anne got a local role in certificate CRT2. 596 dep = self.app['faculties']['FAC1']['DEP1'].certificates['CRT2'] 597 role_map = IPrincipalRoleMap(dep) 598 local_role, user_name, setting = role_map.getPrincipalsAndRoles()[0] 599 self.assertEqual(user_name, 'anne') 600 self.assertEqual(local_role, 'waeup.local.CourseAdviser100') 601 shutil.rmtree(os.path.dirname(fin_file)) 602 603 logcontent = open(self.logfile).read() 604 # Logging message from updateEntry 605 self.assertTrue( 606 'INFO - system - Certificate Processor - CRT2 - Record updated: ' 607 'local_roles=anne|waeup.local.CourseAdviser100, code=CRT2' 608 in logcontent) 609 610 return 611 248 self.assertTrue('CRT' in self.department2.certificates.keys()) 249 self.proc.delEntry(self.row, self.site4) 250 self.assertTrue('CRT' not in self.department2.certificates.keys()) 251 return 612 252 613 253 class CertCourseProcessorTests(FunctionalTestCase): … … 626 266 self.department2.certificates['CRT'] = self.certificate 627 267 self.course = Course(code='CRS') 628 self.department2.certificates['CRT'].addC ertCourse(self.course)268 self.department2.certificates['CRT'].addCourseRef(self.course) 629 269 self.cert_course = self.certificate['CRS_100'] 630 270 self.department2.courses['CRS'] = self.course -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/tests/test_export.py
r9209 r9211 22 22 from zope.component import queryUtility 23 23 from zope.interface.verify import verifyObject, verifyClass 24 from zope.securitypolicy.interfaces import IPrincipalRoleManager25 24 from waeup.kofa.interfaces import ICSVExporter 26 from waeup.kofa.testing import KofaUnitTestLayer , FunctionalLayer25 from waeup.kofa.testing import KofaUnitTestLayer 27 26 from waeup.kofa.university import ( 28 27 FacultiesContainer, Faculty, Department, Course, Certificate, … … 35 34 class FacultyExporterTest(unittest.TestCase): 36 35 37 layer = FunctionalLayer36 layer = KofaUnitTestLayer 38 37 39 38 def setUp(self): … … 67 66 self.assertEqual( 68 67 result, 69 'code,title,title_prefix ,users_with_local_roles\r\n'70 'F1,Faculty of Cheese,faculty ,[]\r\n'68 'code,title,title_prefix\r\n' 69 'F1,Faculty of Cheese,faculty\r\n' 71 70 ) 72 71 return … … 79 78 self.assertEqual( 80 79 result, 81 'code,title,title_prefix ,users_with_local_roles\r\n'82 'F1,Faculty of Cheese,faculty ,[]\r\n'80 'code,title,title_prefix\r\n' 81 'F1,Faculty of Cheese,faculty\r\n' 83 82 ) 84 83 return … … 97 96 self.assertEqual( 98 97 result, 99 'code,title,title_prefix ,users_with_local_roles\r\n'100 'F1,Faculty of Cheese,faculty ,[]\r\n'101 'F2,Centre of Onion,centre ,[]\r\n'98 'code,title,title_prefix\r\n' 99 'F1,Faculty of Cheese,faculty\r\n' 100 'F2,Centre of Onion,centre\r\n' 102 101 ) 103 102 return … … 115 114 self.assertEqual( 116 115 result, 117 'code,title,title_prefix ,users_with_local_roles\r\n'118 'F1,Faculty of Cheese,faculty ,[]\r\n'119 'F2,Centre of Onion,centre ,[]\r\n'116 'code,title,title_prefix\r\n' 117 'F1,Faculty of Cheese,faculty\r\n' 118 'F2,Centre of Onion,centre\r\n' 120 119 ) 121 120 return … … 124 123 # Tests for DepartmentExporter 125 124 126 layer = FunctionalLayer125 layer = KofaUnitTestLayer 127 126 128 127 def setUp(self): … … 142 141 self.fac1.addDepartment(self.dept2) 143 142 self.fac2.addDepartment(self.dept3) 144 role_manager = IPrincipalRoleManager(self.dept1)145 role_manager.assignRoleToPrincipal(u'bobsrole', u'bob')146 143 return 147 144 … … 170 167 self.assertEqual( 171 168 result, 172 'code,faculty_code,title,title_prefix,users_with_local_roles\r\n' 173 'D1,F1,Department of Cheddar,department,' 174 '"[{\'user_name\': u\'bob\', \'local_role\': u\'bobsrole\'}]"\r\n' 169 'code,faculty_code,title,title_prefix\r\n' 170 'D1,F1,Department of Cheddar,department\r\n' 175 171 ) 176 172 return … … 182 178 self.assertEqual( 183 179 result, 184 'code,faculty_code,title,title_prefix,users_with_local_roles\r\n' 185 'D1,F1,Department of Cheddar,department,' 186 '"[{\'user_name\': u\'bob\', \'local_role\': u\'bobsrole\'}]"\r\n' 187 'D2,F1,Institue of Gouda,institute,[]\r\n' 180 'code,faculty_code,title,title_prefix\r\n' 181 'D1,F1,Department of Cheddar,department\r\n' 182 'D2,F1,Institue of Gouda,institute\r\n' 188 183 ) 189 184 return … … 196 191 self.assertEqual( 197 192 result, 198 'code,faculty_code,title,title_prefix,users_with_local_roles\r\n' 199 'D1,F1,Department of Cheddar,department,' 200 '"[{\'user_name\': u\'bob\', \'local_role\': u\'bobsrole\'}]"\r\n' 201 'D2,F1,Institue of Gouda,institute,[]\r\n' 202 'D3,F2,Department of Rings,department,[]\r\n' 193 'code,faculty_code,title,title_prefix\r\n' 194 'D1,F1,Department of Cheddar,department\r\n' 195 'D2,F1,Institue of Gouda,institute\r\n' 196 'D3,F2,Department of Rings,department\r\n' 203 197 ) 204 198 return … … 210 204 self.assertEqual( 211 205 result, 212 'code,faculty_code,title,title_prefix,users_with_local_roles\r\n' 213 'D1,F1,Department of Cheddar,department,' 214 '"[{\'user_name\': u\'bob\', \'local_role\': u\'bobsrole\'}]"\r\n' 215 'D2,F1,Institue of Gouda,institute,[]\r\n' 216 'D3,F2,Department of Rings,department,[]\r\n' 206 'code,faculty_code,title,title_prefix\r\n' 207 'D1,F1,Department of Cheddar,department\r\n' 208 'D2,F1,Institue of Gouda,institute\r\n' 209 'D3,F2,Department of Rings,department\r\n' 217 210 ) 218 211 return … … 221 214 # Tests for CourseExporter 222 215 223 layer = FunctionalLayer216 layer = KofaUnitTestLayer 224 217 225 218 def setUp(self): … … 267 260 self.assertEqual( 268 261 result, 269 'code,faculty_code,department_code,title,credits,' 270 'passmark,semester,users_with_local_roles\r\n' 271 'C1,F1,D1,Cheese Basics,0,40,1,[]\r\n' 262 'code,faculty_code,department_code,title,credits,passmark,semester\r\n' 263 'C1,F1,D1,Cheese Basics,0,40,1\r\n' 272 264 ) 273 265 return … … 279 271 self.assertEqual( 280 272 result, 281 'code,faculty_code,department_code,title,credits,passmark,' 282 'semester,users_with_local_roles\r\n' 283 'C1,F1,D1,Cheese Basics,0,40,1,[]\r\n' 284 'C2,F1,D1,Advanced Cheese Making,0,40,1,[]\r\n' 273 'code,faculty_code,department_code,title,credits,passmark,semester\r\n' 274 'C1,F1,D1,Cheese Basics,0,40,1\r\n' 275 'C2,F1,D1,Advanced Cheese Making,0,40,1\r\n' 285 276 ) 286 277 return … … 293 284 self.assertEqual( 294 285 result, 295 'code,faculty_code,department_code,title,credits,passmark,' 296 'semester,users_with_local_roles\r\n' 297 'C1,F1,D1,Cheese Basics,0,40,1,[]\r\n' 298 'C2,F1,D1,Advanced Cheese Making,0,40,1,[]\r\n' 299 'C3,F1,D2,Selling Cheese,0,40,1,[]\r\n' 286 'code,faculty_code,department_code,title,credits,passmark,semester\r\n' 287 'C1,F1,D1,Cheese Basics,0,40,1\r\n' 288 'C2,F1,D1,Advanced Cheese Making,0,40,1\r\n' 289 'C3,F1,D2,Selling Cheese,0,40,1\r\n' 300 290 ) 301 291 return … … 307 297 self.assertEqual( 308 298 result, 309 'code,faculty_code,department_code,title,credits,passmark,' 310 'semester,users_with_local_roles\r\n' 311 'C1,F1,D1,Cheese Basics,0,40,1,[]\r\n' 312 'C2,F1,D1,Advanced Cheese Making,0,40,1,[]\r\n' 313 'C3,F1,D2,Selling Cheese,0,40,1,[]\r\n' 299 'code,faculty_code,department_code,title,credits,passmark,semester\r\n' 300 'C1,F1,D1,Cheese Basics,0,40,1\r\n' 301 'C2,F1,D1,Advanced Cheese Making,0,40,1\r\n' 302 'C3,F1,D2,Selling Cheese,0,40,1\r\n' 314 303 ) 315 304 return … … 318 307 # Tests for CertificateExporter 319 308 320 layer = FunctionalLayer309 layer = KofaUnitTestLayer 321 310 322 311 def setUp(self): … … 350 339 self.dept1.certificates.addCertificate(self.cert2) 351 340 self.dept2.certificates.addCertificate(self.cert3) 352 role_manager = IPrincipalRoleManager(self.cert1)353 role_manager.assignRoleToPrincipal(u'bobsrole', u'bob')354 341 return 355 342 … … 378 365 self.assertEqual( 379 366 result, 380 'code,faculty_code,department_code,title,study_mode,start_level,' 381 'end_level,application_category,school_fee_1,' 382 'school_fee_2,school_fee_3,school_fee_4,users_with_local_roles\r\n' 383 'CERT1,F1,D1,Master of Cheese,ct_ft,100,300,basic,,,,,' 384 '"[{\'user_name\': u\'bob\', \'local_role\': u\'bobsrole\'}]"\r\n' 367 'code,faculty_code,department_code,title,study_mode,start_level,end_level,application_category,school_fee_1,school_fee_2\r\n' 368 'CERT1,F1,D1,Master of Cheese,ct_ft,100,300,basic,,\r\n' 385 369 ) 386 370 return … … 392 376 self.assertEqual( 393 377 result, 394 'code,faculty_code,department_code,title,study_mode,start_level,' 395 'end_level,application_category,school_fee_1,' 396 'school_fee_2,school_fee_3,school_fee_4,users_with_local_roles\r\n' 397 'CERT1,F1,D1,Master of Cheese,ct_ft,100,300,basic,,,,,' 398 '"[{\'user_name\': u\'bob\', \'local_role\': u\'bobsrole\'}]"\r\n' 399 'CERT2,F1,D1,Master of Cheddar,ct_ft,400,700,cest,,,,,[]\r\n' 378 'code,faculty_code,department_code,title,study_mode,start_level,end_level,application_category,school_fee_1,school_fee_2\r\n' 379 'CERT1,F1,D1,Master of Cheese,ct_ft,100,300,basic,,\r\n' 380 'CERT2,F1,D1,Master of Cheddar,ct_ft,400,700,cest,,\r\n' 400 381 ) 401 382 return … … 408 389 self.assertEqual( 409 390 result, 410 'code,faculty_code,department_code,title,study_mode,start_level,' 411 'end_level,application_category,' 412 'school_fee_1,school_fee_2,school_fee_3,school_fee_4,' 413 'users_with_local_roles\r\n' 414 'CERT1,F1,D1,Master of Cheese,ct_ft,100,300,basic,,,,,' 415 '"[{\'user_name\': u\'bob\', \'local_role\': u\'bobsrole\'}]"\r\n' 416 'CERT2,F1,D1,Master of Cheddar,ct_ft,400,700,cest,,,,,[]\r\n' 417 'CERT3,F1,D2,Cert. of Rubbish,dp_pt,100,200,no,,,,,[]\r\n' 391 'code,faculty_code,department_code,title,study_mode,start_level,end_level,application_category,school_fee_1,school_fee_2\r\n' 392 'CERT1,F1,D1,Master of Cheese,ct_ft,100,300,basic,,\r\n' 393 'CERT2,F1,D1,Master of Cheddar,ct_ft,400,700,cest,,\r\n' 394 'CERT3,F1,D2,Cert. of Rubbish,dp_pt,100,200,no,,\r\n' 418 395 ) 419 396 return … … 425 402 self.assertEqual( 426 403 result, 427 'code,faculty_code,department_code,title,study_mode,start_level,' 428 'end_level,application_category,' 429 'school_fee_1,school_fee_2,school_fee_3,school_fee_4,' 430 'users_with_local_roles\r\n' 431 'CERT1,F1,D1,Master of Cheese,ct_ft,100,300,basic,,,,,' 432 '"[{\'user_name\': u\'bob\', \'local_role\': u\'bobsrole\'}]"\r\n' 433 'CERT2,F1,D1,Master of Cheddar,ct_ft,400,700,cest,,,,,[]\r\n' 434 'CERT3,F1,D2,Cert. of Rubbish,dp_pt,100,200,no,,,,,[]\r\n' 404 'code,faculty_code,department_code,title,study_mode,start_level,end_level,application_category,school_fee_1,school_fee_2\r\n' 405 'CERT1,F1,D1,Master of Cheese,ct_ft,100,300,basic,,\r\n' 406 'CERT2,F1,D1,Master of Cheddar,ct_ft,400,700,cest,,\r\n' 407 'CERT3,F1,D2,Cert. of Rubbish,dp_pt,100,200,no,,\r\n' 435 408 ) 436 409 return … … 471 444 self.dept1.certificates.addCertificate(self.cert2) 472 445 self.dept2.certificates.addCertificate(self.cert3) 473 self.cert1.addC ertCourse(self.course1, 100, True)474 self.cert1.addC ertCourse(self.course2, 400, False)475 self.cert3.addC ertCourse(self.course3, 100, False)446 self.cert1.addCourseRef(self.course1, 100, True) 447 self.cert1.addCourseRef(self.course2, 400, False) 448 self.cert3.addCourseRef(self.course3, 100, False) 476 449 self.certcourse1 = self.cert1['C1_100'] 477 450 self.certcourse2 = self.cert1['C2_400'] -
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/university/tests/test_university.py
r9209 r9211 33 33 def test_suite(): 34 34 # collect doctests for university subpackage 35 suite = get_doctest_suite(['university/c ertcourses.txt',])35 suite = get_doctest_suite(['university/courserefs.txt',]) 36 36 # add local unittests (actually only one) 37 37 for testcase in [UniversitySubpackageTests,]:
Note: See TracChangeset for help on using the changeset viewer.