## $Id: pages.py 9209 2012-09-20 08:49:37Z uli $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
""" Viewing components for Kofa objects.
"""
import csv
import grok
import os
import re
import sys
from urllib import urlencode
from zope import schema
from zope.authentication.interfaces import (
IAuthentication, IUnauthenticatedPrincipal, ILogout)
from zope.catalog.interfaces import ICatalog
from zope.component import (
getUtility, queryUtility, createObject, getAllUtilitiesRegisteredFor,
getUtilitiesFor,
)
from zope.event import notify
from zope.securitypolicy.interfaces import IPrincipalRoleManager
from zope.session.interfaces import ISession
from zope.password.interfaces import IPasswordManager
from waeup.kofa.browser.layout import (
KofaPage, KofaForm, KofaEditFormPage, KofaAddFormPage,
KofaDisplayFormPage, NullValidator)
from waeup.kofa.browser.interfaces import (
IUniversity, IFacultiesContainer, IFaculty, IFacultyAdd,
IDepartment, IDepartmentAdd, ICourse, ICourseAdd, ICertificate,
ICertificateAdd, ICertificateCourse, ICertificateCourseAdd,
ICaptchaManager, IChangePassword)
from waeup.kofa.browser.layout import jsaction, action, UtilityView
from waeup.kofa.browser.resources import (
warning, tabs, datatable)
from waeup.kofa.interfaces import MessageFactory as _
from waeup.kofa.interfaces import(
IKofaObject, IUsersContainer, IUserAccount, IDataCenter,
IKofaXMLImporter, IKofaXMLExporter, IBatchProcessor,
ILocalRolesAssignable, DuplicationError, IConfigurationContainer,
ISessionConfiguration, ISessionConfigurationAdd, IJobManager,
IPasswordValidator, IContactForm, IKofaUtils, ICSVExporter,)
from waeup.kofa.permissions import (
get_users_with_local_roles, get_all_roles, get_all_users)
from waeup.kofa.students.catalog import search as searchstudents
from waeup.kofa.university.catalog import search
from waeup.kofa.university.vocabularies import course_levels
from waeup.kofa.authentication import LocalRoleSetEvent
from waeup.kofa.widgets.htmlwidget import HTMLDisplayWidget
from waeup.kofa.utils.helpers import get_user_account
from waeup.kofa.mandates.mandate import PasswordMandate
grok.context(IKofaObject)
grok.templatedir('templates')
def add_local_role(view, tab, **data):
localrole = view.request.form.get('local_role', None)
user = view.request.form.get('user', None)
if user is None or localrole is None:
view.flash('No user selected.')
view.redirect(view.url(view.context, '@@manage')+'?tab%s' % tab)
return
role_manager = IPrincipalRoleManager(view.context)
role_manager.assignRoleToPrincipal(localrole, user)
notify(LocalRoleSetEvent(view.context, localrole, user, granted=True))
ob_class = view.__implemented__.__name__.replace('waeup.kofa.','')
grok.getSite().logger.info(
'%s - added: %s|%s' % (ob_class, user, localrole))
view.redirect(view.url(view.context, u'@@manage')+'?tab%s' % tab)
return
def del_local_roles(view, tab, **data):
child_ids = view.request.form.get('role_id', None)
if child_ids is None:
view.flash(_('No local role selected.'))
view.redirect(view.url(view.context, '@@manage')+'?tab%s' % tab)
return
if not isinstance(child_ids, list):
child_ids = [child_ids]
deleted = []
role_manager = IPrincipalRoleManager(view.context)
for child_id in child_ids:
localrole = child_id.split('|')[1]
user_name = child_id.split('|')[0]
try:
role_manager.unsetRoleForPrincipal(localrole, user_name)
notify(LocalRoleSetEvent(
view.context, localrole, user_name, granted=False))
deleted.append(child_id)
except:
view.flash('Could not remove %s: %s: %s' % (
child_id, sys.exc_info()[0], sys.exc_info()[1]))
if len(deleted):
view.flash(
_('Local role successfully removed: ${a}',
mapping = {'a':', '.join(deleted)}))
ob_class = view.__implemented__.__name__.replace('waeup.kofa.','')
grok.getSite().logger.info(
'%s - removed: %s' % (ob_class, ', '.join(deleted)))
view.redirect(view.url(view.context, u'@@manage')+'?tab%s' % tab)
return
def delSubobjects(view, redirect, tab=None, subcontainer=None):
form = view.request.form
if form.has_key('val_id'):
child_id = form['val_id']
else:
view.flash(_('No item selected.'))
if tab:
view.redirect(view.url(view.context, redirect)+'?tab%s' % tab)
else:
view.redirect(view.url(view.context, redirect))
return
if not isinstance(child_id, list):
child_id = [child_id]
deleted = []
for id in child_id:
try:
if subcontainer:
container = getattr(view.context, subcontainer, None)
del container[id]
else:
del view.context[id]
deleted.append(id)
except:
view.flash('Could not delete %s: %s: %s' % (
id, sys.exc_info()[0], sys.exc_info()[1]))
if len(deleted):
view.flash(_('Successfully removed: ${a}',
mapping = {'a': ', '.join(deleted)}))
ob_class = view.__implemented__.__name__.replace('waeup.kofa.','')
grok.getSite().logger.info(
'%s - removed: %s' % (ob_class, ', '.join(deleted)))
if tab:
view.redirect(view.url(view.context, redirect)+'?tab%s' % tab)
else:
view.redirect(view.url(view.context, redirect))
return
# Save function used for save methods in pages
def msave(view, **data):
changed_fields = view.applyData(view.context, **data)
# Turn list of lists into single list
if changed_fields:
changed_fields = reduce(lambda x,y: x+y, changed_fields.values())
fields_string = ' + '.join(changed_fields)
view.flash(_('Form has been saved.'))
ob_class = view.__implemented__.__name__.replace('waeup.kofa.','')
if fields_string:
grok.getSite().logger.info('%s - %s - saved: %s' % (ob_class, view.context.__name__, fields_string))
return
#
# Login/logout and language switch pages...
#
class LoginPage(KofaPage):
"""A login page, available for all objects.
"""
grok.name('login')
grok.context(IKofaObject)
grok.require('waeup.Public')
label = _(u'Login')
camefrom = None
login_button = label
def update(self, SUBMIT=None, camefrom=None):
self.camefrom = camefrom
if SUBMIT is not None:
if self.request.principal.id != 'zope.anybody':
self.flash(_('You logged in.'))
if self.request.principal.user_type == 'student':
rel_link = '/students/%s' % self.request.principal.id
self.redirect(self.application_url() + rel_link)
return
elif self.request.principal.user_type == 'applicant':
container, application_number = self.request.principal.id.split('_')
rel_link = '/applicants/%s/%s' % (
container, application_number)
self.redirect(self.application_url() + rel_link)
return
if not self.camefrom:
# User might have entered the URL directly. Let's beam
# him back to our context.
self.redirect(self.url(self.context))
return
self.redirect(self.camefrom)
return
# Display appropriate flash message if credentials are correct
# but student has been deactivated.
login = self.request.form['form.login']
if len(login) == 8 and grok.getSite()['students'].has_key(login):
student = grok.getSite()['students'][
self.request.form['form.login']]
password = self.request.form['form.password']
passwordmanager = getUtility(IPasswordManager, 'SSHA')
if passwordmanager.checkPassword(student.password, password):
self.flash(_('Your account has been deactivated.'))
return
self.flash(_('You entered invalid credentials.'))
class LogoutPage(KofaPage):
"""A logout page. Calling this page will log the current user out.
"""
grok.context(IKofaObject)
grok.require('waeup.Public')
grok.name('logout')
def update(self):
if not IUnauthenticatedPrincipal.providedBy(self.request.principal):
auth = getUtility(IAuthentication)
ILogout(auth).logout(self.request)
self.flash(_("You have been logged out. Thanks for using WAeUP Kofa!"))
self.redirect(self.application_url())
class LanguageChangePage(KofaPage):
""" Language switch
"""
grok.context(IKofaObject)
grok.name('change_language')
grok.require('waeup.Public')
def update(self, lang='en', view_name='@@index'):
self.response.setCookie('kofa.language', lang, path='/')
self.redirect(self.url(self.context, view_name))
return
def render(self):
return
#
# Contact form...
#
class ContactAdminForm(KofaForm):
grok.name('contactadmin')
#grok.context(IUniversity)
grok.template('contactform')
grok.require('waeup.Authenticated')
pnav = 2
form_fields = grok.AutoFields(IContactForm).select('body')
@property
def config(self):
return grok.getSite()['configuration']
def label(self):
return _(u'Contact ${a}', mapping = {'a': self.config.name_admin})
@property
def get_user_account(self):
return get_user_account(self.request)
@action(_('Send message now'), style='primary')
def send(self, *args, **data):
fullname = self.request.principal.title
try:
email = self.request.principal.email
except AttributeError:
email = self.config.email_admin
username = self.request.principal.id
usertype = getattr(self.request.principal,
'user_type', 'system').title()
kofa_utils = getUtility(IKofaUtils)
success = kofa_utils.sendContactForm(
fullname,email,
self.config.name_admin,self.config.email_admin,
username,usertype,self.config.name,
data['body'],self.config.email_subject)
# Success is always True if sendContactForm didn't fail.
# TODO: Catch exceptions.
if success:
self.flash(_('Your message has been sent.'))
return
class EnquiriesForm(ContactAdminForm):
"""Captcha'd page to let anonymous send emails to the administrator.
"""
grok.name('enquiries')
grok.require('waeup.Public')
pnav = 2
form_fields = grok.AutoFields(IContactForm).select(
'fullname', 'email_from', 'body')
def update(self):
# Handle captcha
self.captcha = getUtility(ICaptchaManager).getCaptcha()
self.captcha_result = self.captcha.verify(self.request)
self.captcha_code = self.captcha.display(self.captcha_result.error_code)
return super(EnquiriesForm, self).update()
@action(_('Send now'), style='primary')
def send(self, *args, **data):
if not self.captcha_result.is_valid:
# Captcha will display error messages automatically.
# No need to flash something.
return
kofa_utils = getUtility(IKofaUtils)
success = kofa_utils.sendContactForm(
data['fullname'],data['email_from'],
self.config.name_admin,self.config.email_admin,
u'None',u'Anonymous',self.config.name,
data['body'],self.config.email_subject)
if success:
self.flash(_('Your message has been sent.'))
else:
self.flash(_('A smtp server error occurred.'))
return
#
# University related pages...
#
class UniversityPage(KofaDisplayFormPage):
""" The main university page.
"""
grok.require('waeup.Public')
grok.name('index')
grok.context(IUniversity)
pnav = 0
label = ''
@property
def frontpage(self):
lang = self.request.cookies.get('kofa.language')
html = self.context['configuration'].frontpage_dict.get(lang,'')
if html =='':
portal_language = getUtility(IKofaUtils).PORTAL_LANGUAGE
html = self.context[
'configuration'].frontpage_dict.get(portal_language,'')
if html =='':
return _(u'
Welcome to WAeUP.Kofa
')
else:
return html
class AdministrationPage(KofaPage):
""" The administration overview page.
"""
grok.name('administration')
grok.context(IUniversity)
grok.require('waeup.managePortal')
label = _(u'Administration')
pnav = 0
class RSS20Feed(grok.View):
"""An RSS 2.0 feed.
"""
grok.name('feed.rss')
grok.context(IUniversity)
grok.require('waeup.Public')
grok.template('universityrss20feed')
name = 'General news feed'
description = 'waeup.kofa now supports RSS 2.0 feeds :-)'
language = None
date = None
buildDate = None
editor = None
webmaster = None
@property
def title(self):
return getattr(grok.getSite(), 'name', u'Sample University')
@property
def contexttitle(self):
return self.name
@property
def link(self):
return self.url(grok.getSite())
def update(self):
self.response.setHeader('Content-Type', 'text/xml; charset=UTF-8')
def entries(self):
return ()
#
# User container pages...
#
class UsersContainerPage(KofaPage):
"""Overview page for all local users.
"""
grok.require('waeup.manageUsers')
grok.context(IUsersContainer)
grok.name('index')
label = _('Portal Users')
manage_button = _(u'Manage')
delete_button = _(u'Remove')
def update(self, userid=None, adduser=None, manage=None, delete=None):
datatable.need()
if manage is not None and userid is not None:
self.redirect(self.url(userid) + '/@@manage')
if delete is not None and userid is not None:
self.context.delUser(userid)
self.flash(_('User account ${a} successfully deleted.',
mapping = {'a': userid}))
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.__parent__.logger.info(
'%s - removed: %s' % (ob_class, userid))
def getLocalRoles(self, account):
local_roles = account.getLocalRoles()
local_roles_string = ''
site_url = self.url(grok.getSite())
for local_role in local_roles.keys():
role_title = dict(get_all_roles())[local_role].title
objects_string = ''
for object in local_roles[local_role]:
objects_string += '%s, ' %(self.url(object),
self.url(object).replace(site_url,''))
local_roles_string += '%s:
%s
' %(role_title,
objects_string.rstrip(', '))
return local_roles_string
def getSiteRoles(self, account):
site_roles = account.roles
site_roles_string = ''
for site_role in site_roles:
role_title = dict(get_all_roles())[site_role].title
site_roles_string += '%s
' % role_title
return site_roles_string
class AddUserFormPage(KofaAddFormPage):
"""Add a user account.
"""
grok.require('waeup.manageUsers')
grok.context(IUsersContainer)
grok.name('add')
grok.template('usereditformpage')
form_fields = grok.AutoFields(IUserAccount)
label = _('Add user')
@action(_('Add user'), style='primary')
def addUser(self, **data):
name = data['name']
title = data['title']
email = data['email']
phone = data['phone']
description = data['description']
#password = data['password']
roles = data['roles']
form = self.request.form
password = form.get('password', None)
password_ctl = form.get('control_password', None)
if password:
validator = getUtility(IPasswordValidator)
errors = validator.validate_password(password, password_ctl)
if errors:
self.flash( ' '.join(errors))
return
try:
self.context.addUser(name, password, title=title, email=email,
phone=phone, description=description,
roles=roles)
self.flash(_('User account ${a} successfully added.',
mapping = {'a': name}))
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.__parent__.logger.info(
'%s - added: %s' % (ob_class, name))
except KeyError:
self.status = self.flash('The userid chosen already exists '
'in the database.')
return
self.redirect(self.url(self.context))
class UserManageFormPage(KofaEditFormPage):
"""Manage a user account.
"""
grok.context(IUserAccount)
grok.name('manage')
grok.template('usereditformpage')
grok.require('waeup.manageUsers')
form_fields = grok.AutoFields(IUserAccount).omit('name')
def label(self):
return _("Edit user ${a}", mapping = {'a':self.context.__name__})
def setUpWidgets(self, ignore_request=False):
super(UserManageFormPage,self).setUpWidgets(ignore_request)
self.widgets['title'].displayWidth = 30
self.widgets['description'].height = 3
return
@action(_('Save'), style='primary')
def save(self, **data):
form = self.request.form
password = form.get('password', None)
password_ctl = form.get('control_password', None)
if password:
validator = getUtility(IPasswordValidator)
errors = validator.validate_password(password, password_ctl)
if errors:
self.flash( ' '.join(errors))
return
changed_fields = self.applyData(self.context, **data)
if changed_fields:
changed_fields = reduce(lambda x,y: x+y, changed_fields.values())
else:
changed_fields = []
if password:
# Now we know that the form has no errors and can set password ...
self.context.setPassword(password)
changed_fields.append('password')
fields_string = ' + '.join(changed_fields)
if fields_string:
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.__parent__.logger.info(
'%s - %s edited: %s' % (
ob_class, self.context.name, fields_string))
self.flash(_('User settings have been saved.'))
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context.__parent__))
return
class ContactUserForm(ContactAdminForm):
grok.name('contactuser')
grok.context(IUserAccount)
grok.template('contactform')
grok.require('waeup.manageUsers')
pnav = 0
form_fields = grok.AutoFields(IContactForm).select('body')
def label(self):
return _(u'Send message to ${a}', mapping = {'a':self.context.title})
@action(_('Send message now'), style='primary')
def send(self, *args, **data):
try:
email = self.request.principal.email
except AttributeError:
email = self.config.email_admin
usertype = getattr(self.request.principal,
'user_type', 'system').title()
kofa_utils = getUtility(IKofaUtils)
success = kofa_utils.sendContactForm(
self.request.principal.title,email,
self.context.title,self.context.email,
self.request.principal.id,usertype,self.config.name,
data['body'],self.config.email_subject)
# Success is always True if sendContactForm didn't fail.
# TODO: Catch exceptions.
if success:
self.flash(_('Your message has been sent.'))
return
class UserEditFormPage(UserManageFormPage):
"""Edit a user account by user
"""
grok.name('index')
grok.require('waeup.editUser')
form_fields = grok.AutoFields(IUserAccount).omit(
'name', 'description', 'roles')
label = _(u"My Preferences")
def setUpWidgets(self, ignore_request=False):
super(UserManageFormPage,self).setUpWidgets(ignore_request)
self.widgets['title'].displayWidth = 30
class MyRolesPage(KofaPage):
"""Display site roles and local roles assigned to officers.
"""
grok.name('my_roles')
grok.require('waeup.editUser')
grok.context(IUserAccount)
grok.template('myrolespage')
label = _(u"My Roles")
@property
def getLocalRoles(self):
local_roles = get_user_account(self.request).getLocalRoles()
local_roles_userfriendly = {}
for local_role in local_roles:
role_title = dict(get_all_roles())[local_role].title
local_roles_userfriendly[role_title] = local_roles[local_role]
return local_roles_userfriendly
@property
def getSiteRoles(self):
site_roles = get_user_account(self.request).roles
site_roles_userfriendly = []
for site_role in site_roles:
role_title = dict(get_all_roles())[site_role].title
site_roles_userfriendly.append(role_title)
return site_roles_userfriendly
#
# Search pages...
#
class SearchPage(KofaPage):
"""General search page for the academics section.
"""
grok.context(IFacultiesContainer)
grok.name('search')
grok.template('searchpage')
grok.require('waeup.viewAcademics')
label = _(u"Search Academic Section")
pnav = 1
search_button = _(u'Search')
def update(self, *args, **kw):
datatable.need()
form = self.request.form
self.hitlist = []
self.query = ''
if not 'query' in form:
return
query = form['query']
self.query = query
self.hitlist = search(query=self.query, view=self)
return
#
# Configuration pages...
#
class ConfigurationContainerDisplayFormPage(KofaDisplayFormPage):
"""View page of the configuration container.
"""
grok.require('waeup.managePortalConfiguration')
grok.name('view')
grok.context(IConfigurationContainer)
pnav = 0
label = _(u'View portal configuration')
form_fields = grok.AutoFields(IConfigurationContainer)
form_fields['frontpage'].custom_widget = HTMLDisplayWidget
class ConfigurationContainerManageFormPage(KofaEditFormPage):
"""Manage page of the configuration container. We always use the
manage page in the UI not the view page, thus we use the index name here.
"""
grok.require('waeup.managePortalConfiguration')
grok.name('index')
grok.context(IConfigurationContainer)
grok.template('configurationmanagepage')
pnav = 0
label = _(u'Edit portal configuration')
taboneactions = [_('Save'), _('Update plugins')]
tabtwoactions = [
_('Add session configuration'),
_('Remove selected')]
form_fields = grok.AutoFields(IConfigurationContainer).omit('frontpage_dict')
def update(self):
tabs.need()
self.tab1 = self.tab2 = ''
qs = self.request.get('QUERY_STRING', '')
if not qs:
qs = 'tab1'
setattr(self, qs, 'active')
datatable.need()
warning.need()
return super(ConfigurationContainerManageFormPage, self).update()
def _frontpage(self):
view = ConfigurationContainerDisplayFormPage(
self.context,self.request)
view.setUpWidgets()
return view.widgets['frontpage']()
@action(_('Save'), style='primary')
def save(self, **data):
msave(self, **data)
self.context.frontpage_dict = self._frontpage()
return
@action(_('Add session configuration'), validator=NullValidator,
style='primary')
def addSubunit(self, **data):
self.redirect(self.url(self.context, '@@add'))
return
def getSessionConfigurations(self):
"""Get a list of all stored session configuration objects.
"""
for key, val in self.context.items():
url = self.url(val)
session_string = val.getSessionString()
title = _('Session ${a} Configuration',
mapping = {'a':session_string})
yield(dict(url=url, name=key, title=title))
@jsaction(_('Remove selected'))
def delSessonConfigurations(self, **data):
delSubobjects(self, redirect='@@index', tab='2')
return
@action(_('Update plugins'), validator=NullValidator)
def updatePlugins(self, **data):
grok.getSite().updatePlugins()
self.flash(_('Plugins were updated. See log file for details.'))
return
class SessionConfigurationAddFormPage(KofaAddFormPage):
"""Add a session configuration object to configuration container.
"""
grok.context(IConfigurationContainer)
grok.name('add')
grok.require('waeup.managePortalConfiguration')
label = _('Add session configuration')
form_fields = grok.AutoFields(ISessionConfigurationAdd)
pnav = 0
@action(_('Add Session Configuration'), style='primary')
def addSessionConfiguration(self, **data):
sessionconfiguration = createObject(u'waeup.SessionConfiguration')
self.applyData(sessionconfiguration, **data)
try:
self.context.addSessionConfiguration(sessionconfiguration)
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.__parent__.logger.info(
'%s - added: %s' % (
ob_class, sessionconfiguration.academic_session))
except KeyError:
self.flash(_('The session chosen already exists.'))
return
self.redirect(self.url(self.context, '@@index')+'?tab2')
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self):
self.redirect(self.url(self.context, '@@index')+'?tab2')
return
class SessionConfigurationManageFormPage(KofaEditFormPage):
"""Manage session configuration object.
"""
grok.context(ISessionConfiguration)
grok.name('index')
grok.require('waeup.managePortalConfiguration')
form_fields = grok.AutoFields(ISessionConfiguration)
pnav = 0
@property
def label(self):
session_string = self.context.getSessionString()
return _('Edit academic session ${a} configuration',
mapping = {'a':session_string})
@action(_('Save'), style='primary')
def save(self, **data):
msave(self, **data)
self.redirect(self.url(self.context.__parent__, '@@index')+'?tab2')
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self):
self.redirect(self.url(self.context.__parent__, '@@index')+'?tab2')
return
#
# Datacenter pages...
#
class DatacenterPage(KofaEditFormPage):
grok.context(IDataCenter)
grok.name('index')
grok.require('waeup.manageDataCenter')
label = _(u'Data Center')
pnav = 0
def update(self):
datatable.need()
warning.need()
return super(DatacenterPage, self).update()
@jsaction(_('Remove selected'))
def delFiles(self, **data):
form = self.request.form
if form.has_key('val_id'):
child_id = form['val_id']
else:
self.flash(_('No item selected.'))
return
if not isinstance(child_id, list):
child_id = [child_id]
deleted = []
for id in child_id:
fullpath = os.path.join(self.context.storage, id)
try:
os.remove(fullpath)
deleted.append(id)
except OSError:
self.flash(_('OSError: The file could not be deleted.'))
return
if len(deleted):
self.flash(_('Successfully deleted: ${a}',
mapping = {'a': ', '.join(deleted)}))
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.logger.info(
'%s - deleted: %s' % (ob_class, ', '.join(deleted)))
return
class DatacenterFinishedPage(KofaEditFormPage):
grok.context(IDataCenter)
grok.name('processed')
grok.require('waeup.manageDataCenter')
label = _(u'Processed Files')
pnav = 0
def update(self):
datatable.need()
return super(DatacenterFinishedPage, self).update()
class DatacenterUploadPage(KofaPage):
grok.context(IDataCenter)
grok.name('upload')
grok.require('waeup.manageDataCenter')
label = _(u'Upload portal data as CSV file')
pnav = 0
upload_button =_(u'Upload')
cancel_button =_(u'Cancel')
def update(self, uploadfile=None, CANCEL=None, SUBMIT=None):
if CANCEL is not None:
self.redirect(self.url(self.context))
return
if not uploadfile:
return
try:
filename = uploadfile.filename
#if 'pending' in filename:
# self.flash(_("You can't re-upload pending data files."))
# return
if not filename.endswith('.csv'):
self.flash(_("Only csv files are allowed."))
return
normalized_filename = self.getNormalizedFileName(filename)
finished_file = os.path.join(
self.context.storage, 'finished', normalized_filename)
unfinished_file = os.path.join(
self.context.storage, 'unfinished', normalized_filename)
if os.path.exists(finished_file) or os.path.exists(unfinished_file):
self.flash(_("File with same name was uploaded earlier."))
return
target = os.path.join(self.context.storage, normalized_filename)
open(target, 'wb').write(uploadfile.read())
os.chmod(target, 0664)
logger = self.context.logger
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
logger.info('%s - uploaded: %s' % (ob_class, target))
except IOError:
self.flash('Error while uploading file. Please retry.')
self.flash('I/O error: %s' % sys.exc_info()[1])
return
self.redirect(self.url(self.context))
def getNormalizedFileName(self, filename):
"""Build sane filename.
An uploaded file foo.csv will be stored as foo_USERNAME.csv
where username is the principal id of the currently logged in
user.
Spaces in filename are replaced by underscore.
Pending data filenames remain unchanged.
"""
if filename.endswith('.pending.csv'):
return filename
username = self.request.principal.id
filename = filename.replace(' ', '_')
# Only accept typical filname chars...
filtered_username = ''.join(re.findall('[a-zA-Z0-9_\.\-]', username))
base, ext = os.path.splitext(filename)
return '%s_%s%s' % (base, filtered_username, ext.lower())
def getImporters(self):
importers = getAllUtilitiesRegisteredFor(IBatchProcessor)
importer_props = []
for x in importers:
iface_fields = schema.getFields(x.iface)
available_fields = []
for key in iface_fields.keys():
iface_fields[key] = (iface_fields[key].__class__.__name__,
iface_fields[key].required)
for value in x.available_fields:
available_fields.append(
dict(f_name=value,
f_type=iface_fields.get(value, (None, False))[0],
f_required=iface_fields.get(value, (None, False))[1]
)
)
available_fields = sorted(available_fields, key=lambda k: k['f_name'])
importer_props.append(
dict(title=x.name, name=x.util_name, fields=available_fields))
return sorted(importer_props, key=lambda k: k['title'])
class FileDownloadView(UtilityView, grok.View):
grok.context(IDataCenter)
grok.name('download')
grok.require('waeup.manageDataCenter')
def update(self, filename=None):
self.filename = self.request.form['filename']
return
def render(self):
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.logger.info(
'%s - downloaded: %s' % (ob_class, self.filename))
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition:', 'attachment; filename="%s' %
self.filename.replace('finished/',''))
fullpath = os.path.join(self.context.storage, self.filename)
return open(fullpath, 'rb').read()
class SkeletonDownloadView(UtilityView, grok.View):
grok.context(IDataCenter)
grok.name('skeleton')
grok.require('waeup.manageDataCenter')
def update(self, processorname=None):
self.processorname = self.request.form['name']
self.filename = ('%s_000.csv' %
self.processorname.replace('processor','import'))
return
def render(self):
#ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
#self.context.logger.info(
# '%s - skeleton downloaded: %s' % (ob_class, self.filename))
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition:', 'attachment; filename="%s' % self.filename)
processor = getUtility(IBatchProcessor, name=self.processorname)
csv_data = processor.get_csv_skeleton()
return csv_data
class DatacenterImportStep1(KofaPage):
"""Manual import step 1: choose file
"""
grok.context(IDataCenter)
grok.name('import1')
grok.template('datacenterimport1page')
grok.require('waeup.importData')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Cancel')
def getFiles(self):
files = self.context.getPendingFiles(sort='date')
for file in files:
name = file.name
if not name.endswith('.csv') and not name.endswith('.pending'):
continue
yield file
def update(self, filename=None, select=None, cancel=None):
if cancel is not None:
self.flash(_('Import aborted.'))
self.redirect(self.url(self.context))
return
if select is not None:
# A filename was selected
session = ISession(self.request)['waeup.kofa']
session['import_filename'] = select
self.redirect(self.url(self.context, '@@import2'))
class DatacenterImportStep2(KofaPage):
"""Manual import step 2: choose processor
"""
grok.context(IDataCenter)
grok.name('import2')
grok.template('datacenterimport2page')
grok.require('waeup.importData')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Cancel')
back_button =_(u'Back to step 1')
proceed_button =_(u'Proceed to step 3')
filename = None
mode = 'create'
importer = None
mode_locked = False
def getPreviewHeader(self):
"""Get the header fields of attached CSV file.
"""
reader = csv.reader(open(self.fullpath, 'rb'))
return reader.next()
def getPreviewTable(self):
"""Get transposed table with 3 sample records.
The first column contains the headers.
"""
if not self.reader:
return
header = self.getPreviewHeader()
num = 0
data = []
for line in self.reader:
if num > 2:
break
num += 1
data.append(line)
result = []
for name in header:
result_line = []
result_line.append(name)
for d in data:
result_line.append(d[name])
result.append(result_line)
return result
def getImporters(self):
importers = getAllUtilitiesRegisteredFor(IBatchProcessor)
importers = sorted(
[dict(title=x.name, name=x.util_name) for x in importers])
return importers
def getModeFromFilename(self, filename):
"""Lookup filename or path and return included mode name or None.
"""
if not filename.endswith('.pending.csv'):
return None
base = os.path.basename(filename)
parts = base.rsplit('.', 3)
if len(parts) != 4:
return None
if parts[1] not in ['create', 'update', 'remove']:
return None
return parts[1]
def getWarnings(self):
import sys
result = []
try:
headerfields = self.getPreviewHeader()
headerfields_clean = list(set(headerfields))
if len(headerfields) > len(headerfields_clean):
result.append(
_("Double headers: each column name may only appear once. "))
except:
fatal = '%s' % sys.exc_info()[1]
result.append(fatal)
if result:
warnings = ""
for line in result:
warnings += line + '
'
warnings += _('Replace imported file!')
return warnings
return False
def update(self, mode=None, importer=None,
back1=None, cancel=None, proceed=None):
session = ISession(self.request)['waeup.kofa']
self.filename = session.get('import_filename', None)
if self.filename is None or back1 is not None:
self.redirect(self.url(self.context, '@@import1'))
return
if cancel is not None:
self.flash(_('Import aborted.'))
self.redirect(self.url(self.context))
return
self.mode = mode or session.get('import_mode', self.mode)
filename_mode = self.getModeFromFilename(self.filename)
if filename_mode is not None:
self.mode = filename_mode
self.mode_locked = True
self.importer = importer or session.get('import_importer', None)
session['import_importer'] = self.importer
if self.importer and 'update' in self.importer:
if self.mode != 'update':
self.flash(_('Update mode only!'))
self.mode_locked = True
self.mode = 'update'
proceed = None
session['import_mode'] = self.mode
if proceed is not None:
self.redirect(self.url(self.context, '@@import3'))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
warnings = self.getWarnings()
if not warnings:
self.reader = csv.DictReader(open(self.fullpath, 'rb'))
else:
self.reader = ()
self.flash(warnings)
class DatacenterImportStep3(KofaPage):
"""Manual import step 3: modify header
"""
grok.context(IDataCenter)
grok.name('import3')
grok.template('datacenterimport3page')
grok.require('waeup.importData')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Cancel')
reset_button =_(u'Reset')
update_button =_(u'Set headerfields')
back_button =_(u'Back to step 2')
proceed_button =_(u'Perform import')
filename = None
mode = None
importername = None
@property
def nextstep(self):
return self.url(self.context, '@@import4')
def getPreviewHeader(self):
"""Get the header fields of attached CSV file.
"""
reader = csv.reader(open(self.fullpath, 'rb'))
return reader.next()
def getPreviewTable(self):
"""Get transposed table with 1 sample record.
The first column contains the headers.
"""
if not self.reader:
return
headers = self.getPreviewHeader()
num = 0
data = []
for line in self.reader:
if num > 0:
break
num += 1
data.append(line)
result = []
field_num = 0
for name in headers:
result_line = []
result_line.append(field_num)
field_num += 1
for d in data:
result_line.append(d[name])
result.append(result_line)
return result
def getPossibleHeaders(self):
"""Get the possible headers.
The headers are described as dicts {value:internal_name,
title:displayed_name}
"""
result = [dict(title='', value='--IGNORE--')]
headers = self.importer.getHeaders()
result.extend([dict(title=x, value=x) for x in headers])
return result
def getWarnings(self):
import sys
result = []
try:
self.importer.checkHeaders(self.headerfields, mode=self.mode)
except:
fatal = '%s' % sys.exc_info()[1]
result.append(fatal)
if result:
warnings = ""
for line in result:
warnings += line + '
'
warnings += _('Edit headers or replace imported file!')
return warnings
return False
def update(self, headerfield=None, back2=None, cancel=None, proceed=None):
datatable.need()
session = ISession(self.request)['waeup.kofa']
self.filename = session.get('import_filename', None)
self.mode = session.get('import_mode', None)
self.importername = session.get('import_importer', None)
if None in (self.filename, self.mode, self.importername):
self.redirect(self.url(self.context, '@@import2'))
return
if back2 is not None:
self.redirect(self.url(self.context ,'@@import2'))
return
if cancel is not None:
self.flash(_('Import aborted.'))
self.redirect(self.url(self.context))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
self.headerfields = headerfield or self.getPreviewHeader()
session['import_headerfields'] = self.headerfields
if proceed is not None:
self.redirect(self.url(self.context, '@@import4'))
return
self.importer = getUtility(IBatchProcessor, name=self.importername)
self.reader = csv.DictReader(open(self.fullpath, 'rb'))
warnings = self.getWarnings()
if warnings:
self.flash(warnings)
class DatacenterImportStep4(KofaPage):
"""Manual import step 4: do actual import
"""
grok.context(IDataCenter)
grok.name('import4')
grok.template('datacenterimport4page')
grok.require('waeup.importData')
label = _(u'Process CSV file')
pnav = 0
show_button =_(u'View processing log')
back_button =_(u'Back to data center')
filename = None
mode = None
importername = None
headerfields = None
warnnum = None
def update(self, back=None, finish=None, showlog=None):
if finish is not None:
self.redirect(self.url(self.context))
return
session = ISession(self.request)['waeup.kofa']
self.filename = session.get('import_filename', None)
self.mode = session.get('import_mode', None)
self.importername = session.get('import_importer', None)
self.headerfields = session.get('import_headerfields', None)
if None in (self.filename, self.mode, self.importername,
self.headerfields):
self.redirect(self.url(self.context, '@@import3'))
return
if showlog is not None:
logfilename = "datacenter.log"
session['logname'] = logfilename
self.redirect(self.url(self.context, '@@show'))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
self.importer = getUtility(IBatchProcessor, name=self.importername)
# Perform batch processing...
# XXX: This might be better placed in datacenter module.
(linenum, self.warn_num,
fin_path, pending_path) = self.importer.doImport(
self.fullpath, self.headerfields, self.mode,
self.request.principal.id, logger=self.context.logger)
# Put result files in desired locations...
self.context.distProcessedFiles(
self.warn_num == 0, self.fullpath, fin_path, pending_path,
self.mode)
if self.warn_num:
self.flash(_('Processing of ${a} rows failed.',
mapping = {'a':self.warn_num}))
self.flash(_('Successfully processed ${a} rows.',
mapping = {'a':linenum - self.warn_num}))
class DatacenterLogsOverview(KofaPage):
grok.context(IDataCenter)
grok.name('logs')
grok.template('datacenterlogspage')
grok.require('waeup.manageDataCenter')
label = _(u'Show logfiles')
pnav = 0
back_button = _(u'Back to Data Center')
show_button = _(u'Show')
def update(self, back=None):
if back is not None:
self.redirect(self.url(self.context))
return
self.files = self.context.getLogFiles()
class DatacenterLogsFileview(KofaPage):
grok.context(IDataCenter)
grok.name('show')
grok.template('datacenterlogsshowfilepage')
grok.require('waeup.manageDataCenter')
title = _(u'Data Center')
pnav = 0
search_button = _('Search')
back_button = _('Back')
placeholder = _('Enter a regular expression here...')
def label(self):
return "Logfile %s" % self.filename
def update(self, back=None, query=None, logname=None):
if os.name != 'posix':
self.flash(
_('Log files can only be searched ' +
'on Unix-based operating systems.'))
self.redirect(self.url(self.context, '@@logs'))
return
if back is not None or logname is None:
self.redirect(self.url(self.context, '@@logs'))
return
self.filename = logname
self.query = query
if search is None or not query:
return
try:
self.result = ''.join(
self.context.queryLogfiles(logname, query))
except ValueError:
self.flash(_('Invalid search expression.'))
return
if not self.result:
self.flash(_('No search results found.'))
return
class DatacenterSettings(KofaPage):
grok.context(IDataCenter)
grok.name('manage')
grok.template('datacentermanagepage')
grok.require('waeup.managePortal')
label = _('Edit data center settings')
pnav = 0
save_button =_(u'Save')
reset_button =_(u'Reset')
cancel_button =_(u'Cancel')
def update(self, newpath=None, move=False, overwrite=False,
save=None, cancel=None):
if move:
move = True
if overwrite:
overwrite = True
if newpath is None:
return
if cancel is not None:
self.redirect(self.url(self.context))
return
try:
not_copied = self.context.setStoragePath(newpath, move=move)
for name in not_copied:
self.flash(_('File already existed (not copied): ${a}',
mapping = {'a':name}))
except:
self.flash(_('Given storage path cannot be used.'))
self.flash(_('Error: ${a}', mapping = {'a':sys.exc_info()[1]}))
return
if newpath:
self.flash(_('New storage path succefully set.'))
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.logger.info(
'%s - storage path set: %s' % (ob_class, newpath))
self.redirect(self.url(self.context))
return
class ExportCSVPage(KofaPage):
grok.context(IDataCenter)
grok.name('export')
grok.template('datacenterexportpage')
grok.require('waeup.manageDataCenter')
label = _('Download portal data as CSV file')
pnav = 0
export_button = _(u'Create CSV file')
_running_exports = None
def getExporters(self):
utils = getUtilitiesFor(ICSVExporter)
title_name_tuples = [
(util.title, name) for name, util in utils]
return sorted(title_name_tuples)
def job_finished(self, status):
return status == 'completed'
def getRunningExports(self):
"""Returns running exports as list of tuples.
Only exports triggered by the current user (identified by
principal.id) are returned.
Each tuple has the form (, , ).
``STATUS``:
the status as machine readable string (something like
``'completed'``)
``STATUS_TITLE``:
status of export as translated string.
``EXPORTER_NAME``:
string representing the exporter title used when triggering
the export job.
"""
if self._running_exports is None:
self._running_exports = self._getRunningExports()
return self._running_exports
def _getRunningExports(self):
result = self.context.get_export_jobs_status(self.user_id)
return result
def update(self, export=None, start_export=None, exporter=None,
discard=None, download=None):
self.user_id = self.request.principal.id
if discard:
myjobs = self.context.get_running_export_jobs(self.user_id)
for entry in myjobs:
self.context.delete_export_entry(entry)
self.flash(_('Discarded export result'))
return
if download:
myjobs = self.context.get_running_export_jobs(self.user_id)
if not len(myjobs):
self.flash(_('This export was already discarded.'))
return
job_id = myjobs[0][0]
self.redirect(
self.url(self.context, 'export.csv') + '?job_id=%s' % job_id)
return
if None in (start_export, exporter):
return
job_id = self.context.start_export_job(
exporter, self.request.principal.id)
self.redirect(self.url(self.context, 'export'))
return
class ExportCSVView(grok.View):
grok.context(IDataCenter)
grok.name('export.csv')
grok.require('waeup.manageDataCenter')
def render(self, exporter=None, job_id=None):
manager = getUtility(IJobManager)
job = manager.get(job_id)
if job is None:
return
if hasattr(job.result, 'traceback'):
# XXX: Some error happened. Do something more approriate here...
return
path = job.result
if not os.path.exists(path):
# XXX: Do something more appropriate here...
return
result = open(path, 'rb').read()
acronym = grok.getSite()['configuration'].acronym.replace(' ','')
filename = "%s_%s" % (acronym, os.path.basename(path))
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition', 'attachment; filename="%s' % filename)
# remove job and running_exports entry from context
self.context.delete_export_entry(
self.context.entry_from_job_id(job_id))
return result
class ExportXMLPage(grok.View):
"""Deliver an XML representation of the context.
"""
grok.name('export.xml')
grok.require('waeup.managePortal')
def render(self):
exporter = IKofaXMLExporter(self.context)
xml = exporter.export().read()
self.response.setHeader(
'Content-Type', 'text/xml; charset=UTF-8')
return xml
class ImportXMLPage(KofaPage):
"""Replace the context object by an object created from an XML
representation.
XXX: This does not work for ISite objects, i.e. for instance
for complete University objects.
"""
grok.name('importxml')
grok.require('waeup.managePortal')
def update(self, xmlfile=None, CANCEL=None, SUBMIT=None):
if CANCEL is not None:
self.redirect(self.url(self.context))
return
if not xmlfile:
return
importer = IKofaXMLImporter(self.context)
obj = importer.doImport(xmlfile)
if type(obj) != type(self.context):
return
parent = self.context.__parent__
name = self.context.__name__
self.context = obj
if hasattr(parent, name):
setattr(parent, name, obj)
else:
del parent[name]
parent[name] = obj
pass
return
#
# FacultiesContainer pages...
#
class FacultiesContainerPage(KofaPage):
""" Index page for faculty containers.
"""
grok.context(IFacultiesContainer)
grok.require('waeup.viewAcademics')
grok.name('index')
label = _('Academic Section')
pnav = 1
grok.template('facultypage')
class FacultiesContainerManageFormPage(KofaEditFormPage):
"""Manage the basic properties of a `Faculty` instance.
"""
grok.context(IFacultiesContainer)
grok.name('manage')
grok.require('waeup.manageAcademics')
grok.template('facultiescontainermanagepage')
pnav = 1
taboneactions = [_('Add faculty'), _('Remove selected'),_('Cancel')]
subunits = _('Faculties')
@property
def label(self):
return _('Manage academic section')
def update(self):
warning.need()
return super(FacultiesContainerManageFormPage, self).update()
@jsaction(_('Remove selected'))
def delFaculties(self, **data):
delSubobjects(self, redirect='@@manage', tab='1')
return
@action(_('Add faculty'), validator=NullValidator)
def addFaculty(self, **data):
self.redirect(self.url(self.context, '@@add'))
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
class FacultyAddFormPage(KofaAddFormPage):
""" Page form to add a new faculty to a faculty container.
"""
grok.context(IFacultiesContainer)
grok.require('waeup.manageAcademics')
grok.name('add')
label = _('Add faculty')
form_fields = grok.AutoFields(IFacultyAdd)
pnav = 1
@action(_('Add faculty'), style='primary')
def addFaculty(self, **data):
faculty = createObject(u'waeup.Faculty')
self.applyData(faculty, **data)
try:
self.context.addFaculty(faculty)
except KeyError:
self.flash(_('The faculty code chosen already exists.'))
return
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.__parent__.logger.info(
'%s - added: %s' % (ob_class, faculty.code))
self.redirect(self.url(self.context, u'@@manage')+'?tab1')
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
#
# Faculty pages
#
class FacultyPage(KofaPage):
"""Index page of faculties.
"""
grok.context(IFaculty)
grok.require('waeup.viewAcademics')
grok.name('index')
pnav = 1
@property
def label(self):
return _('Departments')
class FacultyManageFormPage(KofaEditFormPage):
"""Manage the basic properties of a `Faculty` instance.
"""
grok.context(IFaculty)
grok.name('manage')
grok.require('waeup.manageAcademics')
grok.template('facultymanagepage')
pnav = 1
subunits = _('Departments')
taboneactions = [_('Save'),_('Cancel')]
tabtwoactions = [_('Add department'), _('Remove selected'),_('Cancel')]
tabthreeactions1 = [_('Remove selected local roles')]
tabthreeactions2 = [_('Add local role')]
form_fields = grok.AutoFields(IFaculty)
@property
def label(self):
return _('Manage faculty')
def update(self):
tabs.need()
self.tab1 = self.tab2 = self.tab3 = ''
qs = self.request.get('QUERY_STRING', '')
if not qs:
qs = 'tab1'
setattr(self, qs, 'active')
warning.need()
datatable.need()
return super(FacultyManageFormPage, self).update()
def getLocalRoles(self):
roles = ILocalRolesAssignable(self.context)
return roles()
def getUsers(self):
return get_all_users()
def getUsersWithLocalRoles(self):
return get_users_with_local_roles(self.context)
@jsaction(_('Remove selected'))
def delDepartments(self, **data):
delSubobjects(self, redirect='@@manage', tab='2')
return
@action(_('Save'), style='primary')
def save(self, **data):
return msave(self, **data)
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
@action(_('Add department'), validator=NullValidator)
def addSubunit(self, **data):
self.redirect(self.url(self.context, '@@add'))
return
@action(_('Add local role'), validator=NullValidator)
def addLocalRole(self, **data):
return add_local_role(self, '3', **data)
@action(_('Remove selected local roles'))
def delLocalRoles(self, **data):
return del_local_roles(self,3,**data)
class DepartmentAddFormPage(KofaAddFormPage):
"""Add a department to a faculty.
"""
grok.context(IFaculty)
grok.name('add')
grok.require('waeup.manageAcademics')
label = _('Add department')
form_fields = grok.AutoFields(IDepartmentAdd)
pnav = 1
@action(_('Add department'), style='primary')
def addDepartment(self, **data):
department = createObject(u'waeup.Department')
self.applyData(department, **data)
try:
self.context.addDepartment(department)
except KeyError:
self.flash(_('The code chosen already exists in this faculty.'))
return
self.status = self.flash(
_("Department ${a} added.", mapping = {'a':data['code']}))
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.__parent__.__parent__.logger.info(
'%s - added: %s' % (ob_class, data['code']))
self.redirect(self.url(self.context, u'@@manage')+'?tab2')
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
#
# Department pages
#
class DepartmentPage(KofaPage):
"""Department index page.
"""
grok.context(IDepartment)
grok.require('waeup.viewAcademics')
grok.name('index')
pnav = 1
label = _('Courses and Certificates')
def update(self):
tabs.need()
datatable.need()
super(DepartmentPage, self).update()
return
def getCourses(self):
"""Get a list of all stored courses.
"""
for key, val in self.context.courses.items():
url = self.url(val)
yield(dict(url=url, name=key, container=val))
def getCertificates(self):
"""Get a list of all stored certificates.
"""
for key, val in self.context.certificates.items():
url = self.url(val)
yield(dict(url=url, name=key, container=val))
class ShowStudentsInDepartmentPage(KofaPage):
"""Page that lists all students in the department.
"""
grok.context(IDepartment)
grok.require('waeup.showStudents')
grok.name('showdepstudents')
grok.template('showstudentspage')
pnav = 1
label = _('Students')
@property
def getStudents(self):
hitlist = searchstudents(query=self.context.code,
searchtype='depcode', view=self)
return hitlist
def update(self, *args, **kw):
datatable.need()
return
class ShowStudentsInCertificatePage(ShowStudentsInDepartmentPage):
"""Page that lists all students studying a certificate.
"""
grok.context(ICertificate)
grok.require('waeup.showStudents')
grok.name('showcertstudents')
pnav = 1
label = _('Students')
@property
def getStudents(self):
hitlist = searchstudents(query=self.context.code,
searchtype='certcode', view=self)
return hitlist
class DepartmentManageFormPage(KofaEditFormPage):
"""Manage the basic properties of a `Department` instance.
"""
grok.context(IDepartment)
grok.name('manage')
grok.require('waeup.manageAcademics')
pnav = 1
grok.template('departmentmanagepage')
taboneactions = [_('Save'),_('Cancel')]
tabtwoactions = [_('Add course'), _('Remove selected courses'),_('Cancel')]
tabthreeactions = [_('Add certificate'), _('Remove selected certificates'),
_('Cancel')]
tabfouractions1 = [_('Remove selected local roles')]
tabfouractions2 = [_('Add local role')]
form_fields = grok.AutoFields(IDepartment)
@property
def label(self):
return _('Manage department')
def getCourses(self):
"""Get a list of all stored courses.
"""
for key, val in self.context.courses.items():
url = self.url(val)
yield(dict(url=url, name=key, container=val))
def getCertificates(self):
"""Get a list of all stored certificates.
"""
for key, val in self.context.certificates.items():
url = self.url(val)
yield(dict(url=url, name=key, container=val))
def update(self):
tabs.need()
self.tab1 = self.tab2 = self.tab3 = self.tab4 = ''
qs = self.request.get('QUERY_STRING', '')
if not qs:
qs = 'tab1'
setattr(self, qs, 'active')
warning.need()
datatable.need()
super(DepartmentManageFormPage, self).update()
return
def getLocalRoles(self):
roles = ILocalRolesAssignable(self.context)
return roles()
def getUsers(self):
return get_all_users()
def getUsersWithLocalRoles(self):
return get_users_with_local_roles(self.context)
@action(_('Save'), style='primary')
def save(self, **data):
return msave(self, **data)
@jsaction(_('Remove selected courses'))
def delCourses(self, **data):
delSubobjects(
self, redirect='@@manage', tab='2', subcontainer='courses')
return
@jsaction(_('Remove selected certificates'))
def delCertificates(self, **data):
delSubobjects(
self, redirect='@@manage', tab='3', subcontainer='certificates')
return
@action(_('Add course'), validator=NullValidator)
def addCourse(self, **data):
self.redirect(self.url(self.context, 'addcourse'))
return
@action(_('Add certificate'), validator=NullValidator)
def addCertificate(self, **data):
self.redirect(self.url(self.context, 'addcertificate'))
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
@action(_('Add local role'), validator=NullValidator)
def addLocalRole(self, **data):
return add_local_role(self, 4, **data)
@action(_('Remove selected local roles'))
def delLocalRoles(self, **data):
return del_local_roles(self,4,**data)
class CourseAddFormPage(KofaAddFormPage):
"""Add-form to add course to a department.
"""
grok.context(IDepartment)
grok.name('addcourse')
grok.require('waeup.manageAcademics')
label = _(u'Add course')
form_fields = grok.AutoFields(ICourseAdd)
pnav = 1
@action(_('Add course'))
def addCourse(self, **data):
course = createObject(u'waeup.Course')
self.applyData(course, **data)
try:
self.context.courses.addCourse(course)
except DuplicationError, error:
# signals duplication error
entries = error.entries
for entry in entries:
# do not use error.msg but render a more detailed message instead
# and show links to all certs with same code
message = _('A course with same code already exists: ')
message += '%s' % (
self.url(entry), getattr(entry, '__name__', u'Unnamed'))
self.flash(message)
self.redirect(self.url(self.context, u'@@addcourse'))
return
message = _(u'Course ${a} successfully created.', mapping = {'a':course.code})
self.flash(message)
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.__parent__.__parent__.__parent__.logger.info(
'%s - added: %s' % (ob_class, data['code']))
self.redirect(self.url(self.context, u'@@manage')+'?tab2')
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
class CertificateAddFormPage(KofaAddFormPage):
"""Add-form to add certificate to a department.
"""
grok.context(IDepartment)
grok.name('addcertificate')
grok.require('waeup.manageAcademics')
label = _(u'Add certificate')
form_fields = grok.AutoFields(ICertificateAdd)
pnav = 1
@action(_('Add certificate'))
def addCertificate(self, **data):
certificate = createObject(u'waeup.Certificate')
self.applyData(certificate, **data)
try:
self.context.certificates.addCertificate(certificate)
except DuplicationError, error:
# signals duplication error
entries = error.entries
for entry in entries:
# do not use error.msg but render a more detailed message instead
# and show links to all certs with same code
message = _('A certificate with same code already exists: ')
message += '%s' % (
self.url(entry), getattr(entry, '__name__', u'Unnamed'))
self.flash(message)
self.redirect(self.url(self.context, u'@@addcertificate'))
return
message = _(u'Certificate ${a} successfully created.', mapping = {'a':certificate.code})
self.flash(message)
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.__parent__.__parent__.__parent__.logger.info(
'%s - added: %s' % (ob_class, data['code']))
self.redirect(self.url(self.context, u'@@manage')+'?tab3')
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self): #, **data):
self.redirect(self.url(self.context))
return
#
# Courses pages
#
class CoursePage(KofaDisplayFormPage):
"""Course index page.
"""
grok.context(ICourse)
grok.name('index')
grok.require('waeup.viewAcademics')
pnav = 1
form_fields = grok.AutoFields(ICourse)
@property
def label(self):
return '%s (%s)' % (self.context.title, self.context.code)
class CourseManageFormPage(KofaEditFormPage):
"""Edit form page for courses.
"""
grok.context(ICourse)
grok.name('manage')
grok.require('waeup.manageAcademics')
label = _(u'Edit course')
pnav = 1
form_fields = grok.AutoFields(ICourse)
@action(_('Save'), style='primary')
def save(self, **data):
return msave(self, **data)
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
#
# Certificate pages
#
class CertificatePage(KofaDisplayFormPage):
"""Index page for certificates.
"""
grok.context(ICertificate)
grok.name('index')
grok.require('waeup.viewAcademics')
pnav = 1
form_fields = grok.AutoFields(ICertificate)
grok.template('certificatepage')
@property
def label(self):
return "%s (%s)" % (self.context.title, self.context.code)
def update(self):
datatable.need()
return super(CertificatePage, self).update()
class CertificateManageFormPage(KofaEditFormPage):
"""Manage the properties of a `Certificate` instance.
"""
grok.context(ICertificate)
grok.name('manage')
grok.require('waeup.manageAcademics')
pnav = 1
label = _('Edit certificate')
form_fields = grok.AutoFields(ICertificate)
pnav = 1
grok.template('certificatemanagepage')
taboneactions = [_('Save'),_('Cancel')]
tabtwoactions = [_('Add certificate course'),
_('Remove selected certificate courses'),_('Cancel')]
tabthreeactions1 = [_('Remove selected local roles')]
tabthreeactions2 = [_('Add local role')]
@property
def label(self):
return _('Manage certificate')
def update(self):
tabs.need()
self.tab1 = self.tab2 = self.tab3 = ''
qs = self.request.get('QUERY_STRING', '')
if not qs:
qs = 'tab1'
setattr(self, qs, 'active')
warning.need()
datatable.need()
return super(CertificateManageFormPage, self).update()
@action(_('Save'), style='primary')
def save(self, **data):
return msave(self, **data)
@jsaction(_('Remove selected certificate courses'))
def delCertificateCourses(self, **data):
delSubobjects(self, redirect='@@manage', tab='2')
return
@action(_('Add certificate course'), validator=NullValidator)
def addCertificateCourse(self, **data):
self.redirect(self.url(self.context, 'addcertificatecourse'))
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
def getLocalRoles(self):
roles = ILocalRolesAssignable(self.context)
return roles()
def getUsers(self):
return get_all_users()
def getUsersWithLocalRoles(self):
return get_users_with_local_roles(self.context)
@action(_('Add local role'), validator=NullValidator)
def addLocalRole(self, **data):
return add_local_role(self, 3, **data)
@action(_('Remove selected local roles'))
def delLocalRoles(self, **data):
return del_local_roles(self,3,**data)
class CertificateCourseAddFormPage(KofaAddFormPage):
"""Add-page to add a course ref to a certificate
"""
grok.context(ICertificate)
grok.name('addcertificatecourse')
grok.require('waeup.manageAcademics')
form_fields = grok.AutoFields(ICertificateCourseAdd)
pnav = 1
label = _('Add certificate course')
@action(_('Add certificate course'))
def addCertcourse(self, **data):
try:
self.context.addCertCourse(**data)
except KeyError:
self.status = self.flash(_('The chosen certificate course is already '
'part of this certificate.'))
return
self.status = self.flash(
_("certificate course ${a}_${b} added.",
mapping = {'a': data['course'].code, 'b': data['level']}))
code = "%s_%s" % (data['course'].code, data['level'])
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
grok.getSite().logger.info('%s - added: %s' % (ob_class, code))
self.redirect(self.url(self.context, u'@@manage')+'?tab2')
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
#
# Certificate course pages...
#
class CertificateCoursePage(KofaPage):
"""CertificateCourse index page.
"""
grok.context(ICertificateCourse)
grok.name('index')
grok.require('waeup.viewAcademics')
pnav = 1
#form_fields = grok.AutoFields(ICertificateCourse)
@property
def label(self):
return '%s' % (self.context.longtitle())
@property
def leveltitle(self):
return course_levels.getTerm(self.context.level).title
class CertificateCourseManageFormPage(KofaEditFormPage):
"""Manage the basic properties of a `CertificateCourse` instance.
"""
grok.context(ICertificateCourse)
grok.name('manage')
grok.require('waeup.manageAcademics')
form_fields = grok.AutoFields(ICertificateCourse)
label = _('Edit certificate course')
pnav = 1
@action(_('Save and return'), style='primary')
def saveAndReturn(self, **data):
parent = self.context.__parent__
if self.context.level == data['level']:
msave(self, **data)
else:
# The level changed. We have to create a new certcourse as
# the key of the entry will change as well...
old_level = self.context.level
data['course'] = self.context.course
parent.addCertCourse(**data)
parent.delCertCourse(data['course'].code, level=old_level)
self.flash(_('Form has been saved.'))
old_code = "%s_%s" % (data['course'].code, old_level)
new_code = "%s_%s" % (data['course'].code, data['level'])
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
grok.getSite().logger.info(
'%s - renamed: %s to %s' % (ob_class, old_code, new_code))
self.redirect(self.url(parent))
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context))
return
class ChangePasswordRequestPage(KofaForm):
"""Captcha'd page for all kind of users to request a password change.
"""
grok.context(IUniversity)
grok.name('changepw')
grok.require('waeup.Anonymous')
grok.template('changepw')
label = _('Send me a new password')
form_fields = grok.AutoFields(IChangePassword)
def update(self):
# Handle captcha
self.captcha = getUtility(ICaptchaManager).getCaptcha()
self.captcha_result = self.captcha.verify(self.request)
self.captcha_code = self.captcha.display(self.captcha_result.error_code)
return
def _searchUser(self, identifier, email):
# Search student
cat = queryUtility(ICatalog, name='students_catalog')
results = cat.searchResults(
#reg_number=(identifier, identifier),
email=(email,email))
for result in results:
if result.student_id == identifier or \
result.reg_number == identifier or \
result.matric_number == identifier:
return result
# Search applicant
cat = queryUtility(ICatalog, name='applicants_catalog')
if cat is not None:
results = cat.searchResults(
#reg_number=(identifier, identifier),
email=(email,email))
for result in results:
if result.applicant_id == identifier or \
result.reg_number == identifier:
return result
# Search portal user
user = grok.getSite()['users'].get(identifier, None)
if user is not None and user.email == email:
return user
return None
@action(_('Send login credentials to email address'), style='primary')
def request(self, **data):
if not self.captcha_result.is_valid:
# Captcha will display error messages automatically.
# No need to flash something.
return
# Search student or applicant
identifier = data['identifier']
email = data['email']
user = self._searchUser(identifier, email)
if user is None:
self.flash(_('No record found.'))
return
# Change password
kofa_utils = getUtility(IKofaUtils)
password = kofa_utils.genPassword()
mandate = PasswordMandate()
mandate.params['password'] = password
mandate.params['user'] = user
site = grok.getSite()
site['mandates'].addMandate(mandate)
# Send email with credentials
args = {'mandate_id':mandate.mandate_id}
mandate_url = self.url(site) + '/mandate?%s' % urlencode(args)
url_info = u'Confirmation link: %s' % mandate_url
msg = _('You have successfully requested a password for the')
success = kofa_utils.sendCredentials(
IUserAccount(user),password,url_info,msg)
if success:
self.flash(_('An email with your user name and password ' +
'has been sent to ${a}.', mapping = {'a':email}))
else:
self.flash(_('An smtp server error occurred.'))
ob_class = self.__implemented__.__name__.replace('waeup.kofa.','')
self.context.logger.info(
'%s - %s - %s' % (ob_class, data['identifier'], data['email']))
return