## $Id: pages.py 14183 2016-09-23 16:28:55Z henrik $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
""" Viewing components for Ikoba objects.
"""
# XXX: All csv ops should move to a dedicated module soon
import unicodecsv as csv
import cgi
import grok
import os
import re
import sys
from datetime import datetime, timedelta
from urllib import urlencode
from hurry.query import Eq, Text
from hurry.query.query import Query
from zope import schema
from zope.i18n import translate
from zope.authentication.interfaces import (
IAuthentication, IUnauthenticatedPrincipal, ILogout)
from zope.catalog.interfaces import ICatalog
from zope.component import (
getUtility, queryUtility, createObject, getAllUtilitiesRegisteredFor,
getUtilitiesFor,
)
from zope.event import notify
from zope.security import checkPermission
from zope.securitypolicy.interfaces import IPrincipalRoleManager
from zope.session.interfaces import ISession
from zope.password.interfaces import IPasswordManager
from waeup.ikoba.utils.helpers import html2dict
from waeup.ikoba.browser.layout import (
IkobaPage, IkobaFormPage, IkobaEditFormPage, IkobaAddFormPage,
IkobaDisplayFormPage, NullValidator)
from waeup.ikoba.browser.interfaces import (
ICompany, ICaptchaManager, IChangePassword)
from waeup.ikoba.browser.layout import jsaction, action, UtilityView
from waeup.ikoba.interfaces import MessageFactory as _
from waeup.ikoba.interfaces import(
IIkobaObject, IUsersContainer, IUserAccount, IDataCenter,
IBatchProcessor,
ILocalRolesAssignable, DuplicationError, IConfigurationContainer,
IJobManager,
IPasswordValidator, IContactForm, IIkobaUtils, ICSVExporter,
CurrencySource)
from waeup.ikoba.permissions import (
get_users_with_local_roles, get_all_roles, get_all_users,
get_users_with_role)
from waeup.ikoba.customers.vocabularies import GenderSource
from waeup.ikoba.authentication import LocalRoleSetEvent
from waeup.ikoba.utils.helpers import get_user_account, check_csv_charset
from waeup.ikoba.mandates.mandate import PasswordMandate
from waeup.ikoba.datacenter import DataCenterFile
from waeup.ikoba.customers.utils import ICustomersUtils
from waeup.ikoba.documents.utils import IDocumentsUtils
FORBIDDEN_CHARACTERS = (160,)
grok.context(IIkobaObject)
grok.templatedir('templates')
def add_local_role(view, tab, **data):
localrole = view.request.form.get('local_role', None)
user = view.request.form.get('user', None)
if user is None or localrole is None:
view.flash('No user selected.', type='danger')
view.redirect(view.url(view.context, '@@manage')+'#tab%s' % tab)
return
role_manager = IPrincipalRoleManager(view.context)
role_manager.assignRoleToPrincipal(localrole, user)
notify(LocalRoleSetEvent(view.context, localrole, user, granted=True))
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
grok.getSite().logger.info(
'%s - added: %s|%s' % (ob_class, user, localrole))
view.redirect(view.url(view.context, u'@@manage')+'#tab%s' % tab)
return
def del_local_roles(view, tab, **data):
child_ids = view.request.form.get('role_id', None)
if child_ids is None:
view.flash(_('No local role selected.'), type='danger')
view.redirect(view.url(view.context, '@@manage')+'#tab%s' % tab)
return
if not isinstance(child_ids, list):
child_ids = [child_ids]
deleted = []
role_manager = IPrincipalRoleManager(view.context)
for child_id in child_ids:
localrole = child_id.split('|')[1]
user_name = child_id.split('|')[0]
try:
role_manager.unsetRoleForPrincipal(localrole, user_name)
notify(LocalRoleSetEvent(
view.context, localrole, user_name, granted=False))
deleted.append(child_id)
except:
view.flash('Could not remove %s: %s: %s' % (
child_id, sys.exc_info()[0], sys.exc_info()[1]),
type='danger')
if len(deleted):
view.flash(
_('Local role successfully removed: ${a}',
mapping = {'a':', '.join(deleted)}))
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
grok.getSite().logger.info(
'%s - removed: %s' % (ob_class, ', '.join(deleted)))
view.redirect(view.url(view.context, u'@@manage')+'#tab%s' % tab)
return
def delSubobjects(view, redirect, tab=None, subcontainer=None):
form = view.request.form
if 'val_id' in form:
child_id = form['val_id']
else:
view.flash(_('No item selected.'), type='danger')
if tab:
view.redirect(view.url(view.context, redirect)+'#tab%s' % tab)
else:
view.redirect(view.url(view.context, redirect))
return
if not isinstance(child_id, list):
child_id = [child_id]
deleted = []
for id in child_id:
try:
if subcontainer:
container = getattr(view.context, subcontainer, None)
del container[id]
else:
del view.context[id]
deleted.append(id)
except:
view.flash('Could not delete %s: %s: %s' % (
id, sys.exc_info()[0], sys.exc_info()[1]), type='danger')
if len(deleted):
view.flash(_('Successfully removed: ${a}',
mapping = {'a': ', '.join(deleted)}))
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
grok.getSite().logger.info(
'%s - removed: %s' % (ob_class, ', '.join(deleted)))
if tab:
view.redirect(view.url(view.context, redirect)+'#tab%s' % tab)
else:
view.redirect(view.url(view.context, redirect))
return
def getPreviewTable(view, n):
"""Get transposed table with n sample record.
The first column contains the headers.
"""
if not view.reader:
return
header = view.getPreviewHeader()
num = 0
data = []
for line in view.reader:
if num > n-1:
break
num += 1
data.append(line)
result = []
for name in header:
result_line = []
result_line.append(name)
for d in data:
result_line.append(d[name])
result.append(result_line)
return result
# Save function used for save methods in pages
def msave(view, **data):
changed_fields = view.applyData(view.context, **data)
# Turn list of lists into single list
if changed_fields:
changed_fields = reduce(lambda x,y: x+y, changed_fields.values())
fields_string = ' + '.join(changed_fields)
view.flash(_('Form has been saved.'))
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
if fields_string:
grok.getSite().logger.info('%s - %s - saved: %s' % (ob_class, view.context.__name__, fields_string))
return
def doll_up(view, user=None, purge=False):
"""Doll up export jobs for displaying in table.
"""
ob_class = view.__implemented__.__name__.replace('waeup.ikoba.','')
job_entries = view.context.get_running_export_jobs(user)
job_manager = getUtility(IJobManager)
entries = []
for job_id, exporter_name, user_id in job_entries:
job = job_manager.get(job_id)
exporter = getUtility(ICSVExporter, name=exporter_name)
exporter_title = getattr(exporter, 'title', 'Unknown')
args = ', '.join(['%s=%s' % (item[0], item[1])
for item in job.kwargs.items()])
status = job.finished and 'ready' or 'running'
status = job.failed and 'FAILED' or status
start_time = getattr(job, 'begin_after', None)
time_delta = None
if start_time:
tz = getUtility(IIkobaUtils).tzinfo
time_delta = datetime.now(tz) - start_time
start_time = start_time.astimezone(tz).strftime(
"%Y-%m-%d %H:%M:%S %Z")
download_url = view.url(view.context, 'download_export',
data=dict(job_id=job_id))
show_download_button = job.finished and not \
job.failed and time_delta and \
time_delta.days < 1
if not show_download_button and purge:
entry = view.context.entry_from_job_id(job_id)
view.context.delete_export_entry(entry)
view.context.logger.info(
'%s - purged: job_id=%s' % (ob_class, job_id))
continue
new_entry = dict(
job=job_id,
creator=user_id,
args=args,
exporter=exporter_title,
status=status,
start_time=start_time,
download_url=download_url,
show_download_button = show_download_button,
show_refresh_button = not job.finished,
show_discard_button = job.finished,)
entries.append(new_entry)
return entries
def running_jobs(view):
"""True if any job is running.
"""
job_entries = view.context.get_running_export_jobs()
job_manager = getUtility(IJobManager)
entries = []
for entry in job_entries:
job = job_manager.get(entry[0])
status = job.finished and 'ready' or 'running'
status = job.failed and 'FAILED' or status
if status == 'running':
return True
return False
def getImporters(context):
importers = getAllUtilitiesRegisteredFor(IBatchProcessor)
ikoba_utils = getUtility(IIkobaUtils)
importers = [x for x in importers
if x.util_name in ikoba_utils.BATCH_PROCESSOR_NAMES]
importer_props = []
for x in importers:
# Skip User Processor if user isn't allowed to manage users.
if x.util_name == 'userprocessor' and not checkPermission(
'waeup.manageUsers', context):
continue
iface_fields = schema.getFields(x.iface)
available_fields = []
for key in iface_fields.keys():
iface_fields[key] = (iface_fields[key].__class__.__name__,
iface_fields[key].required)
for value in x.available_fields:
available_fields.append(
dict(f_name=value,
f_type=iface_fields.get(value, (None, False))[0],
f_required=iface_fields.get(value, (None, False))[1]
)
)
available_fields = sorted(available_fields, key=lambda k: k[
'f_name'])
importer_props.append(
dict(title=x.name, name=x.util_name, fields=available_fields))
return sorted(importer_props, key=lambda k: k['title'])
class LocalRoleAssignmentUtilityView(object):
"""A view mixin with useful methods for local role assignment.
"""
def getLocalRoles(self):
roles = ILocalRolesAssignable(self.context)
return roles()
def getUsers(self):
return get_all_users()
def getUsersWithLocalRoles(self):
return get_users_with_local_roles(self.context)
#
# Login/logout and language switch pages...
#
class LoginPage(IkobaPage):
"""A login page, available for all objects.
"""
grok.name('login')
grok.context(IIkobaObject)
grok.require('waeup.Public')
label = _(u'Login')
camefrom = None
login_button = label
def _comment(self, customer):
return getattr(customer, 'suspended_comment', None)
def update(self, SUBMIT=None, camefrom=None):
blocker = grok.getSite()['configuration'].maintmode_enabled_by
if blocker:
self.flash(_('The portal is in maintenance mode '
'and authentication (login) temporarily disabled.'),
type='warning')
self.camefrom = camefrom
if SUBMIT is not None:
if self.request.principal.id != 'zope.anybody':
self.flash(_('You logged in.'))
if self.request.principal.user_type == 'customer':
customer = grok.getSite()['customers'][
self.request.principal.id]
rel_link = '/customers/%s' % self.request.principal.id
# Maybe we need this in Ikoba too
#if customer.personal_data_expired:
# rel_link = '/customers/%s/edit_personal' % (
# self.request.principal.id)
# self.flash(
# _('Your personal data record is outdated. Please update.'),
# type='warning')
if camefrom == 'PasswordMandate':
self.redirect(
self.application_url() +
rel_link +
'/changepassword')
else:
self.redirect(self.application_url() + rel_link)
return
if not self.camefrom:
self.redirect(self.application_url() + '/index')
return
self.redirect(self.camefrom)
return
# Display second warning message and remind user that
# s/he can't login.
if blocker:
self.flash(_('The portal is in maintenance mode. '
'You can\'t login!'),
type='danger')
return
# Display appropriate flash message if credentials are correct
# but customer has been deactivated or a temporary password
# has been set.
login = self.request.form['form.login']
if len(login) == 8 and login in grok.getSite()['customers']:
customer = grok.getSite()['customers'][login]
password = self.request.form['form.password']
passwordmanager = getUtility(IPasswordManager, 'SSHA')
if customer.password is not None and \
passwordmanager.checkPassword(customer.password, password):
# The customer entered valid credentials.
# First we check if a temporary password has been set.
delta = timedelta(minutes=10)
now = datetime.utcnow()
temp_password_dict = getattr(customer, 'temp_password', None)
if temp_password_dict is not None and \
now < temp_password_dict.get('timestamp', now) + delta:
self.flash(
_('Your account has been temporarily deactivated.'),
type='warning')
return
# Now we know that the customer is suspended.
comment = self._comment(customer)
if comment:
self.flash(comment, type='warning')
else:
self.flash(_('Your account has been deactivated.'),
type='warning')
return
# Display appropriate flash message if credentials are correct
# but officer has been deactivated.
login = self.request.form['form.login']
if login in grok.getSite()['users']:
user = grok.getSite()['users'][login]
password = self.request.form['form.password']
passwordmanager = getUtility(IPasswordManager, 'SSHA')
if user.password is not None and \
passwordmanager.checkPassword(user.password, password):
self.flash(_('Your user name and password are correct '
'but yor account has been temporarily '
'deactivated.'),
type='warning')
return
self.flash(_('You entered invalid credentials.'), type='danger')
return
class LogoutPage(IkobaPage):
"""A logout page. Calling this page will log the current user out.
"""
grok.context(IIkobaObject)
grok.require('waeup.Public')
grok.name('logout')
def update(self):
if not IUnauthenticatedPrincipal.providedBy(self.request.principal):
auth = getUtility(IAuthentication)
ILogout(auth).logout(self.request)
self.flash(_("You have been logged out. Thanks for using WAeUP Ikoba!"))
self.redirect(self.application_url() + '/index')
return
class LanguageChangePage(IkobaPage):
""" Language switch
"""
grok.context(IIkobaObject)
grok.name('change_language')
grok.require('waeup.Public')
def update(self, lang='en', view_name='@@index'):
self.response.setCookie('ikoba.language', lang, path='/')
self.redirect(self.url(self.context, view_name))
return
def render(self):
return
#
# Contact form...
#
class ContactAdminForm(IkobaFormPage):
grok.name('contactadmin')
#grok.context(ICompany)
grok.template('contactform')
grok.require('waeup.Authenticated')
pnav = 2
form_fields = grok.AutoFields(IContactForm).select('body')
def update(self):
super(ContactAdminForm, self).update()
self.form_fields.get('body').field.default = None
return
@property
def config(self):
return grok.getSite()['configuration']
def label(self):
return _(u'Contact ${a}', mapping = {'a': self.config.name_admin})
@property
def get_user_account(self):
return get_user_account(self.request)
@action(_('Send message now'), style='primary')
def send(self, *args, **data):
fullname = self.request.principal.title
try:
email = self.request.principal.email
except AttributeError:
email = self.config.email_admin
username = self.request.principal.id
usertype = getattr(self.request.principal,
'user_type', 'system').title()
ikoba_utils = getUtility(IIkobaUtils)
success = ikoba_utils.sendContactForm(
fullname,email,
self.config.name_admin,self.config.email_admin,
username,usertype,self.config.name,
data['body'],self.config.email_subject)
# Success is always True if sendContactForm didn't fail.
# TODO: Catch exceptions.
if success:
self.flash(_('Your message has been sent.'))
return
class EnquiriesForm(ContactAdminForm):
"""Captcha'd page to let anonymous send emails to the administrator.
"""
grok.name('enquiries')
grok.require('waeup.Public')
pnav = 2
form_fields = grok.AutoFields(IContactForm).select(
'fullname', 'email_from', 'body')
def update(self):
super(EnquiriesForm, self).update()
# Handle captcha
self.captcha = getUtility(ICaptchaManager).getCaptcha()
self.captcha_result = self.captcha.verify(self.request)
self.captcha_code = self.captcha.display(self.captcha_result.error_code)
return
@action(_('Send now'), style='primary')
def send(self, *args, **data):
if not self.captcha_result.is_valid:
# Captcha will display error messages automatically.
# No need to flash something.
return
ikoba_utils = getUtility(IIkobaUtils)
success = ikoba_utils.sendContactForm(
data['fullname'],data['email_from'],
self.config.name_admin,self.config.email_admin,
u'None',u'Anonymous',self.config.name,
data['body'],self.config.email_subject)
if success:
self.flash(_('Your message has been sent.'))
else:
self.flash(_('A smtp server error occurred.'), type='danger')
return
#
# Company related pages...
#
class CompanyPage(IkobaDisplayFormPage):
""" The main company page.
"""
grok.require('waeup.Public')
grok.name('index')
grok.context(ICompany)
pnav = 0
label = ''
@property
def frontpage(self):
lang = self.request.cookies.get('ikoba.language')
html = self.context['configuration'].frontpage_dict.get(lang,'')
if html =='':
portal_language = getUtility(IIkobaUtils).PORTAL_LANGUAGE
html = self.context[
'configuration'].frontpage_dict.get(portal_language,'')
if html =='':
return _(u'
Welcome to WAeUP.Ikoba
')
else:
return html
class SourcesOverview(grok.View):
grok.context(ICompany)
grok.name('sourcesoverview')
grok.require('waeup.manageDataCenter')
def _set_con_cats(self):
con_cats = getUtility(IIkobaUtils).CON_CATS_DICT
self.con_cats = con_cats.items()
return
def _set_payment_categories(self):
payment_categories = getUtility(IIkobaUtils).PAYMENT_CATEGORIES
self.payment_categories = payment_categories.items()
return
def _set_customer_doctypes(self):
customer_doctypes = getUtility(ICustomersUtils).DOCTYPES_DICT
self.customer_doctypes = customer_doctypes.items()
return
def _set_contypes(self):
contypes = getUtility(ICustomersUtils).CONTYPES_DICT
self.contypes = contypes.items()
return
def _set_doctypes(self):
doctypes = getUtility(IDocumentsUtils).DOCTYPES_DICT
self.doctypes = doctypes.items()
return
def _set_customer_states(self):
customer_states = getUtility(ICustomersUtils).TRANSLATED_CUSTOMER_STATES
self.customer_states = customer_states.items()
return
def _set_contract_states(self):
contract_states = getUtility(ICustomersUtils).TRANSLATED_CONTRACT_STATES
self.contract_states = contract_states.items()
return
def _set_customer_document_states(self):
customer_document_states = getUtility(
ICustomersUtils).TRANSLATED_DOCUMENT_STATES
self.customer_document_states = customer_document_states.items()
return
def _set_document_states(self):
document_states = getUtility(
IDocumentsUtils).TRANSLATED_DOCUMENT_STATES
self.document_states = document_states.items()
return
def _set_currencies(self):
currenciessource = CurrencySource().factory
self.currencies = []
for code in currenciessource.getValues():
title = currenciessource.getTitle(code)
self.currencies.append((code, title))
return
def _set_sex(self):
gendersource = GenderSource().factory
self.sex = []
for sex in gendersource.getValues():
title = gendersource.getTitle(sex)
self.sex.append((sex, title))
return
def update(self):
self._set_con_cats()
self._set_payment_categories()
self._set_customer_doctypes()
self._set_contypes()
self._set_doctypes()
self._set_currencies()
self._set_sex()
self._set_customer_states()
self._set_contract_states()
self._set_customer_document_states()
self._set_document_states()
class SourcesOverviewPage(IkobaPage, SourcesOverview):
grok.name('sources')
grok.require('waeup.Public')
label = _(u'Sources & Vocabularies')
pnav = 0
class ProcessorsOverview(grok.View):
grok.context(ICompany)
grok.name('processorsoverview')
grok.require('waeup.manageDataCenter')
def getImporters(self):
return getImporters(self.context)
class ProcessorsOverviewPage(IkobaPage, ProcessorsOverview):
grok.name('processors')
grok.require('waeup.Public')
label = _(u'Available Processors (Importers)')
pnav = 0
class SkeletonDownloadView(UtilityView, grok.View):
grok.context(ICompany)
grok.name('skeleton')
grok.require('waeup.Public')
def update(self, processorname=None):
self.processorname = self.request.form['name']
self.filename = ('%s_000.csv' %
self.processorname.replace('processor','import'))
return
def render(self):
#ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
#self.context.logger.info(
# '%s - skeleton downloaded: %s' % (ob_class, self.filename))
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition:', 'attachment; filename="%s' % self.filename)
processor = getUtility(IBatchProcessor, name=self.processorname)
csv_data = processor.get_csv_skeleton()
return csv_data
class AdministrationPage(IkobaPage):
""" The administration overview page.
"""
grok.name('administration')
grok.context(ICompany)
grok.require('waeup.managePortal')
label = _(u'Administration')
pnav = 0
class RSS20Feed(grok.View):
"""An RSS 2.0 feed.
"""
grok.name('feed.rss')
grok.context(ICompany)
grok.require('waeup.Public')
grok.template('companyrss20feed')
name = 'General news feed'
description = 'waeup.ikoba now supports RSS 2.0 feeds :-)'
language = None
date = None
buildDate = None
editor = None
webmaster = None
@property
def title(self):
return getattr(grok.getSite(), 'name', u'Sample Company')
@property
def contexttitle(self):
return self.name
@property
def link(self):
return self.url(grok.getSite())
def update(self):
self.response.setHeader('Content-Type', 'text/xml; charset=UTF-8')
def entries(self):
return ()
#
# User container pages...
#
class UsersContainerPage(IkobaPage):
"""Overview page for all local users.
"""
grok.require('waeup.manageUsers')
grok.context(IUsersContainer)
grok.name('index')
label = _('Officers')
manage_button = _(u'Manage')
delete_button = _(u'Remove')
def update(self, userid=None, adduser=None, manage=None, delete=None):
if manage is not None and userid is not None:
self.redirect(self.url(userid) + '/@@manage')
if delete is not None and userid is not None:
self.context.delUser(userid)
self.flash(_('User account ${a} successfully deleted.',
mapping = {'a': userid}))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.__parent__.logger.info(
'%s - removed: %s' % (ob_class, userid))
def getLocalRoles(self, account):
local_roles = account.getLocalRoles()
local_roles_string = ''
site_url = self.url(grok.getSite())
for local_role in local_roles.keys():
role_title = getattr(
dict(get_all_roles()).get(local_role, None), 'title', None)
objects_string = ''
for object in local_roles[local_role]:
objects_string += '%s, ' %(self.url(object),
self.url(object).replace(site_url,''))
local_roles_string += '%s:
%s
' %(role_title,
objects_string.rstrip(', '))
return local_roles_string
def getSiteRoles(self, account):
site_roles = account.roles
site_roles_string = ''
for site_role in site_roles:
role_title = dict(get_all_roles())[site_role].title
site_roles_string += '%s
' % role_title
return site_roles_string
class AddUserFormPage(IkobaAddFormPage):
"""Add a user account.
"""
grok.require('waeup.manageUsers')
grok.context(IUsersContainer)
grok.name('add')
grok.template('usereditformpage')
form_fields = grok.AutoFields(IUserAccount)
label = _('Add user')
@action(_('Add user'), style='primary')
def addUser(self, **data):
name = data['name']
title = data['title']
email = data['email']
phone = data['phone']
description = data['description']
#password = data['password']
roles = data['roles']
form = self.request.form
password = form.get('password', None)
password_ctl = form.get('control_password', None)
if password:
validator = getUtility(IPasswordValidator)
errors = validator.validate_password(password, password_ctl)
if errors:
self.flash( ' '.join(errors), type='danger')
return
try:
self.context.addUser(name, password, title=title, email=email,
phone=phone, description=description,
roles=roles)
self.flash(_('User account ${a} successfully added.',
mapping = {'a': name}))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.__parent__.logger.info(
'%s - added: %s' % (ob_class, name))
except KeyError:
self.status = self.flash('The userid chosen already exists '
'in the database.', type='danger')
return
self.redirect(self.url(self.context))
class UserManageFormPage(IkobaEditFormPage):
"""Manage a user account.
"""
grok.context(IUserAccount)
grok.name('manage')
grok.template('usereditformpage')
grok.require('waeup.manageUsers')
form_fields = grok.AutoFields(IUserAccount).omit('name')
def label(self):
return _("Edit user ${a}", mapping = {'a':self.context.__name__})
def setUpWidgets(self, ignore_request=False):
super(UserManageFormPage,self).setUpWidgets(ignore_request)
self.widgets['title'].displayWidth = 30
self.widgets['description'].height = 3
return
@action(_('Save'), style='primary')
def save(self, **data):
form = self.request.form
password = form.get('password', None)
password_ctl = form.get('control_password', None)
if password:
validator = getUtility(IPasswordValidator)
errors = validator.validate_password(password, password_ctl)
if errors:
self.flash( ' '.join(errors), type='danger')
return
changed_fields = self.applyData(self.context, **data)
if changed_fields:
changed_fields = reduce(lambda x,y: x+y, changed_fields.values())
else:
changed_fields = []
if password:
# Now we know that the form has no errors and can set password ...
self.context.setPassword(password)
changed_fields.append('password')
fields_string = ' + '.join(changed_fields)
if fields_string:
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.__parent__.logger.info(
'%s - %s edited: %s' % (
ob_class, self.context.name, fields_string))
self.flash(_('User settings have been saved.'))
return
@action(_('Cancel'), validator=NullValidator)
def cancel(self, **data):
self.redirect(self.url(self.context.__parent__))
return
class ContactUserForm(ContactAdminForm):
grok.name('contactuser')
grok.context(IUserAccount)
grok.template('contactform')
grok.require('waeup.manageUsers')
pnav = 0
form_fields = grok.AutoFields(IContactForm).select('body')
def label(self):
return _(u'Send message to ${a}', mapping = {'a':self.context.title})
@action(_('Send message now'), style='primary')
def send(self, *args, **data):
try:
email = self.request.principal.email
except AttributeError:
email = self.config.email_admin
usertype = getattr(self.request.principal,
'user_type', 'system').title()
ikoba_utils = getUtility(IIkobaUtils)
success = ikoba_utils.sendContactForm(
self.request.principal.title,email,
self.context.title,self.context.email,
self.request.principal.id,usertype,self.config.name,
data['body'],self.config.email_subject)
# Success is always True if sendContactForm didn't fail.
# TODO: Catch exceptions.
if success:
self.flash(_('Your message has been sent.'))
return
class UserEditFormPage(UserManageFormPage):
"""Edit a user account by user
"""
grok.name('index')
grok.require('waeup.editUser')
form_fields = grok.AutoFields(IUserAccount).omit(
'name', 'description', 'roles')
label = _(u"My Preferences")
def setUpWidgets(self, ignore_request=False):
super(UserManageFormPage,self).setUpWidgets(ignore_request)
self.widgets['title'].displayWidth = 30
class MyRolesPage(IkobaPage):
"""Display site roles and local roles assigned to officers.
"""
grok.name('my_roles')
grok.require('waeup.editUser')
grok.context(IUserAccount)
grok.template('myrolespage')
label = _(u"My Roles")
@property
def getLocalRoles(self):
local_roles = get_user_account(self.request).getLocalRoles()
local_roles_userfriendly = {}
for local_role in local_roles:
role_title = dict(get_all_roles())[local_role].title
local_roles_userfriendly[role_title] = local_roles[local_role]
return local_roles_userfriendly
@property
def getSiteRoles(self):
site_roles = get_user_account(self.request).roles
site_roles_userfriendly = []
for site_role in site_roles:
role_title = dict(get_all_roles())[site_role].title
site_roles_userfriendly.append(role_title)
return site_roles_userfriendly
#
# Configuration pages...
#
class ConfigurationContainerManageFormPage(IkobaEditFormPage):
"""Manage page of the configuration container. We always use the
manage page in the UI not the view page, thus we use the index name here.
"""
grok.require('waeup.managePortalConfiguration')
grok.name('index')
grok.context(IConfigurationContainer)
pnav = 0
label = _(u'Edit portal configuration')
form_fields = grok.AutoFields(IConfigurationContainer).omit(
'frontpage_dict')
form_fields['maintmode_enabled_by'].for_display = True
@action(_('Save'), style='primary')
def save(self, **data):
msave(self, **data)
frontpage = getattr(self.context, 'frontpage', None)
portal_language = getUtility(IIkobaUtils).PORTAL_LANGUAGE
self.context.frontpage_dict = html2dict(frontpage, portal_language)
return
@action(_('Update plugins'),
tooltip=_('For experts only!'),
warning=_('Plugins may only be updated after software upgrades. '
'Are you really sure?'),
validator=NullValidator)
def updatePlugins(self, **data):
grok.getSite().updatePlugins()
self.flash(_('Plugins were updated. See log file for details.'))
return
#
# Datacenter pages...
#
class DatacenterPage(IkobaEditFormPage):
grok.context(IDataCenter)
grok.name('index')
grok.require('waeup.manageDataCenter')
label = _(u'Data Center')
pnav = 0
@jsaction(_('Remove selected'))
def delFiles(self, **data):
form = self.request.form
if 'val_id' in form:
child_id = form['val_id']
else:
self.flash(_('No item selected.'), type='danger')
return
if not isinstance(child_id, list):
child_id = [child_id]
deleted = []
for id in child_id:
fullpath = os.path.join(self.context.storage, id)
try:
os.remove(fullpath)
deleted.append(id)
except OSError:
self.flash(_('OSError: The file could not be deleted.'),
type='danger')
return
if len(deleted):
self.flash(_('Successfully deleted: ${a}',
mapping = {'a': ', '.join(deleted)}))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - deleted: %s' % (ob_class, ', '.join(deleted)))
return
class DatacenterFinishedPage(IkobaEditFormPage):
grok.context(IDataCenter)
grok.name('processed')
grok.require('waeup.manageDataCenter')
label = _(u'Processed Files')
pnav = 0
cancel_button =_('Back to Data Center')
def update(self, CANCEL=None):
if CANCEL is not None:
self.redirect(self.url(self.context))
return
return super(DatacenterFinishedPage, self).update()
class DatacenterUploadPage(IkobaPage):
grok.context(IDataCenter)
grok.name('upload')
grok.require('waeup.manageDataCenter')
label = _(u'Upload portal data as CSV file')
pnav = 0
max_files = 20
upload_button =_(u'Upload')
cancel_button =_(u'Back to Data Center')
def getPreviewHeader(self):
"""Get the header fields of uploaded CSV file.
"""
reader = csv.reader(open(self.fullpath, 'rb'))
return reader.next()
def _notifyImportManagers(self, filename,
normalized_filename, importer, import_mode):
"""Send email to Import Managers
"""
# Get information about file
self.fullpath = os.path.join(self.context.storage, normalized_filename)
uploadfile = DataCenterFile(self.fullpath)
self.reader = csv.DictReader(open(self.fullpath, 'rb'))
table = getPreviewTable(self, 3)
mail_table = ''
for line in table:
header = line[0]
data = str(line[1:]).strip('[').strip(']')
mail_table += '%s: %s ...\n' % (line[0], data)
# Collect all recipient addresses
ikoba_utils = getUtility(IIkobaUtils)
import_managers = get_users_with_role(
'waeup.ImportManager', grok.getSite())
rcpt_addrs = ','.join(
[user['user_email'] for user in import_managers if
user['user_email'] is not None])
if rcpt_addrs:
config = grok.getSite()['configuration']
fullname = self.request.principal.title
try:
email = self.request.principal.email
except AttributeError:
email = config.email_admin
username = self.request.principal.id
usertype = getattr(self.request.principal,
'user_type', 'system').title()
rcpt_name = _('Import Manager')
subject = translate(
_('${a}: ${b} uploaded',
mapping = {'a':config.acronym, 'b':filename}),
'waeup.ikoba',
target_language=ikoba_utils.PORTAL_LANGUAGE)
text = _("""File: ${a}
Importer: ${b}
Import Mode: ${c}
Datasets: ${d}
${e}
Comment by Import Manager:""", mapping = {'a':normalized_filename,
'b':importer,
'c':import_mode,
'd':uploadfile.lines - 1,
'e':mail_table})
success = ikoba_utils.sendContactForm(
fullname,email,
rcpt_name,rcpt_addrs,
username,usertype,config.name,
text,subject)
if success:
self.flash(
_('All import managers have been notified by email.'))
else:
self.flash(_('An smtp server error occurred.'), type='danger')
return
def update(self, uploadfile=None, import_mode=None,
importer=None, CANCEL=None, SUBMIT=None):
number_of_pendings = len(self.context.getPendingFiles())
if number_of_pendings > self.max_files:
self.flash(
_('Maximum number of files in the data center exceeded.'),
type='danger')
self.redirect(self.url(self.context))
return
if CANCEL is not None:
self.redirect(self.url(self.context))
return
if not uploadfile:
return
try:
filename = uploadfile.filename
#if 'pending' in filename:
# self.flash(_("You can't re-upload pending data files."), type='danger')
# return
if not filename.endswith('.csv'):
self.flash(_("Only csv files are allowed."), type='danger')
return
normalized_filename = self.getNormalizedFileName(filename)
finished_file = os.path.join(
self.context.storage, 'finished', normalized_filename)
unfinished_file = os.path.join(
self.context.storage, 'unfinished', normalized_filename)
if os.path.exists(finished_file) or os.path.exists(unfinished_file):
self.flash(_("File with same name was uploaded earlier."),
type='danger')
return
target = os.path.join(self.context.storage, normalized_filename)
filecontent = uploadfile.read()
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
logger = self.context.logger
# Forbid certain characters in import files.
invalid_line = check_csv_charset(filecontent.splitlines())
if invalid_line:
self.flash(_(
"Your file contains forbidden characters or "
"has invalid CSV format. "
"First problematic line detected: line %s. "
"Please replace." % invalid_line), type='danger')
logger.info('%s - invalid file uploaded: %s' %
(ob_class, target))
return
open(target, 'wb').write(filecontent)
os.chmod(target, 0664)
logger.info('%s - uploaded: %s' % (ob_class, target))
self._notifyImportManagers(filename,
normalized_filename, importer, import_mode)
except IOError:
self.flash('Error while uploading file. Please retry.', type='danger')
self.flash('I/O error: %s' % sys.exc_info()[1], type='danger')
return
self.redirect(self.url(self.context))
def getNormalizedFileName(self, filename):
"""Build sane filename.
An uploaded file foo.csv will be stored as foo_USERNAME.csv
where username is the principal id of the currently logged in
user.
Spaces in filename are replaced by underscore.
Pending data filenames remain unchanged.
"""
if filename.endswith('.pending.csv'):
return filename
username = self.request.principal.id
filename = filename.replace(' ', '_')
# Only accept typical filname chars...
filtered_username = ''.join(re.findall('[a-zA-Z0-9_\.\-]', username))
base, ext = os.path.splitext(filename)
return '%s_%s%s' % (base, filtered_username, ext.lower())
def getImporters(self):
return getImporters(self.context)
class FileDownloadView(UtilityView, grok.View):
grok.context(IDataCenter)
grok.name('download')
grok.require('waeup.manageDataCenter')
def update(self, filename=None):
self.filename = self.request.form['filename']
return
def render(self):
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - downloaded: %s' % (ob_class, self.filename))
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition:', 'attachment; filename="%s' %
self.filename.replace('finished/',''))
fullpath = os.path.join(self.context.storage, self.filename)
return open(fullpath, 'rb').read()
class DatacenterImportStep1(IkobaPage):
"""Manual import step 1: choose file
"""
grok.context(IDataCenter)
grok.name('import1')
grok.template('datacenterimport1page')
grok.require('waeup.manageDataCenter')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Back to Data Center')
def getFiles(self):
files = self.context.getPendingFiles(sort='date')
for file in files:
name = file.name
if not name.endswith('.csv') and not name.endswith('.pending'):
continue
yield file
def update(self, filename=None, select=None, cancel=None):
if not grok.getSite()['configuration'].maintmode_enabled_by and \
not self.request.principal.id == 'admin':
self.flash(
_('Portal must be in maintenance mode for data import.'),
type='warning')
self.redirect(self.url(self.context))
return
if cancel is not None:
self.redirect(self.url(self.context))
return
if select is not None:
# A filename was selected
session = ISession(self.request)['waeup.ikoba']
session['import_filename'] = select
self.redirect(self.url(self.context, '@@import2'))
return
class DatacenterImportStep2(IkobaPage):
"""Manual import step 2: choose processor
"""
grok.context(IDataCenter)
grok.name('import2')
grok.template('datacenterimport2page')
grok.require('waeup.manageDataCenter')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Cancel')
back_button =_(u'Back to step 1')
proceed_button =_(u'Proceed to step 3')
filename = None
mode = 'create'
importer = None
mode_locked = False
def getPreviewHeader(self):
"""Get the header fields of attached CSV file.
"""
reader = csv.reader(open(self.fullpath, 'rb'))
return reader.next()
def getPreviewTable(self):
return getPreviewTable(self, 3)
def getImporters(self):
importers = getAllUtilitiesRegisteredFor(IBatchProcessor)
ikoba_utils = getUtility(IIkobaUtils)
importers = sorted(
[dict(title=x.name, name=x.util_name) for x in importers
if x.util_name in ikoba_utils.BATCH_PROCESSOR_NAMES])
return importers
def getModeFromFilename(self, filename):
"""Lookup filename or path and return included mode name or None.
"""
if not filename.endswith('.pending.csv'):
return None
base = os.path.basename(filename)
parts = base.rsplit('.', 3)
if len(parts) != 4:
return None
if parts[1] not in ['create', 'update', 'remove']:
return None
return parts[1]
def getWarnings(self):
import sys
result = []
try:
headerfields = self.getPreviewHeader()
headerfields_clean = list(set(headerfields))
if len(headerfields) > len(headerfields_clean):
result.append(
_("Double headers: each column name may only appear once. "))
except:
fatal = '%s' % sys.exc_info()[1]
result.append(fatal)
if result:
warnings = ""
for line in result:
warnings += line + '
'
warnings += _('Replace imported file!')
return warnings
return False
def update(self, mode=None, importer=None,
back1=None, cancel=None, proceed=None):
session = ISession(self.request)['waeup.ikoba']
self.filename = session.get('import_filename', None)
if self.filename is None or back1 is not None:
self.redirect(self.url(self.context, '@@import1'))
return
if cancel is not None:
self.flash(_('Import aborted.'), type='warning')
self.redirect(self.url(self.context))
return
self.mode = mode or session.get('import_mode', self.mode)
filename_mode = self.getModeFromFilename(self.filename)
if filename_mode is not None:
self.mode = filename_mode
self.mode_locked = True
self.importer = importer or session.get('import_importer', None)
session['import_importer'] = self.importer
if self.importer and 'update' in self.importer:
if self.mode != 'update':
self.flash(_('Update mode only!'), type='warning')
self.mode_locked = True
self.mode = 'update'
proceed = None
session['import_mode'] = self.mode
if proceed is not None:
self.redirect(self.url(self.context, '@@import3'))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
warnings = self.getWarnings()
if not warnings:
self.reader = csv.DictReader(open(self.fullpath, 'rb'))
else:
self.reader = ()
self.flash(warnings, type='warning')
class DatacenterImportStep3(IkobaPage):
"""Manual import step 3: modify header
"""
grok.context(IDataCenter)
grok.name('import3')
grok.template('datacenterimport3page')
grok.require('waeup.manageDataCenter')
label = _(u'Process CSV file')
pnav = 0
cancel_button =_(u'Cancel')
reset_button =_(u'Reset')
update_button =_(u'Set headerfields')
back_button =_(u'Back to step 2')
proceed_button =_(u'Perform import')
filename = None
mode = None
importername = None
@property
def nextstep(self):
return self.url(self.context, '@@import4')
def getPreviewHeader(self):
"""Get the header fields of attached CSV file.
"""
reader = csv.reader(open(self.fullpath, 'rb'))
return reader.next()
def getPreviewTable(self):
"""Get transposed table with 1 sample record.
The first column contains the headers.
"""
if not self.reader:
return
headers = self.getPreviewHeader()
num = 0
data = []
for line in self.reader:
if num > 0:
break
num += 1
data.append(line)
result = []
field_num = 0
for name in headers:
result_line = []
result_line.append(field_num)
field_num += 1
for d in data:
result_line.append(d[name])
result.append(result_line)
return result
def getPossibleHeaders(self):
"""Get the possible headers.
The headers are described as dicts {value:internal_name,
title:displayed_name}
"""
result = [dict(title='', value='--IGNORE--')]
headers = self.importer.getHeaders()
result.extend([dict(title=x, value=x) for x in headers])
return result
def getWarnings(self):
import sys
result = []
try:
self.importer.checkHeaders(self.headerfields, mode=self.mode)
except:
fatal = '%s' % sys.exc_info()[1]
result.append(fatal)
if result:
warnings = ""
for line in result:
warnings += line + '
'
warnings += _('Edit headers or replace imported file!')
return warnings
return False
def update(self, headerfield=None, back2=None, cancel=None, proceed=None):
session = ISession(self.request)['waeup.ikoba']
self.filename = session.get('import_filename', None)
self.mode = session.get('import_mode', None)
self.importername = session.get('import_importer', None)
if None in (self.filename, self.mode, self.importername):
self.redirect(self.url(self.context, '@@import2'))
return
if back2 is not None:
self.redirect(self.url(self.context ,'@@import2'))
return
if cancel is not None:
self.flash(_('Import aborted.'), type='warning')
self.redirect(self.url(self.context))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
self.headerfields = headerfield or self.getPreviewHeader()
session['import_headerfields'] = self.headerfields
if proceed is not None:
self.redirect(self.url(self.context, '@@import4'))
return
self.importer = getUtility(IBatchProcessor, name=self.importername)
self.reader = csv.DictReader(open(self.fullpath, 'rb'))
warnings = self.getWarnings()
if warnings:
self.flash(warnings, type='warning')
class DatacenterImportStep4(IkobaPage):
"""Manual import step 4: do actual import
"""
grok.context(IDataCenter)
grok.name('import4')
grok.template('datacenterimport4page')
grok.require('waeup.importData')
label = _(u'Process CSV file')
pnav = 0
back_button =_(u'Process next')
filename = None
mode = None
importername = None
headerfields = None
warnnum = None
def update(self, back=None, finish=None, showlog=None):
if finish is not None:
self.redirect(self.url(self.context, '@@import1'))
return
if not grok.getSite()['configuration'].maintmode_enabled_by and \
not self.request.principal.id == 'admin':
self.flash(
_('File has not been imported. '
'Portal must be in maintenance mode for data import.'),
type='danger')
self.redirect(self.url(self.context))
return
session = ISession(self.request)['waeup.ikoba']
self.filename = session.get('import_filename', None)
self.mode = session.get('import_mode', None)
self.importername = session.get('import_importer', None)
# If the import file contains only one column
# the import_headerfields attribute is a string.
ihf = session.get('import_headerfields', None)
if not isinstance(ihf, list):
self.headerfields = ihf.split()
else:
self.headerfields = ihf
if None in (self.filename, self.mode, self.importername,
self.headerfields):
self.redirect(self.url(self.context, '@@import3'))
return
if showlog is not None:
logfilename = "datacenter.log"
session['logname'] = logfilename
self.redirect(self.url(self.context, '@@show'))
return
self.fullpath = os.path.join(self.context.storage, self.filename)
self.importer = getUtility(IBatchProcessor, name=self.importername)
# Perform batch processing...
# XXX: This might be better placed in datacenter module.
(linenum, self.warn_num,
fin_path, pending_path) = self.importer.doImport(
self.fullpath, self.headerfields, self.mode,
self.request.principal.id, logger=self.context.logger)
# Put result files in desired locations...
self.context.distProcessedFiles(
self.warn_num == 0, self.fullpath, fin_path, pending_path,
self.mode)
# Disable maintenance mode if set.
if grok.getSite()['configuration'].maintmode_enabled_by:
grok.getSite()['configuration'].maintmode_enabled_by = None
self.flash(_('Maintenance mode disabled.'))
if self.warn_num:
self.flash(_('Processing of ${a} rows failed.',
mapping = {'a':self.warn_num}), type='warning')
self.flash(_('Successfully processed ${a} rows.',
mapping = {'a':linenum - self.warn_num}))
class DatacenterLogsOverview(IkobaPage):
grok.context(IDataCenter)
grok.name('logs')
grok.template('datacenterlogspage')
grok.require('waeup.manageDataCenter')
label = _(u'Show logfiles')
pnav = 0
back_button = _(u'Back to Data Center')
show_button = _(u'Show')
def update(self, back=None):
if back is not None:
self.redirect(self.url(self.context))
return
self.files = self.context.getLogFiles()
class DatacenterLogsFileview(IkobaPage):
grok.context(IDataCenter)
grok.name('show')
grok.template('datacenterlogsshowfilepage')
grok.require('waeup.manageDataCenter')
title = _(u'Data Center')
pnav = 0
search_button = _('Search')
back_button = _('Back to Data Center')
placeholder = _('Enter a regular expression here...')
def label(self):
return "Logfile %s" % self.filename
def update(self, back=None, query=None, logname=None):
if os.name != 'posix':
self.flash(
_('Log files can only be searched ' +
'on Unix-based operating systems.'), type='danger')
self.redirect(self.url(self.context, '@@logs'))
return
if back is not None or logname is None:
self.redirect(self.url(self.context, '@@logs'))
return
self.filename = logname
self.query = query
if not query:
return
try:
self.result = cgi.escape(
''.join(self.context.queryLogfiles(logname, query)))
except ValueError:
self.flash(_('Invalid search expression.'), type='danger')
return
if not self.result:
self.flash(_('No search results found.'), type='warning')
return
class DatacenterSettings(IkobaPage):
grok.context(IDataCenter)
grok.name('manage')
grok.template('datacentermanagepage')
grok.require('waeup.managePortal')
label = _('Edit data center settings')
pnav = 0
save_button =_(u'Save')
reset_button =_(u'Reset')
cancel_button =_(u'Back to Data Center')
def update(self, newpath=None, move=False, overwrite=False,
save=None, cancel=None):
if move:
move = True
if overwrite:
overwrite = True
if newpath is None:
return
if cancel is not None:
self.redirect(self.url(self.context))
return
try:
not_copied = self.context.setStoragePath(newpath, move=move)
for name in not_copied:
self.flash(_('File already existed (not copied): ${a}',
mapping = {'a':name}), type='danger')
except:
self.flash(_('Given storage path cannot be used: ${a}',
mapping = {'a':sys.exc_info()[1]}), type='danger')
return
if newpath:
self.flash(_('New storage path succefully set.'))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - storage path set: %s' % (ob_class, newpath))
self.redirect(self.url(self.context))
return
class ExportCSVPage(IkobaPage):
grok.context(IDataCenter)
grok.name('export')
grok.template('datacenterexportpage')
grok.require('waeup.exportData')
label = _('Download portal data as CSV file')
pnav = 0
export_button = _(u'Create CSV file')
cancel_button =_(u'Back to Data Center')
def getExporters(self):
exporter_utils = getUtilitiesFor(ICSVExporter)
ikoba_utils = getUtility(IIkobaUtils)
title_name_tuples = [
(util.title, name) for name, util in exporter_utils
if name in ikoba_utils.EXPORTER_NAMES]
return sorted(title_name_tuples)
def update(self, CREATE=None, DISCARD=None, PURGE=None, exporter=None,
job_id=None, CANCEL=None):
if CANCEL is not None:
self.redirect(self.url(self.context))
return
if CREATE:
utils = queryUtility(IIkobaUtils)
if utils.export_disabled_message():
self.flash(utils.export_disabled_message(), type='danger')
self.entries = doll_up(self, user=None)
return
if running_jobs(self):
self.flash(_(
"Sorry, another export job is running. "
"Please try again later."), type='danger')
self.entries = doll_up(self, user=None)
return
job_id = self.context.start_export_job(
exporter, self.request.principal.id)
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - exported: %s, job_id=%s' % (ob_class, exporter, job_id))
if DISCARD and job_id:
entry = self.context.entry_from_job_id(job_id)
self.context.delete_export_entry(entry)
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - discarded: job_id=%s' % (ob_class, job_id))
self.flash(_('Discarded export') + ' %s' % job_id)
self.entries = doll_up(self, user=None, purge=PURGE)
if PURGE:
self.flash(_('Exports successfully purged.'))
return
class ExportCSVView(grok.View):
grok.context(IDataCenter)
grok.name('download_export')
grok.require('waeup.exportData')
def render(self, job_id=None):
manager = getUtility(IJobManager)
job = manager.get(job_id)
if job is None:
return
if hasattr(job.result, 'traceback'):
# XXX: Some error happened. Do something more approriate here...
return
path = job.result
if not os.path.exists(path):
# XXX: Do something more appropriate here...
return
result = open(path, 'rb').read()
acronym = grok.getSite()['configuration'].acronym.replace(' ','')
filename = "%s_%s" % (acronym, os.path.basename(path))
filename = filename.replace('.csv', '_%s.csv' % job_id)
self.response.setHeader(
'Content-Type', 'text/csv; charset=UTF-8')
self.response.setHeader(
'Content-Disposition', 'attachment; filename="%s' % filename)
# remove job and running_exports entry from context
#self.context.delete_export_entry(
# self.context.entry_from_job_id(job_id))
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - downloaded: %s, job_id=%s' % (ob_class, filename, job_id))
return result
class SwitchMaintModePage(UtilityView, grok.View):
"""Import managers must disable authentication for all other users
before starting an import. This pages switches maintenance mode
on and off.
"""
grok.context(IDataCenter)
grok.name('switchmaintmode')
grok.require('waeup.importData')
def update(self):
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
if grok.getSite()['configuration'].maintmode_enabled_by:
grok.getSite()['configuration'].maintmode_enabled_by = None
self.context.logger.info('%s - maintmode disabled' % ob_class)
self.flash(_('Maintenance mode disabled.'))
self.redirect(self.url(self.context))
else:
grok.getSite()['configuration'].maintmode_enabled_by = unicode(
self.request.principal.id)
self.context.logger.info('%s - maintmode enabled' % ob_class)
self.flash(_('Maintenance mode enabled.'), type='warning')
self.redirect(self.url(self.context, 'import1'))
return
def render(self):
return
class ChangePasswordRequestPage(IkobaFormPage):
"""Captcha'd page for all kind of users to request a password change.
"""
grok.context(ICompany)
grok.name('changepw')
grok.require('waeup.Anonymous')
grok.template('changepw')
label = _('Send me a new password')
form_fields = grok.AutoFields(IChangePassword)
def update(self):
# Handle captcha
self.captcha = getUtility(ICaptchaManager).getCaptcha()
self.captcha_result = self.captcha.verify(self.request)
self.captcha_code = self.captcha.display(self.captcha_result.error_code)
return
def _searchUser(self, email):
# Search customer
cat = queryUtility(ICatalog, name='customers_catalog')
results = list(cat.searchResults(email=(email,email)))
if len(results) == 1:
return results[0]
# Search portal user
users = grok.getSite()['users'].values()
results = []
for user in users:
if user.email == email:
results.append(user)
if len(results) == 1:
return results[0]
return None
@action(_('Send login credentials to email address'), style='primary')
def request(self, **data):
if not self.captcha_result.is_valid:
# Captcha will display error messages automatically.
# No need to flash something.
return
# Search customer
email = data['email']
user = self._searchUser(email)
if user is None:
self.flash(_('No record found.'), type='warning')
return
# Change password
ikoba_utils = getUtility(IIkobaUtils)
password = ikoba_utils.genPassword()
mandate = PasswordMandate()
mandate.params['password'] = password
mandate.params['user'] = user
site = grok.getSite()
site['mandates'].addMandate(mandate)
# Send email with credentials
args = {'mandate_id':mandate.mandate_id}
mandate_url = self.url(site) + '/mandate?%s' % urlencode(args)
url_info = u'Confirmation link: %s' % mandate_url
msg = _('You have successfully requested a password for the')
success = ikoba_utils.sendCredentials(
IUserAccount(user),password,url_info,msg)
if success:
self.flash(_('An email with your user name and password ' +
'has been sent to ${a}.', mapping = {'a':email}))
else:
self.flash(_('An smtp server error occurred.'), type='danger')
ob_class = self.__implemented__.__name__.replace('waeup.ikoba.','')
self.context.logger.info(
'%s - %s - %s' % (ob_class, IUserAccount(user).name, data['email']))
return