Changeset 10055
- Timestamp:
- 4 Apr 2013, 15:12:43 (12 years ago)
- Location:
- main/waeup.kofa/trunk/src/waeup/kofa
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/trunk/src/waeup/kofa/authentication.py
r9706 r10055 19 19 """ 20 20 import grok 21 import time 21 22 from zope.event import notify 22 23 from zope.component import getUtility, getUtilitiesFor 23 24 from zope.component.interfaces import IFactory 24 from zope.interface import Interface 25 from zope.interface import Interface, implementedBy 25 26 from zope.schema import getFields 26 27 from zope.securitypolicy.interfaces import ( … … 37 38 IUserAccount, IAuthPluginUtility, IPasswordValidator, 38 39 IKofaPrincipal, IKofaPrincipalInfo, IKofaPluggable, 39 IBatchProcessor, IGNORE_MARKER )40 IBatchProcessor, IGNORE_MARKER, IFailedLoginInfo) 40 41 from waeup.kofa.utils.batching import BatchProcessor 41 42 … … 84 85 grok.implements(IKofaPrincipalInfo) 85 86 86 def __init__(self, id, title, description, email, phone, public_name, user_type): 87 def __init__(self, id, title, description, email, phone, public_name, 88 user_type): 87 89 self.id = id 88 90 self.title = title … … 95 97 self.authenticatorPlugin = None 96 98 99 def __eq__(self, obj): 100 default = object() 101 result = [] 102 for name in ('id', 'title', 'description', 'email', 'phone', 103 'public_name', 'user_type', 'credentialsPlugin', 104 'authenticatorPlugin'): 105 result.append( 106 getattr(self, name) == getattr(obj, name, default)) 107 return False not in result 108 97 109 class KofaPrincipal(Principal): 98 110 """A portal principal. 99 111 100 Kofa principals provide an extra `email`, `phone`, `public_name` and `user_type`101 a ttribute extending ordinary principals.112 Kofa principals provide an extra `email`, `phone`, `public_name` 113 and `user_type` attribute extending ordinary principals. 102 114 """ 103 115 … … 151 163 return principal 152 164 165 class FailedLoginInfo(grok.Model): 166 grok.implements(IFailedLoginInfo) 167 168 def __init__(self, num=0, last=None): 169 self.num = num 170 self.last = last 171 return 172 173 def as_tuple(self): 174 return (self.num, self.last) 175 176 def set_values(self, num=0, last=None): 177 self.num, self.last = num, last 178 self._p_changed = True 179 pass 180 181 def increase(self): 182 self.set_values(num=self.num + 1, last=time.time()) 183 pass 184 185 def reset(self): 186 self.set_values(num=0, last=None) 187 pass 188 153 189 class Account(grok.Model): 190 """Kofa user accounts store infos about a user. 191 192 Beside the usual data and an (encrypted) password, accounts also 193 have a persistent attribute `failed_logins` which is an instance 194 of `waeup.kofa.authentication.FailedLoginInfo`. 195 196 This attribute can be manipulated directly (set new value, 197 increase values, or reset). 198 """ 154 199 grok.implements(IUserAccount) 155 156 _local_roles = dict()157 200 158 201 def __init__(self, name, password, title=None, description=None, … … 168 211 self.setPassword(password) 169 212 self.setSiteRolesForPrincipal(roles) 213 170 214 # We don't want to share this dict with other accounts 171 215 self._local_roles = dict() 216 self.failed_logins = FailedLoginInfo() 172 217 173 218 def setPassword(self, password): … … 235 280 if account is None: 236 281 return None 282 # The following shows how 'time penalties' could be enforced 283 # on failed logins. First three failed logins are 'for 284 # free'. After that the user has to wait for 1, 2, 4, 8, 16, 285 # 32, ... seconds before a login can succeed. 286 # There are, however, some problems to discuss, before we 287 # really use this in all authenticators. 288 289 #num, last = account.failed_logins.as_tuple() 290 #if (num > 2) and (time.time() < (last + 2**(num-3))): 291 # # tried login while account still blocked due to previous 292 # # login errors. 293 # return None 237 294 if not account.checkPassword(credentials['password']): 295 #account.failed_logins.increase() 238 296 return None 239 297 return KofaPrincipalInfo( … … 458 516 """A plugin that updates users. 459 517 """ 460 461 518 grok.implements(IKofaPluggable) 462 519 grok.name('users') … … 478 535 'UsersPlugin: %s attribute %s added.' % ( 479 536 user.name,i[0])) 537 if not hasattr(user, 'failed_logins'): 538 # add attribute `failed_logins`... 539 user.failed_logins = FailedLoginInfo() 540 logger.info( 541 'UsersPlugin: attribute failed_logins added.') 480 542 # Remove deprecated attributes 481 543 for i in self.deprecated_attributes: -
main/waeup.kofa/trunk/src/waeup/kofa/interfaces.py
r9816 r10055 446 446 required = False,) 447 447 448 class IFailedLoginInfo(IKofaObject): 449 """Info about failed logins. 450 451 Timestamps are supposed to be stored as floats using time.time() 452 or similar. 453 """ 454 num = schema.Int( 455 title = _(u'Number of failed logins'), 456 description = _(u'Number of failed logins'), 457 required = True, 458 default = 0, 459 ) 460 461 last = schema.Float( 462 title = _(u'Timestamp'), 463 description = _(u'Timestamp of last failed login or `None`'), 464 required = False, 465 default = None, 466 ) 467 468 def as_tuple(): 469 """Get login info as tuple ``<NUM>, <TIMESTAMP>``. 470 """ 471 472 def set_values(num=0, last=None): 473 """Set number of failed logins and timestamp of last one. 474 """ 475 476 def increase(): 477 """Increase the current number of failed logins and set timestamp. 478 """ 479 480 def reset(): 481 """Set failed login counters back to zero. 482 """ 483 484 448 485 class IUserAccount(IKofaObject): 449 486 """A user account. 450 487 """ 488 489 failed_logins = Attribute("""FailedLoginInfo for this account""") 490 451 491 name = schema.TextLine( 452 492 title = _(u'User Id'), … … 486 526 required = False, 487 527 ) 528 529 488 530 489 531 class IPasswordValidator(Interface): -
main/waeup.kofa/trunk/src/waeup/kofa/students/authentication.py
r9334 r10055 20 20 """ 21 21 import grok 22 import time 22 23 from zope.component import getUtility 23 24 from zope.password.interfaces import IPasswordManager … … 29 30 from zope.session.interfaces import ISession 30 31 from waeup.kofa.authentication import ( 31 KofaPrincipalInfo, get_principal_role_manager )32 KofaPrincipalInfo, get_principal_role_manager, FailedLoginInfo) 32 33 from waeup.kofa.interfaces import ( 33 34 IAuthPluginUtility, IUserAccount, IPasswordValidator) … … 69 70 def description(self): 70 71 return self.title 72 73 @property 74 def failed_logins(self): 75 if not hasattr(self.context, 'failed_logins'): 76 self.context.failed_logins = FailedLoginInfo() 77 return self.context.failed_logins 71 78 72 79 def _get_roles(self): -
main/waeup.kofa/trunk/src/waeup/kofa/tests/test_authentication.py
r8427 r10055 17 17 ## 18 18 import grok 19 import logging 20 import time 19 21 import unittest 22 from cStringIO import StringIO 23 from zope.component import getGlobalSiteManager 20 24 from zope.component.hooks import setSite, clearSite 21 25 from zope.interface.verify import verifyClass, verifyObject 26 from zope.password.testing import setUpPasswordManagers 22 27 from zope.pluggableauth.interfaces import IAuthenticatorPlugin 23 28 from zope.securitypolicy.interfaces import IPrincipalRoleManager 24 29 from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer 25 30 from waeup.kofa.authentication import ( 26 UserAuthenticatorPlugin, Account, KofaPrincipalInfo, 27 get_principal_role_manager) 31 UserAuthenticatorPlugin, Account, KofaPrincipalInfo, FailedLoginInfo, 32 get_principal_role_manager, UsersPlugin,) 33 from waeup.kofa.interfaces import ( 34 IUserAccount, IFailedLoginInfo, IKofaPrincipalInfo, IKofaPluggable) 28 35 29 36 class FakeSite(grok.Site, grok.Container): 37 #def getSiteManager(self): 38 # return None 39 # return getGlobalSiteManager() 30 40 pass 31 41 … … 87 97 self.assertTrue(hasattr(prm2, '_context') is False) 88 98 return 99 100 def make_failed_logins(self, num): 101 # do `num` failed logins and a valid one afterwards 102 del self.site['users'] 103 self.site['users'] = {'bob': Account('bob', 'secret')} 104 plugin = UserAuthenticatorPlugin() 105 resultlist = [] 106 # reset accounts 107 for x in range(num): 108 resultlist.append(plugin.authenticateCredentials( 109 dict(login='bob', password='wrongsecret'))) 110 resultlist.append(plugin.authenticateCredentials( 111 dict(login='bob', password='secret'))) 112 return resultlist 113 114 def DISABLED_test_failed_logins(self): 115 # after three failed logins, an account is blocked 116 # XXX: this tests authenticator with time penalty (currently 117 # disabled) 118 results = [] 119 succ_principal = KofaPrincipalInfo( 120 id='bob', 121 title='bob', 122 description=None, 123 email=None, 124 phone=None, 125 public_name=None, 126 user_type=u'user') 127 for x in range(4): 128 results.append(self.make_failed_logins(x)) 129 self.assertEqual(results[2], [None, None, succ_principal]) 130 # last login was blocked although correctly entered due to 131 # time penalty 132 self.assertEqual(results[3], [None, None, None, None]) 133 return 134 135 class KofaPrincipalInfoTests(unittest.TestCase): 136 137 def create_info(self): 138 return KofaPrincipalInfo( 139 id='bob', 140 title='bob', 141 description=None, 142 email=None, 143 phone=None, 144 public_name=None, 145 user_type=u'user') 146 147 def test_iface(self): 148 # make sure we implement the promised interfaces 149 info = self.create_info() 150 verifyClass(IKofaPrincipalInfo, KofaPrincipalInfo) 151 verifyObject(IKofaPrincipalInfo, info) 152 return 153 154 def test_equality(self): 155 # we can test two infos for equality 156 info1 = self.create_info() 157 info2 = self.create_info() 158 self.assertEqual(info1, info2) 159 self.assertTrue(info1 == info2) 160 info1.id = 'blah' 161 self.assertTrue(info1 != info2) 162 self.assertTrue((info1 == info2) is False) 163 info1.id = 'bob' 164 info2.id = 'blah' 165 self.assertTrue(info1 != info2) 166 self.assertTrue((info1 == info2) is False) 167 return 168 169 class FailedLoginInfoTests(unittest.TestCase): 170 171 def test_iface(self): 172 # make sure we fullfill the promised interfaces 173 info1 = FailedLoginInfo() 174 info2 = FailedLoginInfo(num=1, last=time.time()) 175 self.assertTrue( 176 verifyClass(IFailedLoginInfo, FailedLoginInfo)) 177 self.assertTrue(verifyObject(IFailedLoginInfo, info1)) 178 # make sure the stored values have correct type if not None 179 self.assertTrue(verifyObject(IFailedLoginInfo, info2)) 180 return 181 182 def test_default_values(self): 183 # By default we get 0, None 184 info = FailedLoginInfo() 185 self.assertEqual(info.num, 0) 186 self.assertEqual(info.last, None) 187 return 188 189 def test_set_values_by_attribute(self): 190 # we can set values by attribute 191 ts = time.gmtime(0) 192 info = FailedLoginInfo() 193 info.num = 5 194 info.last = ts 195 self.assertEqual(info.num, 5) 196 self.assertEqual(info.last, ts) 197 return 198 199 def test_set_values_by_constructor(self): 200 # we can set values by constructor args 201 ts = time.gmtime(0) 202 info = FailedLoginInfo(5, ts) 203 self.assertEqual(info.num, 5) 204 self.assertEqual(info.last, ts) 205 return 206 207 def test_set_values_by_keywords(self): 208 # we can set values by constructor keywords 209 ts = time.gmtime(0) 210 info = FailedLoginInfo(last=ts, num=3) 211 self.assertEqual(info.num, 3) 212 self.assertEqual(info.last, ts) 213 return 214 215 def test_as_tuple(self): 216 # we can get the info values as tuple 217 ts = time.gmtime(0) 218 info = FailedLoginInfo(last=ts, num=3) 219 self.assertEqual(info.as_tuple(), (3, ts)) 220 return 221 222 def test_set_values(self): 223 # we can set the values of a an info instance 224 ts = time.time() 225 info = FailedLoginInfo() 226 info.set_values(num=3, last=ts) 227 self.assertEqual(info.num, 3) 228 self.assertEqual(info.last, ts) 229 return 230 231 def test_increase(self): 232 # we can increase the number of failed logins 233 ts1 = time.time() 234 info = FailedLoginInfo() 235 info.increase() 236 self.assertEqual(info.num, 1) 237 self.assertTrue(info.last > ts1) 238 ts2 = info.last 239 info.increase() 240 self.assertEqual(info.num, 2) 241 self.assertTrue(info.last > ts2) 242 return 243 244 def test_reset(self): 245 # we can reset failed login infos. 246 info = FailedLoginInfo() 247 info.increase() 248 info.reset() 249 self.assertEqual(info.num, 0) 250 self.assertEqual(info.last, None) 251 return 252 253 class AccountTests(unittest.TestCase): 254 255 def setUp(self): 256 setUpPasswordManagers() 257 return 258 259 def test_iface(self): 260 acct = Account('bob', 'mypasswd') 261 self.assertTrue( 262 verifyClass(IUserAccount, Account)) 263 self.assertTrue( 264 verifyObject(IUserAccount, acct)) 265 return 266 267 def test_failed_logins(self): 268 # we can retrieve infos about failed logins 269 ts = time.time() 270 acct = Account('bob', 'mypasswd') 271 self.assertTrue(hasattr(acct, 'failed_logins')) 272 acct.failed_logins.set_values(num=3, last=ts) 273 self.assertEqual(acct.failed_logins.last, ts) 274 self.assertEqual(acct.failed_logins.num, 3) 275 return 276 277 def test_failed_logins_per_inst(self): 278 # we get a different counter for each Account instance 279 acct1 = Account('bob', 'secret') 280 acct2 = Account('alice', 'alsosecret') 281 self.assertTrue(acct1.failed_logins is not acct2.failed_logins) 282 return 283 284 class FakeUserAccount(object): 285 pass 286 287 class UsersPluginTests(unittest.TestCase): 288 289 def setUp(self): 290 setUpPasswordManagers() 291 self.site = FakeSite() 292 self.site['users'] = grok.Container() 293 return 294 295 def get_logger(self): 296 logger = logging.getLogger('waeup.test') 297 stream = StringIO() 298 handler = logging.StreamHandler(stream) 299 logger.setLevel(logging.DEBUG) 300 logger.propagate = False 301 logger.addHandler(handler) 302 return logger, stream 303 304 def test_ifaces(self): 305 # make sure we implement the promised interfaces 306 plugin = UsersPlugin() 307 verifyClass(IKofaPluggable, UsersPlugin) 308 verifyObject(IKofaPluggable, plugin) 309 return 310 311 def test_update(self): 312 # make sure user accounts are updated properly. 313 plugin = UsersPlugin() 314 logger, stream = self.get_logger() 315 plugin.update(self.site, 'app', logger) 316 stream.seek(0) 317 self.assertEqual(stream.read(), '') 318 self.site['users']['bob'] = FakeUserAccount() 319 logger, stream = self.get_logger() 320 plugin.update(self.site, 'app', logger) 321 stream.seek(0) 322 log_content = stream.read() 323 self.assertTrue(hasattr(self.site['users']['bob'], 'description')) 324 self.assertTrue(hasattr(self.site['users']['bob'], 'failed_logins')) 325 self.assertTrue( 326 isinstance(self.site['users']['bob'].failed_logins, 327 FailedLoginInfo)) 328 self.assertTrue('attribute description added' in log_content) 329 self.assertTrue('attribute failed_logins added' in log_content) 330 return
Note: See TracChangeset for help on using the changeset viewer.