1 | import os |
---|
2 | import re |
---|
3 | import shutil |
---|
4 | import tempfile |
---|
5 | import unittest |
---|
6 | from paste.deploy import loadapp |
---|
7 | try: |
---|
8 | from urlparse import parse_qsl # Python 2.x |
---|
9 | except ImportError: # pragma: no cover |
---|
10 | from urllib.parse import parse_qsl # Python 3.x |
---|
11 | from webob import Request, Response |
---|
12 | from webtest import TestApp as WebTestApp # avoid py.test skip message |
---|
13 | from waeup.cas.authenticators import DummyAuthenticator |
---|
14 | from waeup.cas.db import DB, LoginTicket, ServiceTicket, TicketGrantingCookie |
---|
15 | from waeup.cas.server import ( |
---|
16 | CASServer, create_service_ticket, create_login_ticket, |
---|
17 | create_tgc_value, check_login_ticket, set_session_cookie, |
---|
18 | check_session_cookie, get_template, delete_session_cookie, |
---|
19 | check_service_ticket, update_url |
---|
20 | ) |
---|
21 | |
---|
# Character set tickets and cookie values are checked against in the
# tests below: [a-zA-Z0-9\-].
# Raw strings keep `\-` a regex escape instead of an (invalid) string
# escape sequence, which newer Python versions warn about.
RE_ALPHABET = re.compile(r'^[a-zA-Z0-9\-]*$')

# Expected `Set-Cookie` header value when a ticket granting cookie is set.
RE_COOKIE = re.compile(r'^cas-tgc=[A-Za-z0-9\-]+; Path=/; secure; HttpOnly$')

# Expected `Set-Cookie` header value when the ticket granting cookie is
# deleted: empty value, negative Max-Age and an expiry date in the past.
RE_COOKIE_DEL = re.compile(
    r'^cas-tgc=; Max-Age=\-[0-9]+; Path=/; '
    r'expires=Thu, 01-Jan-1970 00:00:00 GMT; secure; HttpOnly$')
---|
27 | |
---|
28 | |
---|
class CASServerTests(unittest.TestCase):
    """Tests calling a `CASServer` app directly with `webob` requests.

    Section numbers in comments refer to the CAS protocol specs.
    """

    def setUp(self):
        # Create a new location where tempfiles are created. This way
        # also temporary dirs of local CASServers can be removed on
        # tear-down.
        self._new_tmpdir = tempfile.mkdtemp()
        self._old_tmpdir = tempfile.tempdir
        tempfile.tempdir = self._new_tmpdir
        self.workdir = os.path.join(self._new_tmpdir, 'home')
        self.db_path = os.path.join(self.workdir, 'mycas.db')
        os.mkdir(self.workdir)
        self.paste_conf1 = os.path.join(
            os.path.dirname(__file__), 'sample1.ini')
        self.paste_conf2 = os.path.join(
            os.path.dirname(__file__), 'sample2.ini')

    def tearDown(self):
        # Remove local tempfiles. The old (global) tempdir setting is
        # restored even if the removal fails, so that a cleanup problem
        # cannot leak the redirected tempdir into other tests.
        try:
            if os.path.isdir(self._new_tmpdir):
                shutil.rmtree(self._new_tmpdir)
        finally:
            tempfile.tempdir = self._old_tmpdir

    def _sso_request(self, app, url):
        # Store a fresh ticket granting cookie in the db of `app` and
        # return a request for `url` that sends this cookie.
        tgc = create_tgc_value()
        app.db.add(tgc)
        req = Request.blank(url)
        req.headers['Cookie'] = 'cas-tgc=%s' % str(tgc.value)
        return req

    def test_paste_deploy_loader(self):
        # we can load the CAS server via paste.deploy plugin
        app = loadapp('config:%s' % self.paste_conf1)
        assert isinstance(app, CASServer)
        assert hasattr(app, 'db')
        assert isinstance(app.db, DB)
        assert hasattr(app, 'auth')

    def test_paste_deploy_options(self):
        # we can set CAS server-related options via paste.deploy config
        app = loadapp('config:%s' % self.paste_conf2)
        assert isinstance(app, CASServer)
        assert app.db_connection_string == 'sqlite:///:memory:'
        assert isinstance(app.auth, DummyAuthenticator)

    def test_init(self):
        # we get a `DB` instance created automatically
        app = CASServer()
        assert hasattr(app, 'db')
        assert app.db is not None

    def test_init_explicit_db_path(self):
        # we can set a db_path explicitly
        app = CASServer(db='sqlite:///%s' % self.db_path)
        assert hasattr(app, 'db')
        assert isinstance(app.db, DB)
        assert os.path.isfile(self.db_path)

    def test_get_template(self):
        # existing templates are returned, missing ones yield `None`
        app = CASServer()
        assert app._get_template('login.html') is not None
        assert app._get_template('not-existent.html') is None

    def test_call_root(self):
        # the CAS protocol requires no root
        app = CASServer()
        req = Request.blank('http://localhost/')
        resp = app(req)
        assert resp.status == '404 Not Found'

    def test_first_time_login(self):
        # we can get a login page
        app = CASServer()
        req = Request.blank('http://localhost/login')
        resp = app(req)
        assert resp.status == '200 OK'

    def test_validate(self):
        # we can access a validation page
        app = CASServer()
        req = Request.blank('http://localhost/validate?service=foo&ticket=bar')
        resp = app(req)
        assert resp.status == '200 OK'

    def test_logout(self):
        # we can access a logout page
        app = CASServer()
        req = Request.blank('http://localhost/logout')
        resp = app(req)
        assert resp.status == '200 OK'

    def test_login_simple(self):
        # a simple login with no service will result in login screen
        # (2.1.1#service of protocol specs)
        app = CASServer()
        req = Request.blank('http://localhost/login')
        resp = app(req)
        assert resp.status == '200 OK'
        assert resp.content_type == 'text/html'
        assert b'<form ' in resp.body

    def test_login_cred_acceptor_sso_no_service(self):
        # 2.2.4: successful login via single sign on
        app = CASServer()
        req = self._sso_request(app, 'https://localhost/login')
        resp = app(req)
        assert resp.status == '200 OK'
        assert b'already' in resp.body
        assert 'Set-Cookie' not in resp.headers

    def test_login_renew_with_cookie(self):
        # 2.1.1: cookie will be ignored when renew is set
        app = CASServer()
        req = self._sso_request(app, 'https://localhost/login?renew=true')
        resp = app(req)
        assert resp.status == '200 OK'
        assert b'username' in resp.body
        assert 'Set-Cookie' not in resp.headers

    def test_login_renew_without_cookie(self):
        # 2.1.1: with renew and no cookie, normal auth will happen
        app = CASServer()
        req = Request.blank('https://localhost/login?renew=true')
        resp = app(req)
        assert resp.status == '200 OK'
        assert b'username' in resp.body

    def test_login_renew_as_empty_string(self):
        # `renew` is handled correctly, even with empty value
        app = CASServer()
        req = self._sso_request(app, 'https://localhost/login?renew')
        resp = app(req)
        assert resp.status == '200 OK'
        assert b'username' in resp.body
        assert 'Set-Cookie' not in resp.headers

    def test_login_gateway_no_cookie_with_service(self):
        # 2.1.1: with gateway but w/o cookie we will be redirected to service
        # no service ticket will be issued
        app = CASServer()
        params = 'gateway=true&service=http%3A%2F%2Fwww.service.com'
        req = Request.blank('https://localhost/login?%s' % params)
        resp = app(req)
        assert resp.status == '303 See Other'
        assert 'Location' in resp.headers
        assert resp.headers['Location'] == 'http://www.service.com'

    def test_login_gateway_with_cookie_and_service(self):
        # 2.1.1: with cookie and gateway we will be redirected to service
        app = CASServer()
        params = 'gateway=true&service=http%3A%2F%2Fwww.service.com'
        req = self._sso_request(app, 'https://localhost/login?%s' % params)
        resp = app(req)
        assert resp.status == '303 See Other'
        assert 'Location' in resp.headers
        assert resp.headers['Location'].startswith(
            'http://www.service.com?ticket=ST-')

    def test_login_gateway_and_renew(self):
        # 2.1.1 if both, gateway and renew are specified, only renew is valid
        app = CASServer()
        req = self._sso_request(
            app, 'https://localhost/login?renew=true&gateway=true')
        resp = app(req)
        # with only gateway true, this would lead to a redirect
        assert resp.status == '200 OK'
        assert b'username' in resp.body
        assert 'Set-Cookie' not in resp.headers

    def test_login_warn(self):
        # 2.2.1 as a credential acceptor, with `warn` set we require confirm
        app = CASServer()
        params = 'warn=true&service=http%3A%2F%2Fwww.service.com'
        req = self._sso_request(app, 'https://localhost/login?%s' % params)
        resp = app(req)
        # without warn, we would get a redirect
        assert resp.status == '200 OK'
        assert b'CAS login successful' in resp.body

    def test_logout_no_cookie(self):
        # 2.3 logout displays a logout page.
        app = CASServer()
        req = Request.blank('https://localhost/logout')
        resp = app(req)
        assert resp.status == '200 OK'
        assert b'logged out' in resp.body

    def test_logout_with_cookie(self):
        # 2.3 logout destroys any existing SSO session
        app = CASServer()
        req = self._sso_request(app, 'https://localhost/logout')
        resp = app(req)
        assert resp.status == '200 OK'
        assert b'logged out' in resp.body
        assert 'Set-Cookie' in resp.headers
        cookie = resp.headers['Set-Cookie']
        assert cookie.startswith('cas-tgc=;')
        assert 'expires' in cookie
        assert 'Max-Age' in cookie
        # the cookie was also removed from the database
        assert len(list(app.db.query(TicketGrantingCookie))) == 0

    def test_logout_url(self):
        # 2.3.1 with an `url` given we provide a link on logout
        app = CASServer()
        params = 'url=http%3A%2F%2Fwww.logout.com'
        req = Request.blank('https://localhost/logout?%s' % params)
        resp = app(req)
        assert resp.status == '200 OK'
        assert b'logged out' in resp.body
        assert b'like you to' in resp.body
        assert b'http://www.logout.com' in resp.body

    def test_validate_invalid(self):
        # 2.4.2 validation failure is indicated by a given format
        app = CASServer()
        params = 'ticket=foo&service=bar'
        req = Request.blank('https://localhost/validate?%s' % params)
        resp = app(req)
        assert resp.body == b'no\n\n'

    def test_validate_valid(self):
        # 2.4 validation success is indicated by a given format
        app = CASServer()
        sticket = create_service_ticket(
            'someuser', 'http://service.com/', sso=False)
        app.db.add(sticket)
        params = 'ticket=%s&service=%s' % (
            sticket.ticket, sticket.service)
        req = Request.blank('https://localhost/validate?%s' % params)
        resp = app(req)
        assert resp.body == b'yes\nsomeuser\n'

    def test_validate_renew_invalid(self):
        # 2.4.1 with `renew` we accept only non-sso issued tickets
        app = CASServer()
        sticket = create_service_ticket(
            'someuser', 'http://service.com/', sso=True)
        app.db.add(sticket)
        params = 'ticket=%s&service=%s&renew=true' % (
            sticket.ticket, sticket.service)
        req = Request.blank('https://localhost/validate?%s' % params)
        resp = app(req)
        assert resp.body == b'no\n\n'

    def test_validate_renew_valid(self):
        # 2.4.1 with `renew` we accept only non-sso issued tickets
        app = CASServer()
        sticket = create_service_ticket(
            'someuser', 'http://service.com/', sso=False)
        app.db.add(sticket)
        params = 'ticket=%s&service=%s&renew=true' % (
            sticket.ticket, sticket.service)
        req = Request.blank('https://localhost/validate?%s' % params)
        resp = app(req)
        assert resp.body == b'yes\nsomeuser\n'

    def test_style_css(self):
        # we can get a custom style sheet
        app = CASServer()
        req = Request.blank('https://localhost/style.css')
        resp = app(req)
        assert resp.status == '200 OK'
        assert resp.content_type == 'text/css'
---|
313 | |
---|
314 | |
---|
class BrowserTests(unittest.TestCase):
    """Tests driving a `CASServer` through a `webtest` test app."""

    def setUp(self):
        self.raw_app = CASServer(auth=DummyAuthenticator())
        self.app = WebTestApp(self.raw_app)

    def _stored_login_ticket(self):
        # Create a login ticket, store it in the app's db and return
        # the ticket string.
        login_ticket = create_login_ticket()
        self.raw_app.db.add(login_ticket)
        return login_ticket.ticket

    def _submit_credentials(self, url):
        # Fetch the login form at `url`, fill in the credentials used
        # throughout these tests and return the response of the submit.
        login_page = self.app.get(url)
        assert login_page.status == '200 OK'
        assert 'cas-tgc' not in self.app.cookies
        login_form = login_page.forms[0]
        login_form.set('username', 'bird')
        login_form.set('password', 'bebop')
        return login_form.submit('AUTHENTICATE')

    def test_login(self):
        page = self.app.get('/login')
        assert page.status == '200 OK'
        login_form = page.forms[0]
        # 2.1.3: form must be submitted by POST
        assert login_form.method == 'post'
        # 2.1.3: form must contain: username, password, lt
        names = login_form.fields.keys()
        for required in ('username', 'password', 'lt'):
            assert required in names
        assert RE_ALPHABET.match(login_form['lt'].value)

    def test_login_no_service(self):
        # w/o a service passed in, the form should not contain service
        # (not a strict protocol requirement, but handy)
        page = self.app.get('/login')
        login_form = page.forms[0]
        assert 'service' not in login_form.fields.keys()

    def test_login_service_replayed(self):
        # 2.1.3: the login form must contain the service param sent
        page = self.app.get('/login?service=http%3A%2F%2Fwww.service.com')
        login_form = page.forms[0]
        assert page.status == '200 OK'
        assert 'service' in login_form.fields.keys()
        assert login_form['service'].value == 'http://www.service.com'

    def test_login_cred_acceptor_valid_no_service(self):
        # 2.2.4: successful login w/o service yields a message
        credentials = dict(
            username='bird', password='bebop',
            lt=self._stored_login_ticket())
        result = self.app.post('/login', credentials)
        assert result.status == '200 OK'
        assert b'successful' in result.body
        # single-sign-on session initiated
        assert 'Set-Cookie' in result.headers
        assert result.headers['Set-Cookie'].startswith('cas-tgc=')

    def test_login_cred_acceptor_valid_w_service(self):
        # 2.2.4: successful login with service makes a redirect
        # Appendix B: safe redirect
        credentials = dict(
            username='bird', password='bebop',
            lt=self._stored_login_ticket(),
            service='http://example.com/Login')
        result = self.app.post('/login', credentials)
        headers = result.headers
        assert result.status == '303 See Other'
        assert 'Location' in headers
        assert headers['Location'].startswith(
            'http://example.com/Login?ticket=ST-')
        # caching of the redirect is disabled
        assert 'Pragma' in headers
        assert headers['Pragma'] == 'no-cache'
        assert 'Cache-Control' in headers
        assert headers['Cache-Control'] == 'no-store'
        assert 'Expires' in headers
        assert headers['Expires'] == 'Thu, 01 Dec 1994 16:00:00 GMT'
        # the body offers script and non-script ways to follow
        for snippet in (b'window.location.href', b'noscript', b'ticket=ST-'):
            assert snippet in result.body
        # a matching service ticket was stored
        service_ticket = self.raw_app.db.query(ServiceTicket).all()[0]
        assert service_ticket.user == 'bird'
        assert service_ticket.service == 'http://example.com/Login'
        assert service_ticket.ticket.startswith('ST-')

    def test_login_cred_acceptor_failed(self):
        # 2.2.4: failed login yields a message
        credentials = dict(
            username='bird', password='cat',
            lt=self._stored_login_ticket())
        result = self.app.post('/login', credentials)
        assert result.status == '200 OK'
        assert b'failed' in result.body

    def test_login_sso_no_service(self):
        # we can initiate single-sign-on without service
        submitted = self._submit_credentials(
            'https://localhost/login')  # HTTPS required!
        assert submitted.status == '200 OK'
        # we got a secure cookie
        assert 'cas-tgc' in self.app.cookies
        # when we get the login page again, the cookie will replace creds.
        revisit = self.app.get('https://localhost/login')
        assert b'You logged in already' in revisit.body

    def test_login_sso_with_service(self):
        # with a service given, login and any later visit redirect there
        login_url = (
            'https://localhost/login?service=http%3A%2F%2Fservice.com%2F')
        expected_target = 'http://service.com/?ticket=ST-'
        submitted = self._submit_credentials(login_url)
        assert submitted.status == '303 See Other'
        assert submitted.headers['Location'].startswith(expected_target)
        # we got a secure cookie
        assert 'cas-tgc' in self.app.cookies
        revisit = self.app.get(login_url)
        assert revisit.status == '303 See Other'
        assert revisit.headers['Location'].startswith(expected_target)

    def test_login_sso_with_service_additional_params1(self):
        # we can get a service ticket also with a service providing
        # get params
        # this service url reads http://service.com/index.php?authCAS=CAS
        service_url = 'http%3A%2F%2Fservice.com%2Findex.php%3FauthCAS%3DCAS'
        submitted = self._submit_credentials(
            'https://localhost/login?service=%s' % service_url)
        assert submitted.status == '303 See Other'
        target = submitted.headers['Location']
        target_params = dict(parse_qsl(target.split('?', 1)[1]))
        assert 'authCAS' in target_params.keys()
        assert 'ticket' in target_params.keys()
        assert len(target_params['ticket']) == 32
---|
456 | |
---|
457 | |
---|
class CASServerHelperTests(unittest.TestCase):
    """Tests for the helper functions imported from `waeup.cas.server`.

    Section numbers in comments refer to the CAS protocol specs.
    """

    def setUp(self):
        # a file-based database in a throw-away working directory
        self.workdir = tempfile.mkdtemp()
        self.db_file = os.path.join(self.workdir, 'mycas.db')
        self.conn_string = 'sqlite:///%s' % self.db_file
        self.db = DB(self.conn_string)

    def tearDown(self):
        shutil.rmtree(self.workdir)

    def test_create_service_ticket(self):
        # we can create service tickets
        st = create_service_ticket(
            user='bob', service='http://www.example.com')
        assert isinstance(st, ServiceTicket)
        # 3.1.1: service not part of ticket
        assert 'example.com' not in st.ticket
        # 3.1.1: ticket must start with 'ST-'
        assert st.ticket.startswith('ST-')
        # 3.1.1: min. ticket length clients must be able to process is 32
        assert len(st.ticket) < 33
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        # (the message shows the checked string, not the ticket object)
        assert RE_ALPHABET.match(st.ticket), (
            'Ticket contains forbidden chars: %s' % st.ticket)
        assert st.service == 'http://www.example.com'
        assert st.user == 'bob'

    def test_create_login_ticket(self):
        # we can create login tickets
        lt = create_login_ticket()
        # 3.5.1: ticket should start with 'LT-'
        assert lt.ticket.startswith('LT-')
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        assert RE_ALPHABET.match(lt.ticket), (
            'Ticket contains forbidden chars: %s' % lt.ticket)

    def test_create_login_ticket_unique(self):
        # 3.5.1: login tickets are unique (although not hard to guess)
        ticket_num = 1000  # increase to test more thoroughly
        # Compare the ticket strings, not the ticket objects: unless
        # `LoginTicket` hashes by value (not visible from here), a set
        # of distinct objects has `ticket_num` members even if ticket
        # strings collide.
        tickets = set(
            create_login_ticket().ticket for _ in range(ticket_num))
        assert len(tickets) == ticket_num

    def test_create_tgc_value(self):
        # we can create ticket granting cookies
        tgc = create_tgc_value()
        assert isinstance(tgc, TicketGrantingCookie)
        # 3.6.1: cookie value should start with 'TGC-'
        assert tgc.value.startswith('TGC-')
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        assert RE_ALPHABET.match(tgc.value), (
            'Cookie value contains forbidden chars: %s' % tgc.value)

    def test_check_login_ticket(self):
        # login tickets are valid only once
        db = DB('sqlite:///')
        lt = LoginTicket('LT-123456')
        db.add(lt)
        assert check_login_ticket(db, None) is False
        assert check_login_ticket(db, 'LT-123456') is True
        # the ticket will be removed after check
        assert check_login_ticket(db, 'LT-123456') is False
        assert check_login_ticket(db, 'LT-654321') is False

    def test_set_session_cookie1(self):
        # make sure we can add session cookies to responses
        db = DB('sqlite:///')
        resp = set_session_cookie(db, Response())
        assert 'Set-Cookie' in resp.headers
        cookie = resp.headers['Set-Cookie']
        assert RE_COOKIE.match(cookie), (
            'Cookie in unexpected format: %s' % cookie)
        # the cookie is stored in database
        # (the value sits between the first '=' and the first ';')
        value = cookie.split('=')[1].split(';')[0]
        q = db.query(TicketGrantingCookie).filter(
            TicketGrantingCookie.value == value)
        assert len(list(q)) == 1

    def test_check_session_cookie2(self):
        # stored cookie values are found, given as text or as bytes
        db = DB('sqlite:///')
        tgc = create_tgc_value()
        db.add(tgc)
        value = tgc.value
        assert check_session_cookie(db, value) == tgc
        assert check_session_cookie(db, 'foo') is None
        assert check_session_cookie(db, b'foo') is None
        assert check_session_cookie(db, None) is None
        value2 = value.encode('utf-8')
        assert check_session_cookie(db, value2) == tgc

    def test_get_template(self):
        # we can load templates
        assert get_template('not-existing-template') is None
        assert get_template('login.html') is not None

    def test_delete_session_cookie(self):
        # we can unset cookies
        db = DB('sqlite:///')
        tgc = create_tgc_value()
        db.add(tgc)
        value = tgc.value
        resp = delete_session_cookie(db, Response(), old_value=value)
        assert 'Set-Cookie' in resp.headers
        cookie = resp.headers['Set-Cookie']
        assert RE_COOKIE_DEL.match(cookie), (
            'Cookie in unexpected format: %s' % cookie)
        # the cookie value was deleted from database
        q = db.query(TicketGrantingCookie).filter(
            TicketGrantingCookie.value == value)
        assert len(list(q)) == 0

    def test_check_service_ticket(self):
        # only matching ticket/service pairs yield a service ticket
        db = DB('sqlite:///')
        st = ServiceTicket(
            'ST-123456', 'someuser', 'http://myservice.com', True)
        db.add(st)
        assert check_service_ticket(db, None, 'foo') is None
        assert check_service_ticket(db, 'foo', None) is None
        assert check_service_ticket(db, 'ST-123456', 'foo') is None
        assert check_service_ticket(db, 'foo', 'http://myservice.com') is None
        result = check_service_ticket(db, 'ST-123456', 'http://myservice.com')
        assert isinstance(result, ServiceTicket)
        assert result.user == 'someuser'
        # the fourth argument decides whether this ticket (created with
        # a final `True` argument) is accepted
        assert check_service_ticket(
            db, 'ST-123456', 'http://myservice.com', True) is None
        assert check_service_ticket(
            db, 'ST-123456', 'http://myservice.com', False) is not None

    def test_update_url(self):
        # we can create valid new urls with query string params updated
        # (where two params result, both orders are accepted)
        result1 = update_url('http://sample.com/index?a=1&b=2', dict(b='3'))
        assert result1 in (
            'http://sample.com/index?a=1&b=3',
            'http://sample.com/index?b=3&a=1')
        result2 = update_url('http://sample.com/index?b=2', dict(b='3'))
        assert result2 == 'http://sample.com/index?b=3'
        result3 = update_url('http://sample.com/index', dict(b='3'))
        assert result3 == 'http://sample.com/index?b=3'
        result4 = update_url('http://sample.com/index?a=2', dict(b='3'))
        assert result4 in (
            'http://sample.com/index?a=2&b=3',
            'http://sample.com/index?b=3&a=2')
---|