## $Id: pages.py 12525 2015-01-30 11:24:26Z henrik $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
""" Viewing components for Ikoba objects.
"""
# XXX: All csv ops should move to a dedicated module soon
import unicodecsv as csv
import grok
import os
import re
import sys
from datetime import datetime, timedelta
from urllib import urlencode
from hurry.query import Eq, Text
from hurry.query.query import Query
from zope import schema
from zope.i18n import translate
from zope.authentication.interfaces import (
IAuthentication, IUnauthenticatedPrincipal, ILogout)
from zope.catalog.interfaces import ICatalog
from zope.component import (
getUtility, queryUtility, createObject, getAllUtilitiesRegisteredFor,
getUtilitiesFor,
)
from zope.event import notify
from zope.security import checkPermission
from zope.securitypolicy.interfaces import IPrincipalRoleManager
from zope.session.interfaces import ISession
from zope.password.interfaces import IPasswordManager
from waeup.ikoba.utils.helpers import html2dict
from waeup.ikoba.browser.layout import (
IkobaPage, IkobaForm, IkobaEditFormPage, IkobaAddFormPage,
IkobaDisplayFormPage, NullValidator)
from waeup.ikoba.browser.interfaces import (
ICompany, ICaptchaManager, IChangePassword)
from waeup.ikoba.browser.layout import jsaction, action, UtilityView
from waeup.ikoba.interfaces import MessageFactory as _
from waeup.ikoba.interfaces import(
IIkobaObject, IUsersContainer, IUserAccount, IDataCenter,
IIkobaXMLImporter, IIkobaXMLExporter, IBatchProcessor,
ILocalRolesAssignable, DuplicationError, IConfigurationContainer,
IJobManager,
IPasswordValidator, IContactForm, IIkobaUtils, ICSVExporter)
from waeup.ikoba.permissions import (
get_users_with_local_roles, get_all_roles, get_all_users,
get_users_with_role)
from waeup.ikoba.authentication import LocalRoleSetEvent
from waeup.ikoba.utils.helpers import get_user_account, check_csv_charset
from waeup.ikoba.mandates.mandate import PasswordMandate
from waeup.ikoba.datacenter import DataCenterFile
FORBIDDEN_CHARACTERS = (160,)
grok.context(IIkobaObject)
grok.templatedir('templates')
def add_local_role(view, tab, **data):
localrole = view.request.form.get('local_role', None)
user = view.request.form.get('user', None)
if user is None or localrole is None:
view.flash('No user selected.', type='danger')
view.redirect(view.url(view.context, '@@manage')+'#tab%s' % tab)
return
role_manager = IPrincipalRoleManager(view.context)
role_manager.assignRoleToPrincipal(localrole, user)
notify(LocalRoleSetEvent(view.context, localrole, user, granted=True))
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
grok.getSite().logger.info(
'%s - added: %s|%s' % (ob_class, user, localrole))
view.redirect(view.url(view.context, u'@@manage')+'#tab%s' % tab)
return
def del_local_roles(view, tab, **data):
child_ids = view.request.form.get('role_id', None)
if child_ids is None:
view.flash(_('No local role selected.'), type='danger')
view.redirect(view.url(view.context, '@@manage')+'#tab%s' % tab)
return
if not isinstance(child_ids, list):
child_ids = [child_ids]
deleted = []
role_manager = IPrincipalRoleManager(view.context)
for child_id in child_ids:
localrole = child_id.split('|')[1]
user_name = child_id.split('|')[0]
try:
role_manager.unsetRoleForPrincipal(localrole, user_name)
notify(LocalRoleSetEvent(
view.context, localrole, user_name, granted=False))
deleted.append(child_id)
except:
view.flash('Could not remove %s: %s: %s' % (
child_id, sys.exc_info()[0], sys.exc_info()[1]),
type='danger')
if len(deleted):
view.flash(
_('Local role successfully removed: ${a}',
mapping = {'a':', '.join(deleted)}))
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
grok.getSite().logger.info(
'%s - removed: %s' % (ob_class, ', '.join(deleted)))
view.redirect(view.url(view.context, u'@@manage')+'#tab%s' % tab)
return
def delSubobjects(view, redirect, tab=None, subcontainer=None):
form = view.request.form
if 'val_id' in form:
child_id = form['val_id']
else:
view.flash(_('No item selected.'), type='danger')
if tab:
view.redirect(view.url(view.context, redirect)+'#tab%s' % tab)
else:
view.redirect(view.url(view.context, redirect))
return
if not isinstance(child_id, list):
child_id = [child_id]
deleted = []
for id in child_id:
try:
if subcontainer:
container = getattr(view.context, subcontainer, None)
del container[id]
else:
del view.context[id]
deleted.append(id)
except:
view.flash('Could not delete %s: %s: %s' % (
id, sys.exc_info()[0], sys.exc_info()[1]), type='danger')
if len(deleted):
view.flash(_('Successfully removed: ${a}',
mapping = {'a': ', '.join(deleted)}))
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
grok.getSite().logger.info(
'%s - removed: %s' % (ob_class, ', '.join(deleted)))
if tab:
view.redirect(view.url(view.context, redirect)+'#tab%s' % tab)
else:
view.redirect(view.url(view.context, redirect))
return
def getPreviewTable(view, n):
"""Get transposed table with n sample record.
The first column contains the headers.
"""
if not view.reader:
return
header = view.getPreviewHeader()
num = 0
data = []
for line in view.reader:
if num > n-1:
break
num += 1
data.append(line)
result = []
for name in header:
result_line = []
result_line.append(name)
for d in data:
result_line.append(d[name])
result.append(result_line)
return result
# Save function used for save methods in pages
def msave(view, **data):
changed_fields = view.applyData(view.context, **data)
# Turn list of lists into single list
if changed_fields:
changed_fields = reduce(lambda x,y: x+y, changed_fields.values())
fields_string = ' + '.join(changed_fields)
view.flash(_('Form has been saved.'))
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
if fields_string:
grok.getSite().logger.info('%s - %s - saved: %s' % (ob_class, view.context.__name__, fields_string))
return
def doll_up(view, user=None):
"""Doll up export jobs for displaying in table.
"""
job_entries = view.context.get_running_export_jobs(user)
job_manager = getUtility(IJobManager)
entries = []
for job_id, exporter_name, user_id in job_entries:
job = job_manager.get(job_id)
exporter = getUtility(ICSVExporter, name=exporter_name)
exporter_title = getattr(exporter, 'title', 'Unknown')
args = ', '.join(['%s=%s' % (item[0], item[1])
for item in job.kwargs.items()])
status = job.finished and 'ready' or 'running'
status = job.failed and 'FAILED' or status
start_time = getattr(job, 'begin_after', None)
time_delta = None
if start_time:
tz = getUtility(IIkobaUtils).tzinfo
time_delta = datetime.now(tz) - start_time
start_time = start_time.astimezone(tz).strftime(
"%Y-%m-%d %H:%M:%S %Z")
download_url = view.url(view.context, 'download_export',
data=dict(job_id=job_id))
show_download_button = job.finished and not \
job.failed and time_delta and \
time_delta.days < 1
new_entry = dict(
job=job_id,
creator=user_id,
args=args,
exporter=exporter_title,
status=status,
start_time=start_time,
download_url=download_url,
show_download_button = show_download_button,
show_refresh_button = not job.finished,
show_discard_button = job.finished,)
entries.append(new_entry)
return entries
class LocalRoleAssignmentUtilityView(object):
"""A view mixin with useful methods for local role assignment.
"""
def getLocalRoles(self):
roles = ILocalRolesAssignable(self.context)
return roles()
def getUsers(self):
return get_all_users()
def getUsersWithLocalRoles(self):
return get_users_with_local_roles(self.context)
#
# Login/logout and language switch pages...
#
class LoginPage(IkobaPage):
"""A login page, available for all objects.
"""
grok.name('login')
grok.context(IIkobaObject)
grok.require('waeup.Public')
label = _(u'Login')
camefrom = None
login_button = label
def _comment(self, customer):
return getattr(customer, 'suspended_comment', None)
def update(self, SUBMIT=None, camefrom=None):
self.camefrom = camefrom
if SUBMIT is not None:
if self.request.principal.id != 'zope.anybody':
self.flash(_('You logged in.'))
if self.request.principal.user_type == 'customer':
customer = grok.getSite()['customers'][
self.request.principal.id]
rel_link = '/customers/%s' % self.request.principal.id
# Maybe we need this in Ikoba too
#if customer.personal_data_expired:
# rel_link = '/customers/%s/edit_personal' % (
# self.request.principal.id)
# self.flash(
# _('Your personal data record is outdated. Please update.'),
# type='warning')
self.redirect(self.application_url() + rel_link)
return
if not self.camefrom:
self.redirect(self.application_url() + '/index')
return
self.redirect(self.camefrom)
return
# Display appropriate flash message if credentials are correct
# but customer has been deactivated or a temporary password
# has been set.
login = self.request.form['form.login']
if len(login) == 8 and login in grok.getSite()['customers']:
customer = grok.getSite()['customers'][login]
password = self.request.form['form.password']
passwordmanager = getUtility(IPasswordManager, 'SSHA')
if customer.password is not None and \
passwordmanager.checkPassword(customer.password, password):
# The customer entered valid credentials.
# First we check if a temporary password has been set.
delta = timedelta(minutes=10)
now = datetime.utcnow()
temp_password_dict = getattr(customer, 'temp_password', None)
if temp_password_dict is not None and \
now < temp_password_dict.get('timestamp', now) + delta:
self.flash(
_('Your account has been temporarily deactivated.'),
type='warning')
return
# Now we know that the customer is suspended.
comment = self._comment(customer)
if comment:
self.flash(comment, type='warning')
else:
self.flash(_('Your account has been deactivated.'),
type='warning')
return
self.flash(_('You entered invalid credentials.'), type='danger')
return
class LogoutPage(IkobaPage):
"""A logout page. Calling this page will log the current user out.
"""
grok.context(IIkobaObject)
grok.require('waeup.Public')
grok.name('logout')
def update(self):
if not IUnauthenticatedPrincipal.providedBy(self.request.principal):
auth = getUtility(IAuthentication)
ILogout(auth).logout(self.request)
self.flash(_("You have been logged out. Thanks for using WAeUP Ikoba!"))
self.redirect(self.application_url() + '/index')
return
class LanguageChangePage(IkobaPage):
""" Language switch
"""
grok.context(IIkobaObject)
grok.name('change_language')
grok.require('waeup.Public')
def update(self, lang='en', view_name='@@index'):
self.response.setCookie('ikoba.language', lang, path='/')
self.redirect(self.url(self.context, view_name))
return
def render(self):
return
#
# Contact form...
#
class ContactAdminForm(IkobaForm):
grok.name('contactadmin')
#grok.context(ICompany)
grok.template('contactform')
grok.require('waeup.Authenticated')
pnav = 2
form_fields = grok.AutoFields(IContactForm).select('body')
def update(self):
super(ContactAdminForm, self).update()
self.form_fields.get('body').field.default = None
return
@property
def config(self):
return grok.getSite()['configuration']
def label(self):
return _(u'Contact ${a}', mapping = {'a': self.config.name_admin})
@property
def get_user_account(self):
return get_user_account(self.request)
@action(_('Send message now'), style='primary')
def send(self, *args, **data):
fullname = self.request.principal.title
try:
email = self.request.principal.email
except AttributeError:
email = self.config.email_admin
username = self.request.principal.id
usertype = getattr(self.request.principal,
'user_type', 'system').title()
ikoba_utils = getUtility(IIkobaUtils)
success = ikoba_utils.sendContactForm(
fullname,email,
self.config.name_admin,self.config.email_admin,
username,usertype,self.config.name,
data['body'],self.config.email_subject)
# Success is always True if sendContactForm didn't fail.
# TODO: Catch exceptions.
if success:
self.flash(_('Your message has been sent.'))
return
class EnquiriesForm(ContactAdminForm):
"""Captcha'd page to let anonymous send emails to the administrator.
"""
grok.name('enquiries')
grok.require('waeup.Public')
pnav = 2
form_fields = grok.AutoFields(IContactForm).select(
'fullname', 'email_from', 'body')
def update(self):
super(EnquiriesForm, self).update()
# Handle captcha
self.captcha = getUtility(ICaptchaManager).getCaptcha()
self.captcha_result = self.captcha.verify(self.request)
self.captcha_code = self.captcha.display(self.captcha_result.error_code)
return
@action(_('Send now'), style='primary')
def send(self, *args, **data):
if not self.captcha_result.is_valid:
# Captcha will display error messages automatically.
# No need to flash something.
return
ikoba_utils = getUtility(IIkobaUtils)
success = ikoba_utils.sendContactForm(
data['fullname'],data['email_from'],
self.config.name_admin,self.config.email_admin,
u'None',u'Anonymous',self.config.name,
data['body'],self.config.email_subject)
if success:
self.flash(_('Your message has been sent.'))
else:
self.flash(_('A smtp server error occurred.'), type='danger')
return
#
# Company related pages...
#
class CompanyPage(IkobaDisplayFormPage):
""" The main company page.
"""
grok.require('waeup.Public')
grok.name('index')
grok.context(ICompany)
pnav = 0
label = ''
@property
def frontpage(self):
lang = self.request.cookies.get('ikoba.language')
html = self.context['configuration'].frontpage_dict.get(lang,'')
if html =='':
portal_language = getUtility(IIkobaUtils).PORTAL_LANGUAGE
html = self.context[
'configuration'].frontpage_dict.get(portal_language,'')
if html =='':
return _(u'
Welcome to WAeUP.Ikoba
')
else:
return html
class AdministrationPage(IkobaPage):
""" The administration overview page.
"""
grok.name('administration')
grok.context(ICompany)
grok.require('waeup.managePortal')
label = _(u'Administration')
pnav = 0
class RSS20Feed(grok.View):
"""An RSS 2.0 feed.
"""
grok.name('feed.rss')
grok.context(ICompany)
grok.require('waeup.Public')
grok.template('companyrss20feed')
name = 'General news feed'
description = 'waeup.ikoba now supports RSS 2.0 feeds :-)'
language = None
date = None
buildDate = None
editor = None
webmaster = None
@property
def title(self):
return getattr(grok.getSite(), 'name', u'Sample Company')
@property
def contexttitle(self):
return self.name
@property
def link(self):
return self.url(grok.getSite())
def update(self):
self.response.setHeader('Content-Type', 'text/xml; charset=UTF-8')
def entries(self):
return ()
#
# User container pages...
#
class UsersContainerPage(IkobaPage):
"""Overview page for all local users.
"""
grok.require('waeup.manageUsers')
grok.context(IUsersContainer)
grok.name('index')
label = _('Officers')
manage_button = _(u'Manage')
delete_button = _(u'Remove')
def update(self, userid=None, adduser=None, manage=None, delete=None):
if manage is not None and userid is not None:
self.redirect(self.url(userid) + '/@@manage')
if delete is not None and userid is not None:
self.context.delUser(userid)
self.flash(_('User account ${a} successfully deleted.',
mapping = {'a': userid}))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.__parent__.logger.info(
'%s - removed: %s' % (ob_class, userid))
def getLocalRoles(self, account):
local_roles = account.getLocalRoles()
local_roles_string = ''
site_url = self.url(grok.getSite())
for local_role in local_roles.keys():
role_title = getattr(
dict(get_all_roles()).get(local_role, None), 'title', None)
objects_string = ''
for object in local_roles[local_role]:
objects_string += '%s, ' %(self.url(object),
self.url(object).replace(site_url,''))
local_roles_string += '%s:
%s
' %(role_title,
objects_string.rstrip(', '))
return local_roles_string
def getSiteRoles(self, account):
site_roles = account.roles
site_roles_string = ''
for site_role in site_roles:
role_title = dict(get_all_roles())[site_role].title
site_roles_string += '%s
' % role_title
return site_roles_string
class AddUserFormPage(IkobaAddFormPage):
"""Add a user account.
"""
grok.require('waeup.manageUsers')
grok.context(IUsersContainer)
grok.name('add')
grok.template('usereditformpage')
form_fields = grok.AutoFields(IUserAccount)
label = _('Add user')
@action(_('Add user'), style='primary')
def addUser(self, **data):
name = data['name']
title = data['title']
email = data['email']
phone = data['phone']
description = data['description']
#password = data['password']
roles = data['roles']
form = self.request.form
password = form.get('password', None)
password_ctl = form.get('control_password', None)
if password:
validator = getUtility(IPasswordValidator)
errors = validator.validate_password(password, password_ctl)
if errors:
self.flash( ' '.join(errors), type='danger')
return
try:
self.context.addUser(name, password, title=title, email=email,
phone=phone, description=description,
roles=roles)
self.flash(_('User account ${a} successfully added.',
mapping = {'a': name}))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.__parent__.logger.info(
'%s - added: %s' % (ob_class, name))
except KeyError:
self.status = self.flash('The userid chosen already exists '
'in the database.', type='danger')
return
self.redirect(self.url(self.context))
class UserManageFormPage(IkobaEditFormPage):
"""Manage a user account.
"""
grok.context(IUserAccount)
grok.name('manage')
grok.template('usereditformpage')
grok.require('waeup.manageUsers')
form_fields = grok.AutoFields(IUserAccount).omit('name')
def label(self):
return _("Edit user ${a}", mapping = {'a':self.context.__name__})
def setUpWidgets(self, ignore_request=False):
super(UserManageFormPage,self).setUpWidgets(ignore_request)
self.widgets['title'].displayWidth = 30
self.widgets['description'].height = 3
return
@action(_('Save'), style='primary')
def save(self, **data):
form = self.request.form
password = form.get('password', None)
password_ctl = form.get('control_password', None)
if password:
validator = getUtility(IPasswordValidator)
errors = validator.validate_password(password, password_ctl)
if errors:
self.flash( ' '.join(errors), type='danger')
return
changed_fields = self.applyData(self.context, **data)
if changed_fields:
changed_fields = reduce(lambda x,y: x+y, changed_fields.values())
else:
changed_fields = []
if password:
# Now we know that the form has no errors and can set password ...
self.context.setPassword(password)
changed_fields.append('password')
fields_string = ' + '.join(changed_fields)
if fields_string:
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.__parent__.logger.info(
'%s - %s edited: %s' % (
ob_class, self.context.name, fields_string))
self.flash(_('User settings have been saved.'))
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context.__parent__))
return
class ContactUserForm(ContactAdminForm):
grok.name('contactuser')
grok.context(IUserAccount)
grok.template('contactform')
grok.require('waeup.manageUsers')
pnav = 0
form_fields = grok.AutoFields(IContactForm).select('body')
def label(self):
return _(u'Send message to ${a}', mapping = {'a':self.context.title})
@action(_('Send message now'), style='primary')
def send(self, *args, **data):
try:
email = self.request.principal.email
except AttributeError:
email = self.config.email_admin
usertype = getattr(self.request.principal,
'user_type', 'system').title()
ikoba_utils = getUtility(IIkobaUtils)
success = ikoba_utils.sendContactForm(
self.request.principal.title,email,
self.context.title,self.context.email,
self.request.principal.id,usertype,self.config.name,
data['body'],self.config.email_subject)
# Success is always True if sendContactForm didn't fail.
# TODO: Catch exceptions.
if success:
self.flash(_('Your message has been sent.'))
return
class UserEditFormPage(UserManageFormPage):
"""Edit a user account by user
"""
grok.name('index')
grok.require('waeup.editUser')
form_fields = grok.AutoFields(IUserAccount).omit(
'name', 'description', 'roles')
label = _(u"My Preferences")
def setUpWidgets(self, ignore_request=False):
super(UserManageFormPage,self).setUpWidgets(ignore_request)
self.widgets['title'].displayWidth = 30
class MyRolesPage(IkobaPage):
"""Display site roles and local roles assigned to officers.
"""
grok.name('my_roles')
grok.require('waeup.editUser')
grok.context(IUserAccount)
grok.template('myrolespage')
label = _(u"My Roles")
@property
def getLocalRoles(self):
local_roles = get_user_account(self.request).getLocalRoles()
local_roles_userfriendly = {}
for local_role in local_roles:
role_title = dict(get_all_roles())[local_role].title
local_roles_userfriendly[role_title] = local_roles[local_role]
return local_roles_userfriendly
@property
def getSiteRoles(self):
site_roles = get_user_account(self.request).roles
site_roles_userfriendly = []
for site_role in site_roles:
role_title = dict(get_all_roles())[site_role].title
site_roles_userfriendly.append(role_title)
return site_roles_userfriendly
#
# Configuration pages...
#
class ConfigurationContainerManageFormPage(IkobaEditFormPage):
"""Manage page of the configuration container. We always use the
manage page in the UI not the view page, thus we use the index name here.
"""
grok.require('waeup.managePortalConfiguration')
grok.name('index')
grok.context(IConfigurationContainer)
pnav = 0
label = _(u'Edit portal configuration')
form_fields = grok.AutoFields(IConfigurationContainer).omit(
'frontpage_dict')
@action(_('Save'), style='primary')
def save(self, **data):
msave(self, **data)
frontpage = getattr(self.context, 'frontpage', None)
portal_language = getUtility(IIkobaUtils).PORTAL_LANGUAGE
self.context.frontpage_dict = html2dict(frontpage, portal_language)
return
@action(_('Update plugins'),
tooltip=_('For experts only!'),
warning=_('Plugins may only be updated after software upgrades. '
'Are you really sure?'),
validator=NullValidator)
def updatePlugins(self, **data):
grok.getSite().updatePlugins()
self.flash(_('Plugins were updated. See log file for details.'))
return
#
# Datacenter pages...
#
class DatacenterPage(IkobaEditFormPage):
grok.context(IDataCenter)
grok.name('index')
grok.require('waeup.manageDataCenter')
label = _(u'Data Center')
pnav = 0
@jsaction(_('Remove selected'))
def delFiles(self, **data):
form = self.request.form
if 'val_id' in form:
child_id = form['val_id']
else:
self.flash(_('No item selected.'), type='danger')
return
if not isinstance(child_id, list):
child_id = [child_id]
deleted = []
for id in child_id:
fullpath = os.path.join(self.context.storage, id)
try:
os.remove(fullpath)
deleted.append(id)
except OSError:
self.flash(_('OSError: The file could not be deleted.'),
type='danger')
return
if len(deleted):
self.flash(_('Successfully deleted: ${a}',
mapping = {'a': ', '.join(deleted)}))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - deleted: %s' % (ob_class, ', '.join(deleted)))
return
class DatacenterFinishedPage(IkobaEditFormPage):
grok.context(IDataCenter)
grok.name('processed')
grok.require('waeup.manageDataCenter')
label = _(u'Processed Files')
pnav = 0
cancel_button =_('Back to Data Center')
def update(self, CANCEL=None):
if CANCEL is not None:
self.redirect(self.url(self.context))
return
return super(DatacenterFinishedPage, self).update()
class DatacenterUploadPage(IkobaPage):
grok.context(IDataCenter)
grok.name('upload')
grok.require('waeup.manageDataCenter')
label = _(u'Upload portal data as CSV file')
pnav = 0
max_files = 20
upload_button =_(u'Upload')
cancel_button =_(u'Back to Data Center')
def getPreviewHeader(self):
"""Get the header fields of uploaded CSV file.
"""
reader = csv.reader(open(self.fullpath, 'rb'))
return reader.next()
def _notifyImportManagers(self, filename,
normalized_filename, importer, import_mode):
"""Send email to Import Managers
"""
# Get information about file
self.fullpath = os.path.join(self.context.storage, normalized_filename)
uploadfile = DataCenterFile(self.fullpath)
self.reader = csv.DictReader(open(self.fullpath, 'rb'))
table = getPreviewTable(self, 3)
mail_table = ''
for line in table:
header = line[0]
data = str(line[1:]).strip('[').strip(']')
mail_table += '%s: %s ...\n' % (line[0], data)
# Collect all recipient addresses
ikoba_utils = getUtility(IIkobaUtils)
import_managers = get_users_with_role(
'waeup.ImportManager', grok.getSite())
rcpt_addrs = ','.join(
[user['user_email'] for user in import_managers if
user['user_email'] is not None])
if rcpt_addrs:
config = grok.getSite()['configuration']
fullname = self.request.principal.title
try:
email = self.request.principal.email
except AttributeError:
email = config.email_admin
username = self.request.principal.id
usertype = getattr(self.request.principal,
'user_type', 'system').title()
rcpt_name = _('Import Manager')
subject = translate(
_('${a}: ${b} uploaded',
mapping = {'a':config.acronym, 'b':filename}),
'waeup.ikoba',
target_language=ikoba_utils.PORTAL_LANGUAGE)
text = _("""File: ${a}
Importer: ${b}
Import Mode: ${c}
Datasets: ${d}
${e}
Comment by Import Manager:""", mapping = {'a':normalized_filename,
'b':importer,
'c':import_mode,
'd':uploadfile.lines - 1,
'e':mail_table})
success = ikoba_utils.sendContactForm(
fullname,email,
rcpt_name,rcpt_addrs,
username,usertype,config.name,
text,subject)
if success:
self.flash(
_('All import managers have been notified by email.'))
else:
self.flash(_('An smtp server error occurred.'), type='danger')
return
def update(self, uploadfile=None, import_mode=None,
importer=None, CANCEL=None, SUBMIT=None):
number_of_pendings = len(self.context.getPendingFiles())
if number_of_pendings > self.max_files:
self.flash(
_('Maximum number of files in the data center exceeded.'),
type='danger')
self.redirect(self.url(self.context))
return
if CANCEL is not None:
self.redirect(self.url(self.context))
return
if not uploadfile:
return
try:
filename = uploadfile.filename
#if 'pending' in filename:
# self.flash(_("You can't re-upload pending data files."), type='danger')
# return
if not filename.endswith('.csv'):
self.flash(_("Only csv files are allowed."), type='danger')
return
normalized_filename = self.getNormalizedFileName(filename)
finished_file = os.path.join(
self.context.storage, 'finished', normalized_filename)
unfinished_file = os.path.join(
self.context.storage, 'unfinished', normalized_filename)
if os.path.exists(finished_file) or os.path.exists(unfinished_file):
self.flash(_("File with same name was uploaded earlier."),
type='danger')
return
target = os.path.join(self.context.storage, normalized_filename)
filecontent = uploadfile.read()
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
logger = self.context.logger
# Forbid certain characters in import files.
invalid_line = check_csv_charset(filecontent.splitlines())
if invalid_line:
self.flash(_(
"Your file contains forbidden characters or "
"has invalid CSV format. "
"First problematic line detected: line %s. "
"Please replace." % invalid_line), type='danger')
logger.info('%s - invalid file uploaded: %s' %
(ob_class, target))
return
open(target, 'wb').write(filecontent)
os.chmod(target, 0664)
logger.info('%s - uploaded: %s' % (ob_class, target))
self._notifyImportManagers(filename,
normalized_filename, importer, import_mode)
except IOError:
self.flash('Error while uploading file. Please retry.', type='danger')
self.flash('I/O error: %s' % sys.exc_info()[1], type='danger')
return
self.redirect(self.url(self.context))
def getNormalizedFileName(self, filename):
"""Build sane filename.
An uploaded file foo.csv will be stored as foo_USERNAME.csv
where username is the principal id of the currently logged in
user.
Spaces in filename are replaced by underscore.
Pending data filenames remain unchanged.
"""
if filename.endswith('.pending.csv'):
return filename
username = self.request.principal.id
filename = filename.replace(' ', '_')
# Only accept typical filname chars...
filtered_username = ''.join(re.findall('[a-zA-Z0-9_\.\-]', username))
base, ext = os.path.splitext(filename)
return '%s_%s%s' % (base, filtered_username, ext.lower())
def getImporters(self):
importers = getAllUtilitiesRegisteredFor(IBatchProcessor)
ikoba_utils = getUtility(IIkobaUtils)
importer_props = []
for x in importers:
if not x.util_name in ikoba_utils.BATCH_PROCESSOR_NAMES:
continue
iface_fields = schema.getFields(x.iface)
available_fields = []
for key in iface_fields.keys():
iface_fields[key] = (iface_fields[key].__class__.__name__,
iface_fields[key].required)
for value in x.available_fields:
available_fields.append(
dict(f_name=value,
f_type=iface_fields.get(value, (None, False))[0],
f_required=iface_fields.get(value, (None, False))[1]
)
)
available_fields = sorted(available_fields, key=lambda k: k['f_name'])
importer_props.append(
dict(title=x.name, name=x.util_name, fields=available_fields))
return sorted(importer_props, key=lambda k: k['title'])
class FileDownloadView(UtilityView, grok.View):
grok.context(IDataCenter)
grok.name('download')
grok.require('waeup.manageDataCenter')
def update(self, filename=None):
self.filename = self.request.form['filename']
return
def render(self):
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - downloaded: %s' % (ob_class, self.filename))
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition:', 'attachment; filename="%s' %
self.filename.replace('finished/',''))
fullpath = os.path.join(self.context.storage, self.filename)
return open(fullpath, 'rb').read()
class SkeletonDownloadView(UtilityView, grok.View):
grok.context(IDataCenter)
grok.name('skeleton')
grok.require('waeup.manageDataCenter')
def update(self, processorname=None):
self.processorname = self.request.form['name']
self.filename = ('%s_000.csv' %
self.processorname.replace('processor','import'))
return
def render(self):
#ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
#self.context.logger.info(
# '%s - skeleton downloaded: %s' % (ob_class, self.filename))
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition:', 'attachment; filename="%s' % self.filename)
processor = getUtility(IBatchProcessor, name=self.processorname)
csv_data = processor.get_csv_skeleton()
return csv_data
class DatacenterImportStep1(IkobaPage):
"""Manual import step 1: choose file
"""
grok.context(IDataCenter)
grok.name('import1')
grok.template('datacenterimport1page')
grok.require('waeup.manageDataCenter')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Back to Data Center')
def getFiles(self):
files = self.context.getPendingFiles(sort='date')
for file in files:
name = file.name
if not name.endswith('.csv') and not name.endswith('.pending'):
continue
yield file
def update(self, filename=None, select=None, cancel=None):
if cancel is not None:
self.redirect(self.url(self.context))
return
if select is not None:
# A filename was selected
session = ISession(self.request)['waeup.ikoba']
session['import_filename'] = select
self.redirect(self.url(self.context, '@@import2'))
class DatacenterImportStep2(IkobaPage):
"""Manual import step 2: choose processor
"""
grok.context(IDataCenter)
grok.name('import2')
grok.template('datacenterimport2page')
grok.require('waeup.manageDataCenter')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Cancel')
back_button =_(u'Back to step 1')
proceed_button =_(u'Proceed to step 3')
filename = None
mode = 'create'
importer = None
mode_locked = False
def getPreviewHeader(self):
"""Get the header fields of attached CSV file.
"""
reader = csv.reader(open(self.fullpath, 'rb'))
return reader.next()
def getPreviewTable(self):
return getPreviewTable(self, 3)
def getImporters(self):
importers = getAllUtilitiesRegisteredFor(IBatchProcessor)
ikoba_utils = getUtility(IIkobaUtils)
importers = sorted(
[dict(title=x.name, name=x.util_name) for x in importers
if x.util_name in ikoba_utils.BATCH_PROCESSOR_NAMES])
return importers
def getModeFromFilename(self, filename):
"""Lookup filename or path and return included mode name or None.
"""
if not filename.endswith('.pending.csv'):
return None
base = os.path.basename(filename)
parts = base.rsplit('.', 3)
if len(parts) != 4:
return None
if parts[1] not in ['create', 'update', 'remove']:
return None
return parts[1]
def getWarnings(self):
import sys
result = []
try:
headerfields = self.getPreviewHeader()
headerfields_clean = list(set(headerfields))
if len(headerfields) > len(headerfields_clean):
result.append(
_("Double headers: each column name may only appear once. "))
except:
fatal = '%s' % sys.exc_info()[1]
result.append(fatal)
if result:
warnings = ""
for line in result:
warnings += line + '
'
warnings += _('Replace imported file!')
return warnings
return False
def update(self, mode=None, importer=None,
back1=None, cancel=None, proceed=None):
session = ISession(self.request)['waeup.ikoba']
self.filename = session.get('import_filename', None)
if self.filename is None or back1 is not None:
self.redirect(self.url(self.context, '@@import1'))
return
if cancel is not None:
self.flash(_('Import aborted.'), type='warning')
self.redirect(self.url(self.context))
return
self.mode = mode or session.get('import_mode', self.mode)
filename_mode = self.getModeFromFilename(self.filename)
if filename_mode is not None:
self.mode = filename_mode
self.mode_locked = True
self.importer = importer or session.get('import_importer', None)
session['import_importer'] = self.importer
if self.importer and 'update' in self.importer:
if self.mode != 'update':
self.flash(_('Update mode only!'), type='warning')
self.mode_locked = True
self.mode = 'update'
proceed = None
session['import_mode'] = self.mode
if proceed is not None:
self.redirect(self.url(self.context, '@@import3'))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
warnings = self.getWarnings()
if not warnings:
self.reader = csv.DictReader(open(self.fullpath, 'rb'))
else:
self.reader = ()
self.flash(warnings, type='warning')
class DatacenterImportStep3(IkobaPage):
"""Manual import step 3: modify header
"""
grok.context(IDataCenter)
grok.name('import3')
grok.template('datacenterimport3page')
grok.require('waeup.manageDataCenter')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Cancel')
reset_button =_(u'Reset')
update_button =_(u'Set headerfields')
back_button =_(u'Back to step 2')
proceed_button =_(u'Perform import')
filename = None
mode = None
importername = None
@property
def nextstep(self):
return self.url(self.context, '@@import4')
def getPreviewHeader(self):
"""Get the header fields of attached CSV file.
"""
reader = csv.reader(open(self.fullpath, 'rb'))
return reader.next()
def getPreviewTable(self):
"""Get transposed table with 1 sample record.
The first column contains the headers.
"""
if not self.reader:
return
headers = self.getPreviewHeader()
num = 0
data = []
for line in self.reader:
if num > 0:
break
num += 1
data.append(line)
result = []
field_num = 0
for name in headers:
result_line = []
result_line.append(field_num)
field_num += 1
for d in data:
result_line.append(d[name])
result.append(result_line)
return result
def getPossibleHeaders(self):
"""Get the possible headers.
The headers are described as dicts {value:internal_name,
title:displayed_name}
"""
result = [dict(title='', value='--IGNORE--')]
headers = self.importer.getHeaders()
result.extend([dict(title=x, value=x) for x in headers])
return result
def getWarnings(self):
import sys
result = []
try:
self.importer.checkHeaders(self.headerfields, mode=self.mode)
except:
fatal = '%s' % sys.exc_info()[1]
result.append(fatal)
if result:
warnings = ""
for line in result:
warnings += line + '
'
warnings += _('Edit headers or replace imported file!')
return warnings
return False
def update(self, headerfield=None, back2=None, cancel=None, proceed=None):
session = ISession(self.request)['waeup.ikoba']
self.filename = session.get('import_filename', None)
self.mode = session.get('import_mode', None)
self.importername = session.get('import_importer', None)
if None in (self.filename, self.mode, self.importername):
self.redirect(self.url(self.context, '@@import2'))
return
if back2 is not None:
self.redirect(self.url(self.context ,'@@import2'))
return
if cancel is not None:
self.flash(_('Import aborted.'), type='warning')
self.redirect(self.url(self.context))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
self.headerfields = headerfield or self.getPreviewHeader()
session['import_headerfields'] = self.headerfields
if proceed is not None:
self.redirect(self.url(self.context, '@@import4'))
return
self.importer = getUtility(IBatchProcessor, name=self.importername)
self.reader = csv.DictReader(open(self.fullpath, 'rb'))
warnings = self.getWarnings()
if warnings:
self.flash(warnings, type='warning')
class DatacenterImportStep4(IkobaPage):
"""Manual import step 4: do actual import
"""
grok.context(IDataCenter)
grok.name('import4')
grok.template('datacenterimport4page')
grok.require('waeup.importData')
label = _(u'Process CSV file')
pnav = 0
back_button =_(u'Process next')
filename = None
mode = None
importername = None
headerfields = None
warnnum = None
def update(self, back=None, finish=None, showlog=None):
if finish is not None:
self.redirect(self.url(self.context, '@@import1'))
return
session = ISession(self.request)['waeup.ikoba']
self.filename = session.get('import_filename', None)
self.mode = session.get('import_mode', None)
self.importername = session.get('import_importer', None)
# If the import file contains only one column
# the import_headerfields attribute is a string.
ihf = session.get('import_headerfields', None)
if not isinstance(ihf, list):
self.headerfields = ihf.split()
else:
self.headerfields = ihf
if None in (self.filename, self.mode, self.importername,
self.headerfields):
self.redirect(self.url(self.context, '@@import3'))
return
if showlog is not None:
logfilename = "datacenter.log"
session['logname'] = logfilename
self.redirect(self.url(self.context, '@@show'))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
self.importer = getUtility(IBatchProcessor, name=self.importername)
# Perform batch processing...
# XXX: This might be better placed in datacenter module.
(linenum, self.warn_num,
fin_path, pending_path) = self.importer.doImport(
self.fullpath, self.headerfields, self.mode,
self.request.principal.id, logger=self.context.logger)
# Put result files in desired locations...
self.context.distProcessedFiles(
self.warn_num == 0, self.fullpath, fin_path, pending_path,
self.mode)
if self.warn_num:
self.flash(_('Processing of ${a} rows failed.',
mapping = {'a':self.warn_num}), type='warning')
self.flash(_('Successfully processed ${a} rows.',
mapping = {'a':linenum - self.warn_num}))
class DatacenterLogsOverview(IkobaPage):
grok.context(IDataCenter)
grok.name('logs')
grok.template('datacenterlogspage')
grok.require('waeup.manageDataCenter')
label = _(u'Show logfiles')
pnav = 0
back_button = _(u'Back to Data Center')
show_button = _(u'Show')
def update(self, back=None):
if back is not None:
self.redirect(self.url(self.context))
return
self.files = self.context.getLogFiles()
class DatacenterLogsFileview(IkobaPage):
grok.context(IDataCenter)
grok.name('show')
grok.template('datacenterlogsshowfilepage')
grok.require('waeup.manageDataCenter')
title = _(u'Data Center')
pnav = 0
search_button = _('Search')
back_button = _('Back to Data Center')
placeholder = _('Enter a regular expression here...')
def label(self):
return "Logfile %s" % self.filename
def update(self, back=None, query=None, logname=None):
if os.name != 'posix':
self.flash(
_('Log files can only be searched ' +
'on Unix-based operating systems.'), type='danger')
self.redirect(self.url(self.context, '@@logs'))
return
if back is not None or logname is None:
self.redirect(self.url(self.context, '@@logs'))
return
self.filename = logname
self.query = query
if not query:
return
try:
self.result = ''.join(
self.context.queryLogfiles(logname, query))
except ValueError:
self.flash(_('Invalid search expression.'), type='danger')
return
if not self.result:
self.flash(_('No search results found.'), type='warning')
return
class DatacenterSettings(IkobaPage):
grok.context(IDataCenter)
grok.name('manage')
grok.template('datacentermanagepage')
grok.require('waeup.managePortal')
label = _('Edit data center settings')
pnav = 0
save_button =_(u'Save')
reset_button =_(u'Reset')
cancel_button =_(u'Back to Data Center')
def update(self, newpath=None, move=False, overwrite=False,
save=None, cancel=None):
if move:
move = True
if overwrite:
overwrite = True
if newpath is None:
return
if cancel is not None:
self.redirect(self.url(self.context))
return
try:
not_copied = self.context.setStoragePath(newpath, move=move)
for name in not_copied:
self.flash(_('File already existed (not copied): ${a}',
mapping = {'a':name}), type='danger')
except:
self.flash(_('Given storage path cannot be used: ${a}',
mapping = {'a':sys.exc_info()[1]}), type='danger')
return
if newpath:
self.flash(_('New storage path succefully set.'))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - storage path set: %s' % (ob_class, newpath))
self.redirect(self.url(self.context))
return
class ExportCSVPage(IkobaPage):
grok.context(IDataCenter)
grok.name('export')
grok.template('datacenterexportpage')
grok.require('waeup.exportData')
label = _('Download portal data as CSV file')
pnav = 0
export_button = _(u'Create CSV file')
cancel_button =_(u'Back to Data Center')
def getExporters(self):
exporter_utils = getUtilitiesFor(ICSVExporter)
ikoba_utils = getUtility(IIkobaUtils)
title_name_tuples = [
(util.title, name) for name, util in exporter_utils
if name in ikoba_utils.EXPORTER_NAMES]
return sorted(title_name_tuples)
def update(self, CREATE=None, DISCARD=None, exporter=None,
job_id=None, CANCEL=None):
if CANCEL is not None:
self.redirect(self.url(self.context))
return
if CREATE:
job_id = self.context.start_export_job(
exporter, self.request.principal.id)
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - exported: %s, job_id=%s' % (ob_class, exporter, job_id))
if DISCARD and job_id:
entry = self.context.entry_from_job_id(job_id)
self.context.delete_export_entry(entry)
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - discarded: job_id=%s' % (ob_class, job_id))
self.flash(_('Discarded export') + ' %s' % job_id)
self.entries = doll_up(self, user=None)
return
class ExportCSVView(grok.View):
grok.context(IDataCenter)
grok.name('download_export')
grok.require('waeup.exportData')
def render(self, job_id=None):
manager = getUtility(IJobManager)
job = manager.get(job_id)
if job is None:
return
if hasattr(job.result, 'traceback'):
# XXX: Some error happened. Do something more approriate here...
return
path = job.result
if not os.path.exists(path):
# XXX: Do something more appropriate here...
return
result = open(path, 'rb').read()
acronym = grok.getSite()['configuration'].acronym.replace(' ','')
filename = "%s_%s" % (acronym, os.path.basename(path))
filename = filename.replace('.csv', '_%s.csv' % job_id)
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition', 'attachment; filename="%s' % filename)
# remove job and running_exports entry from context
#self.context.delete_export_entry(
# self.context.entry_from_job_id(job_id))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - downloaded: %s, job_id=%s' % (ob_class, filename, job_id))
return result
class ChangePasswordRequestPage(IkobaForm):
"""Captcha'd page for all kind of users to request a password change.
"""
grok.context(ICompany)
grok.name('changepw')
grok.require('waeup.Anonymous')
grok.template('changepw')
label = _('Send me a new password')
form_fields = grok.AutoFields(IChangePassword)
def update(self):
# Handle captcha
self.captcha = getUtility(ICaptchaManager).getCaptcha()
self.captcha_result = self.captcha.verify(self.request)
self.captcha_code = self.captcha.display(self.captcha_result.error_code)
return
def _searchUser(self, identifier, email):
# Search customer
cat = queryUtility(ICatalog, name='customers_catalog')
results = cat.searchResults(
reg_number=(identifier, identifier),
email=(email,email))
for result in results:
if result.customer_id == identifier \
or result.reg_number == identifier:
return result
# Search portal user
user = grok.getSite()['users'].get(identifier, None)
if user is not None and user.email == email:
return user
return None
@action(_('Send login credentials to email address'), style='primary')
def request(self, **data):
if not self.captcha_result.is_valid:
# Captcha will display error messages automatically.
# No need to flash something.
return
# Search customer
identifier = data['identifier']
email = data['email']
user = self._searchUser(identifier, email)
if user is None:
self.flash(_('No record found.'), type='warning')
return
# Change password
ikoba_utils = getUtility(IIkobaUtils)
password = ikoba_utils.genPassword()
mandate = PasswordMandate()
mandate.params['password'] = password
mandate.params['user'] = user
site = grok.getSite()
site['mandates'].addMandate(mandate)
# Send email with credentials
args = {'mandate_id':mandate.mandate_id}
mandate_url = self.url(site) + '/mandate?%s' % urlencode(args)
url_info = u'Confirmation link: %s' % mandate_url
msg = _('You have successfully requested a password for the')
success = ikoba_utils.sendCredentials(
IUserAccount(user),password,url_info,msg)
if success:
self.flash(_('An email with your user name and password ' +
'has been sent to ${a}.', mapping = {'a':email}))
else:
self.flash(_('An smtp server error occurred.'), type='danger')
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - %s - %s' % (ob_class, data['identifier'], data['email']))
return