source: main/waeup.kofa/branches/henrik-transcript-workflow/src/waeup/kofa/tests/test_authentication.py @ 16833

Last change on this file since 16833 was 14670, checked in by uli, 8 years ago

Add IKofaPluggable to update local PAU.

The new plugin enables updating of sites that have
yet no XMLRPC authentication enabled.

The plugin can be removed after updating all sites.

New sites (university-instances) do not need this
plugin at all.

  • Property svn:keywords set to Id
File size: 15.6 KB
Line 
1## $Id: test_authentication.py 14670 2017-04-06 12:20:41Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import grok
19import logging
20import time
21import unittest
22from cStringIO import StringIO
23from zope.component import getGlobalSiteManager, queryUtility
24from zope.component.hooks import setSite, clearSite
25from zope.interface.verify import verifyClass, verifyObject
26from zope.password.testing import setUpPasswordManagers
27from zope.pluggableauth import PluggableAuthentication
28from zope.pluggableauth.interfaces import (
29    IAuthenticatorPlugin, ICredentialsPlugin)
30from zope.publisher.browser import TestRequest
31from zope.securitypolicy.interfaces import IPrincipalRoleManager
32from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
33from waeup.kofa.app import University
34from waeup.kofa.authentication import (
35    UserAuthenticatorPlugin, Account, KofaPrincipalInfo, FailedLoginInfo,
36    get_principal_role_manager, UsersPlugin, KofaXMLRPCCredentialsPlugin,
37    setup_authentication, UpdatePAUPlugin)
38from waeup.kofa.interfaces import (
39    IAuthPluginUtility, IUserAccount, IFailedLoginInfo, IKofaPrincipalInfo,
40    IKofaPluggable)
41
42
43class FakeSite(grok.Site, grok.Container):
44    #def getSiteManager(self):
45    #    return None
46    #    return getGlobalSiteManager()
47    pass
48
49
50class FakeAuthPlugin(object):
51    def register(self, pau):
52        pau.credentialsPlugins += ('foo', )
53
54
55class Test_setup_authentication(FunctionalTestCase):
56    # Tests for the `setup_authentication` function
57
58    layer = FunctionalLayer
59
60    def tearDown(self):
61        # clean up registry.
62        gsm = getGlobalSiteManager()
63        for iface, name in ((IAuthPluginUtility, 'myauth'), ):
64            to_delete = queryUtility(iface, name=name)
65            if to_delete is not None:
66                gsm.unregisterUtility(provided=iface, name=name)
67        super(Test_setup_authentication, self).tearDown()
68
69    def test_plugins_are_registered(self):
70        # We can populate a PAU with (hardcoded set of) plugins
71        pau = PluggableAuthentication()
72        setup_authentication(pau)
73        for name in (
74                'No Challenge if Authenticated',
75                'xmlrpc-credentials',
76                'credentials'):
77            assert name in pau.credentialsPlugins
78        for name in ('users', ):
79            assert name in pau.authenticatorPlugins
80
81    def test_external_plugins_are_registered(self):
82        # registered plugins are called as well
83        gsm = getGlobalSiteManager()
84        gsm.registerUtility(
85            FakeAuthPlugin(), IAuthPluginUtility, name='myauth')
86        pau = PluggableAuthentication()
87        setup_authentication(pau)
88        assert 'foo' in pau.credentialsPlugins
89
90
91class KofaXMLRPCCredentialsPluginTests(FunctionalTestCase):
92    # Test for XMLRPC credentials plugin
93
94    layer = FunctionalLayer
95
96    def test_ifaces(self):
97        # we meet interface requirements
98        plugin = KofaXMLRPCCredentialsPlugin()
99        self.assertTrue(
100            verifyClass(ICredentialsPlugin, KofaXMLRPCCredentialsPlugin))
101
102    def test_util_is_registered(self):
103        # we can query this named utility
104        util = queryUtility(ICredentialsPlugin, name='xmlrpc-credentials')
105        assert util is not None
106
107    def test_can_extract_creds(self):
108        # we can extract credentials from appropriate requests
109        req = TestRequest(
110            environ={'HTTP_AUTHORIZATION': u'Basic bWdyOm1ncnB3'})
111        plugin = KofaXMLRPCCredentialsPlugin()
112        assert plugin.extractCredentials(req) == {
113            'login': 'mgr', 'password': 'mgrpw'}
114
115    def test_challenge_disabled(self):
116        # we will not challenge people
117        plugin = KofaXMLRPCCredentialsPlugin()
118        assert plugin.challenge(TestRequest()) is False
119
120    def test_logout_disabled(self):
121        # we do not support logging out. HTTP basic auth cannot do this.
122        plugin = KofaXMLRPCCredentialsPlugin()
123        assert plugin.logout(TestRequest()) is False
124
125
126class UserAuthenticatorPluginTests(FunctionalTestCase):
127    # Must be functional because of various utility lookups and the like
128
129    layer = FunctionalLayer
130
131    def setUp(self):
132        super(UserAuthenticatorPluginTests, self).setUp()
133        self.getRootFolder()['app'] = FakeSite()
134        self.site = self.getRootFolder()['app']
135        self.site['users'] = {'bob': Account('bob', 'secret')}
136        setSite(self.site)
137        return
138
139    def tearDown(self):
140        super(UserAuthenticatorPluginTests, self).tearDown()
141        clearSite()
142        return
143
144    def test_ifaces(self):
145        # make sure, interfaces requirements are met
146        plugin = UserAuthenticatorPlugin()
147        plugin.__parent__ = None # This attribute is required by iface
148        self.assertTrue(
149            verifyClass(IAuthenticatorPlugin, UserAuthenticatorPlugin))
150        self.assertTrue(verifyObject(IAuthenticatorPlugin, plugin))
151        return
152
153    def test_authenticate_credentials(self):
154        # make sure authentication works as expected
155        plugin = UserAuthenticatorPlugin()
156        result1 = plugin.authenticateCredentials(
157            dict(login='bob', password='secret'))
158        result2 = plugin.authenticateCredentials(
159            dict(login='bob', password='nonsense'))
160        self.assertTrue(isinstance(result1, KofaPrincipalInfo))
161        self.assertTrue(result2 is None)
162        return
163
164    def test_principal_info(self):
165        # make sure we can get a principal info
166        plugin = UserAuthenticatorPlugin()
167        result1 = plugin.principalInfo('bob')
168        result2 = plugin.principalInfo('manfred')
169        self.assertTrue(isinstance(result1, KofaPrincipalInfo))
170        self.assertTrue(result2 is None)
171        return
172
173    def test_get_principal_role_manager(self):
174        # make sure we get different role managers for different situations
175        prm1 = get_principal_role_manager()
176        clearSite(None)
177        prm2 = get_principal_role_manager()
178        self.assertTrue(IPrincipalRoleManager.providedBy(prm1))
179        self.assertTrue(IPrincipalRoleManager.providedBy(prm2))
180        self.assertTrue(prm1._context is self.site)
181        self.assertTrue(hasattr(prm2, '_context') is False)
182        return
183
184    def make_failed_logins(self, num):
185        # do `num` failed logins and a valid one afterwards
186        del self.site['users']
187        self.site['users'] = {'bob': Account('bob', 'secret')}
188        plugin = UserAuthenticatorPlugin()
189        resultlist = []
190        # reset accounts
191        for x in range(num):
192            resultlist.append(plugin.authenticateCredentials(
193                dict(login='bob', password='wrongsecret')))
194        resultlist.append(plugin.authenticateCredentials(
195            dict(login='bob', password='secret')))
196        return resultlist
197
198    def DISABLED_test_failed_logins(self):
199        # after three failed logins, an account is blocked
200        # XXX: this tests authenticator with time penalty (currently
201        # disabled)
202        results = []
203        succ_principal = KofaPrincipalInfo(
204            id='bob',
205            title='bob',
206            description=None,
207            email=None,
208            phone=None,
209            public_name=None,
210            user_type=u'user')
211        for x in range(4):
212            results.append(self.make_failed_logins(x))
213        self.assertEqual(results[2], [None, None, succ_principal])
214        # last login was blocked although correctly entered due to
215        # time penalty
216        self.assertEqual(results[3], [None, None, None, None])
217        return
218
219class KofaPrincipalInfoTests(unittest.TestCase):
220
221    def create_info(self):
222        return KofaPrincipalInfo(
223            id='bob',
224            title='bob',
225            description=None,
226            email=None,
227            phone=None,
228            public_name=None,
229            user_type=u'user')
230
231    def test_iface(self):
232        # make sure we implement the promised interfaces
233        info = self.create_info()
234        verifyClass(IKofaPrincipalInfo, KofaPrincipalInfo)
235        verifyObject(IKofaPrincipalInfo, info)
236        return
237
238    def test_equality(self):
239        # we can test two infos for equality
240        info1 = self.create_info()
241        info2 = self.create_info()
242        self.assertEqual(info1, info2)
243        self.assertTrue(info1 == info2)
244        info1.id = 'blah'
245        self.assertTrue(info1 != info2)
246        self.assertTrue((info1 == info2) is False)
247        info1.id = 'bob'
248        info2.id = 'blah'
249        self.assertTrue(info1 != info2)
250        self.assertTrue((info1 == info2) is False)
251        return
252
253class FailedLoginInfoTests(unittest.TestCase):
254
255    def test_iface(self):
256        # make sure we fullfill the promised interfaces
257        info1 = FailedLoginInfo()
258        info2 = FailedLoginInfo(num=1, last=time.time())
259        self.assertTrue(
260            verifyClass(IFailedLoginInfo, FailedLoginInfo))
261        self.assertTrue(verifyObject(IFailedLoginInfo, info1))
262        # make sure the stored values have correct type if not None
263        self.assertTrue(verifyObject(IFailedLoginInfo, info2))
264        return
265
266    def test_default_values(self):
267        # By default we get 0, None
268        info = FailedLoginInfo()
269        self.assertEqual(info.num, 0)
270        self.assertEqual(info.last, None)
271        return
272
273    def test_set_values_by_attribute(self):
274        # we can set values by attribute
275        ts = time.gmtime(0)
276        info = FailedLoginInfo()
277        info.num = 5
278        info.last = ts
279        self.assertEqual(info.num, 5)
280        self.assertEqual(info.last, ts)
281        return
282
283    def test_set_values_by_constructor(self):
284        # we can set values by constructor args
285        ts = time.gmtime(0)
286        info = FailedLoginInfo(5, ts)
287        self.assertEqual(info.num, 5)
288        self.assertEqual(info.last, ts)
289        return
290
291    def test_set_values_by_keywords(self):
292        # we can set values by constructor keywords
293        ts = time.gmtime(0)
294        info = FailedLoginInfo(last=ts, num=3)
295        self.assertEqual(info.num, 3)
296        self.assertEqual(info.last, ts)
297        return
298
299    def test_as_tuple(self):
300        # we can get the info values as tuple
301        ts = time.gmtime(0)
302        info = FailedLoginInfo(last=ts, num=3)
303        self.assertEqual(info.as_tuple(), (3, ts))
304        return
305
306    def test_set_values(self):
307        # we can set the values of a an info instance
308        ts = time.time()
309        info = FailedLoginInfo()
310        info.set_values(num=3, last=ts)
311        self.assertEqual(info.num, 3)
312        self.assertEqual(info.last, ts)
313        return
314
315    def test_increase(self):
316        # we can increase the number of failed logins
317        ts1 = time.time()
318        info = FailedLoginInfo()
319        info.increase()
320        self.assertEqual(info.num, 1)
321        self.assertTrue(info.last > ts1)
322        ts2 = info.last
323        info.increase()
324        self.assertEqual(info.num, 2)
325        self.assertTrue(info.last > ts2)
326        return
327
328    def test_reset(self):
329        # we can reset failed login infos.
330        info = FailedLoginInfo()
331        info.increase()
332        info.reset()
333        self.assertEqual(info.num, 0)
334        self.assertEqual(info.last, None)
335        return
336
337class AccountTests(unittest.TestCase):
338
339    def setUp(self):
340        setUpPasswordManagers()
341        return
342
343    def test_iface(self):
344        acct = Account('bob', 'mypasswd')
345        self.assertTrue(
346            verifyClass(IUserAccount, Account))
347        self.assertTrue(
348            verifyObject(IUserAccount, acct))
349        return
350
351    def test_failed_logins(self):
352        # we can retrieve infos about failed logins
353        ts = time.time()
354        acct = Account('bob', 'mypasswd')
355        self.assertTrue(hasattr(acct, 'failed_logins'))
356        acct.failed_logins.set_values(num=3, last=ts)
357        self.assertEqual(acct.failed_logins.last, ts)
358        self.assertEqual(acct.failed_logins.num, 3)
359        return
360
361    def test_failed_logins_per_inst(self):
362        # we get a different counter for each Account instance
363        acct1 = Account('bob', 'secret')
364        acct2 = Account('alice', 'alsosecret')
365        self.assertTrue(acct1.failed_logins is not acct2.failed_logins)
366        return
367
368class FakeUserAccount(object):
369    pass
370
371
372def get_logger():
373    logger = logging.getLogger('waeup.test')
374    stream = StringIO()
375    handler = logging.StreamHandler(stream)
376    logger.setLevel(logging.DEBUG)
377    logger.propagate = False
378    logger.addHandler(handler)
379    return logger, stream
380
381
382class UsersPluginTests(unittest.TestCase):
383
384    def setUp(self):
385        setUpPasswordManagers()
386        self.site = FakeSite()
387        self.site['users'] = grok.Container()
388        return
389
390    def test_ifaces(self):
391        # make sure we implement the promised interfaces
392        plugin = UsersPlugin()
393        verifyClass(IKofaPluggable, UsersPlugin)
394        verifyObject(IKofaPluggable, plugin)
395        return
396
397    def test_update(self):
398        # make sure user accounts are updated properly.
399        plugin = UsersPlugin()
400        logger, stream = get_logger()
401        plugin.update(self.site, 'app', logger)
402        stream.seek(0)
403        self.assertEqual(stream.read(), '')
404        self.site['users']['bob'] = FakeUserAccount()
405        logger, stream = get_logger()
406        plugin.update(self.site, 'app', logger)
407        stream.seek(0)
408        log_content = stream.read()
409        self.assertTrue(hasattr(self.site['users']['bob'], 'description'))
410        self.assertTrue(hasattr(self.site['users']['bob'], 'failed_logins'))
411        self.assertTrue(
412            isinstance(self.site['users']['bob'].failed_logins,
413                       FailedLoginInfo))
414        self.assertTrue('attribute description added' in log_content)
415        self.assertTrue('attribute failed_logins added' in log_content)
416        return
417
418
419class TestUpdatePAUPlugin(FunctionalTestCase):
420
421    layer = FunctionalLayer
422
423    def setUp(self):
424        super(TestUpdatePAUPlugin, self).setUp()
425        self.getRootFolder()['app'] = University()
426        self.site = self.getRootFolder()['app']
427
428    def tearDown(self):
429        clearSite()
430        super(TestUpdatePAUPlugin, self).tearDown()
431
432    def get_pau(self):
433        # the PAU is registered as a local utility in local site manager.
434        # the name is derived from class name.
435        pau = self.site.getSiteManager()['PluggableAuthentication']
436        assert pau is not None
437        return pau
438
439    def test_update_outdated(self):
440        # we can update outdated sites.
441        plugin = UpdatePAUPlugin()
442        logger, stream = get_logger()
443        pau = self.get_pau()
444        pau.credentialsPlugins = ('foo', 'credentials', 'bar')
445        plugin.update(self.site, 'xmlrpc-credentials', logger)
446        assert 'xmlrpc-credentials' in pau.credentialsPlugins
447        assert pau.credentialsPlugins.index('xmlrpc-credentials') == 1
448
449    def test_update_uptodate(self):
450        # we cope with already updated sites.
451        plugin = UpdatePAUPlugin()
452        logger, stream = get_logger()
453        pau = self.get_pau()
454        pau.credentialsPlugins = ('foo', 'xmlrpc-credentials', 'bar')
455        plugin.update(self.site, 'xmlrpc-credentials', logger)
456        assert pau.credentialsPlugins.count('xmlrpc-credentials') == 1
Note: See TracBrowser for help on using the repository browser.