source: main/waeup.cas/trunk/waeup/cas/tests/test_server.py @ 10492

Last change on this file since 10492 was 10492, checked in by uli, 11 years ago

Update regression test to allow query params in arbitrary order.

File size: 22.8 KB
Line 
1import os
2import re
3import shutil
4import tempfile
5import unittest
6from paste.deploy import loadapp
7from urlparse import parse_qsl
8from webob import Request, Response
9from webtest import TestApp as WebTestApp  # avoid py.test skip message
10from waeup.cas.authenticators import DummyAuthenticator
11from waeup.cas.db import DB, LoginTicket, ServiceTicket, TicketGrantingCookie
12from waeup.cas.server import (
13    CASServer, create_service_ticket, create_login_ticket,
14    create_tgc_value, check_login_ticket, set_session_cookie,
15    check_session_cookie, get_template, delete_session_cookie,
16    check_service_ticket, update_url
17    )
18
# Patterns used by the assertions below.  They are written as raw strings
# so that the regex escape ``\-`` is not treated as an (invalid) string
# escape sequence by the Python parser.

# 3.7 of the protocol specs: allowed character set == [a-zA-Z0-9\-]
RE_ALPHABET = re.compile(r'^[a-zA-Z0-9\-]*$')

# `Set-Cookie` header value expected when a ticket granting cookie is set.
RE_COOKIE = re.compile(
    r'^cas-tgc=[A-Za-z0-9\-]+; Path=/; secure; HttpOnly$')

# `Set-Cookie` header value expected when the cookie is unset: empty
# value, negative `Max-Age` and an expiry date in the past.
RE_COOKIE_DEL = re.compile(
    r'^cas-tgc=; Max-Age=\-[0-9]+; Path=/; '
    r'expires=Thu, 01-Jan-1970 00:00:00 GMT; secure; HttpOnly$')
24
25
class CASServerTests(unittest.TestCase):
    """Tests calling a `CASServer` directly with `webob` requests.

    Section numbers in the comments (``2.1.1`` etc.) refer to the CAS
    protocol specs.
    """

    def setUp(self):
        # Create a new location where tempfiles are created.  This way
        # also temporary dirs of local CASServers can be removed on
        # tear-down.
        self._new_tmpdir = tempfile.mkdtemp()
        self._old_tmpdir = tempfile.tempdir
        tempfile.tempdir = self._new_tmpdir
        self.workdir = os.path.join(self._new_tmpdir, 'home')
        self.db_path = os.path.join(self.workdir, 'mycas.db')
        os.mkdir(self.workdir)
        here = os.path.dirname(__file__)
        self.paste_conf1 = os.path.join(here, 'sample1.ini')
        self.paste_conf2 = os.path.join(here, 'sample2.ini')

    def tearDown(self):
        # Remove the local tempdir.  The old (global) tempdir setting is
        # restored in any case, even if the removal fails; otherwise
        # subsequent tests would create tempfiles in a stale location.
        try:
            if os.path.isdir(self._new_tmpdir):
                shutil.rmtree(self._new_tmpdir)
        finally:
            tempfile.tempdir = self._old_tmpdir

    def _get(self, url):
        """Return the response of a fresh `CASServer` for a request to `url`.

        No cookie is sent with the request.
        """
        return CASServer()(Request.blank(url))

    def _get_with_tgc(self, url):
        """Request `url` from a fresh `CASServer`, sending a valid cookie.

        A new ticket granting cookie is stored in the server's database
        and its value sent as ``cas-tgc`` cookie.  Returns a tuple
        ``(app, response)``.
        """
        app = CASServer()
        tgc = create_tgc_value()
        app.db.add(tgc)
        req = Request.blank(url)
        req.headers['Cookie'] = 'cas-tgc=%s' % str(tgc.value)
        return app, app(req)

    def _validate(self, sso, renew=False):
        """Return the ``/validate`` response for a freshly stored ticket.

        A service ticket for user ``someuser`` is created with the given
        `sso` flag and stored in a fresh server.  With `renew` set,
        ``renew=true`` is appended to the validation query.
        """
        app = CASServer()
        sticket = create_service_ticket(
            'someuser', 'http://service.com/', sso=sso)
        app.db.add(sticket)
        params = 'ticket=%s&service=%s' % (sticket.ticket, sticket.service)
        if renew:
            params += '&renew=true'
        return app(Request.blank('https://localhost/validate?%s' % params))

    def test_paste_deploy_loader(self):
        # we can load the CAS server via paste.deploy plugin
        app = loadapp('config:%s' % self.paste_conf1)
        assert isinstance(app, CASServer)
        assert hasattr(app, 'db')
        assert isinstance(app.db, DB)
        assert hasattr(app, 'auth')

    def test_paste_deploy_options(self):
        # we can set CAS server-related options via paste.deploy config
        app = loadapp('config:%s' % self.paste_conf2)
        assert isinstance(app, CASServer)
        assert app.db_connection_string == 'sqlite:///:memory:'
        assert isinstance(app.auth, DummyAuthenticator)

    def test_init(self):
        # we get a `DB` instance created automatically
        app = CASServer()
        assert hasattr(app, 'db')
        assert app.db is not None

    def test_init_explicit_db_path(self):
        # we can set a db_path explicitly
        app = CASServer(db='sqlite:///%s' % self.db_path)
        assert hasattr(app, 'db')
        assert isinstance(app.db, DB)
        assert os.path.isfile(self.db_path)

    def test_get_template(self):
        # existing templates are found, missing ones yield `None`
        app = CASServer()
        assert app._get_template('login.html') is not None
        assert app._get_template('not-existent.html') is None

    def test_call_root(self):
        # the CAS protocol requires no root
        resp = self._get('http://localhost/')
        assert resp.status == '404 Not Found'

    def test_first_time_login(self):
        # we can get a login page
        resp = self._get('http://localhost/login')
        assert resp.status == '200 OK'

    def test_validate(self):
        # we can access a validation page
        resp = self._get('http://localhost/validate?service=foo&ticket=bar')
        assert resp.status == '200 OK'

    def test_logout(self):
        # we can access a logout page
        resp = self._get('http://localhost/logout')
        assert resp.status == '200 OK'

    def test_login_simple(self):
        # a simple login with no service will result in login screen
        # (2.1.1#service of protocol specs)
        resp = self._get('http://localhost/login')
        assert resp.status == '200 OK'
        assert resp.content_type == 'text/html'
        assert b'<form ' in resp.body

    def test_login_cred_acceptor_sso_no_service(self):
        # 2.2.4: successful login via single sign on
        app, resp = self._get_with_tgc('https://localhost/login')
        assert resp.status == '200 OK'
        assert b'already' in resp.body
        assert 'Set-Cookie' not in resp.headers

    def test_login_renew_with_cookie(self):
        # 2.1.1: cookie will be ignored when renew is set
        app, resp = self._get_with_tgc('https://localhost/login?renew=true')
        assert resp.status == '200 OK'
        assert b'username' in resp.body
        assert 'Set-Cookie' not in resp.headers

    def test_login_renew_without_cookie(self):
        # 2.1.1: with renew and no cookie, normal auth will happen
        resp = self._get('https://localhost/login?renew=true')
        assert resp.status == '200 OK'
        assert b'username' in resp.body

    def test_login_renew_as_empty_string(self):
        # `renew` is handled correctly, even with empty value
        app, resp = self._get_with_tgc('https://localhost/login?renew')
        assert resp.status == '200 OK'
        assert b'username' in resp.body
        assert 'Set-Cookie' not in resp.headers

    def test_login_gateway_no_cookie_with_service(self):
        # 2.1.1: with gateway but w/o cookie we will be redirected to service
        # no service ticket will be issued
        params = 'gateway=true&service=http%3A%2F%2Fwww.service.com'
        resp = self._get('https://localhost/login?%s' % params)
        assert resp.status == '303 See Other'
        assert 'Location' in resp.headers
        assert resp.headers['Location'] == 'http://www.service.com'

    def test_login_gateway_with_cookie_and_service(self):
        # 2.1.1: with cookie and gateway we will be redirected to service
        params = 'gateway=true&service=http%3A%2F%2Fwww.service.com'
        app, resp = self._get_with_tgc('https://localhost/login?%s' % params)
        assert resp.status == '303 See Other'
        assert 'Location' in resp.headers
        assert resp.headers['Location'].startswith(
            'http://www.service.com?ticket=ST-')

    def test_login_gateway_and_renew(self):
        # 2.1.1 if both, gateway and renew are specified, only renew is valid
        app, resp = self._get_with_tgc(
            'https://localhost/login?renew=true&gateway=true')
        # with only gateway true, this would lead to a redirect
        assert resp.status == '200 OK'
        assert b'username' in resp.body
        assert 'Set-Cookie' not in resp.headers

    def test_login_warn(self):
        # 2.2.1 as a credential acceptor, with `warn` set we require confirm
        params = 'warn=true&service=http%3A%2F%2Fwww.service.com'
        app, resp = self._get_with_tgc('https://localhost/login?%s' % params)
        # without warn, we would get a redirect
        assert resp.status == '200 OK'
        assert b'CAS login successful' in resp.body

    def test_logout_no_cookie(self):
        # 2.3 logout displays a logout page.
        resp = self._get('https://localhost/logout')
        assert resp.status == '200 OK'
        assert b'logged out' in resp.body

    def test_logout_with_cookie(self):
        # 2.3 logout destroys any existing SSO session
        app, resp = self._get_with_tgc('https://localhost/logout')
        assert resp.status == '200 OK'
        assert b'logged out' in resp.body
        assert 'Set-Cookie' in resp.headers
        cookie = resp.headers['Set-Cookie']
        assert cookie.startswith('cas-tgc=;')
        assert 'expires' in cookie
        assert 'Max-Age' in cookie
        # the cookie was also removed from the database
        assert len(list(app.db.query(TicketGrantingCookie))) == 0

    def test_logout_url(self):
        # 2.3.1 with an `url` given we provide a link on logout
        params = 'url=http%3A%2F%2Fwww.logout.com'
        resp = self._get('https://localhost/logout?%s' % params)
        assert resp.status == '200 OK'
        assert b'logged out' in resp.body
        assert b'like you to' in resp.body
        assert b'http://www.logout.com' in resp.body

    def test_validate_invalid(self):
        # 2.4.2 validation failure is indicated by a given format
        resp = self._get('https://localhost/validate?ticket=foo&service=bar')
        assert resp.body == b'no\n\n'

    def test_validate_valid(self):
        # 2.4 validation success is indicated by a given format
        resp = self._validate(sso=False)
        assert resp.body == b'yes\nsomeuser\n'

    def test_validate_renew_invalid(self):
        # 2.4.1 with `renew` we accept only non-sso issued tickets
        resp = self._validate(sso=True, renew=True)
        assert resp.body == b'no\n\n'

    def test_validate_renew_valid(self):
        # 2.4.1 with `renew` we accept only non-sso issued tickets
        resp = self._validate(sso=False, renew=True)
        assert resp.body == b'yes\nsomeuser\n'
302
303
class BrowserTests(unittest.TestCase):
    """Tests driving a `CASServer` through a `webtest` test application.

    The server is set up with a `DummyAuthenticator`.  Section numbers
    in the comments refer to the CAS protocol specs.
    """

    def setUp(self):
        server = CASServer(auth=DummyAuthenticator())
        self.raw_app = server
        self.app = WebTestApp(server)

    def _stored_login_ticket(self):
        """Create a login ticket, store it in the server db, return its id."""
        ticket = create_login_ticket()
        self.raw_app.db.add(ticket)
        return ticket.ticket

    def _submit_credentials(self, page):
        """Fill the first form of `page` with valid creds and submit it."""
        login_form = page.forms[0]
        login_form.set('username', 'bird')
        login_form.set('password', 'bebop')
        return login_form.submit('AUTHENTICATE')

    def test_login(self):
        page = self.app.get('/login')
        assert page.status == '200 OK'
        login_form = page.forms[0]
        # 2.1.3: form must be submitted by POST
        assert login_form.method == 'post'
        # 2.1.3: form must contain: username, password, lt
        for required in ('username', 'password', 'lt'):
            assert required in login_form.fields
        assert RE_ALPHABET.match(login_form['lt'].value)

    def test_login_no_service(self):
        # w/o a service passed in, the form should not contain service
        # (not a strict protocol requirement, but handy)
        page = self.app.get('/login')
        assert 'service' not in page.forms[0].fields

    def test_login_service_replayed(self):
        # 2.1.3: the login form must contain the service param sent
        page = self.app.get('/login?service=http%3A%2F%2Fwww.service.com')
        login_form = page.forms[0]
        assert page.status == '200 OK'
        assert 'service' in login_form.fields
        assert login_form['service'].value == 'http://www.service.com'

    def test_login_cred_acceptor_valid_no_service(self):
        # 2.2.4: successful login w/o service yields a message
        ticket_id = self._stored_login_ticket()
        result = self.app.post(
            '/login',
            {'username': 'bird', 'password': 'bebop', 'lt': ticket_id})
        assert result.status == '200 OK'
        assert b'successful' in result.body
        # single-sign-on session initiated
        assert 'Set-Cookie' in result.headers
        assert result.headers['Set-Cookie'].startswith('cas-tgc=')

    def test_login_cred_acceptor_valid_w_service(self):
        # 2.2.4: successful login with service makes a redirect
        # Appendix B: safe redirect
        ticket_id = self._stored_login_ticket()
        result = self.app.post(
            '/login',
            {'username': 'bird', 'password': 'bebop', 'lt': ticket_id,
             'service': 'http://example.com/Login'})
        assert result.status == '303 See Other'
        assert 'Location' in result.headers
        assert result.headers['Location'].startswith(
            'http://example.com/Login?ticket=ST-')
        # headers that keep the redirect from being cached
        no_cache_headers = (
            ('Pragma', 'no-cache'),
            ('Cache-Control', 'no-store'),
            ('Expires', 'Thu, 01 Dec 1994 16:00:00 GMT'),
            )
        for name, expected in no_cache_headers:
            assert name in result.headers
            assert result.headers[name] == expected
        # the body offers a script-based and a noscript redirect
        for snippet in (b'window.location.href', b'noscript', b'ticket=ST-'):
            assert snippet in result.body

    def test_login_cred_acceptor_failed(self):
        # 2.2.4: failed login yields a message
        ticket_id = self._stored_login_ticket()
        result = self.app.post(
            '/login',
            {'username': 'bird', 'password': 'cat', 'lt': ticket_id})
        assert result.status == '200 OK'
        assert b'failed' in result.body

    def test_login_sso_no_service(self):
        # we can initiate single-sign-on without service
        login_url = 'https://localhost/login'  # HTTPS required!
        first = self.app.get(login_url)
        assert first.status == '200 OK'
        assert 'cas-tgc' not in self.app.cookies
        second = self._submit_credentials(first)
        assert second.status == '200 OK'
        # we got a secure cookie
        assert 'cas-tgc' in self.app.cookies
        # when we get the login page again, the cookie will replace creds.
        third = self.app.get(login_url)
        assert b'You logged in already' in third.body

    def test_login_sso_with_service(self):
        # with a service given, login and SSO re-login both redirect to it
        login_url = (
            'https://localhost/login?service=http%3A%2F%2Fservice.com%2F')
        redirect_start = 'http://service.com/?ticket=ST-'
        first = self.app.get(login_url)
        assert first.status == '200 OK'
        assert 'cas-tgc' not in self.app.cookies
        second = self._submit_credentials(first)
        assert second.status == '303 See Other'
        assert second.headers['Location'].startswith(redirect_start)
        # we got a secure cookie
        assert 'cas-tgc' in self.app.cookies
        third = self.app.get(login_url)
        assert third.status == '303 See Other'
        assert third.headers['Location'].startswith(redirect_start)

    def test_login_sso_with_service_additional_params1(self):
        # we can get a service ticket also with a service providing
        # get params
        # this service url reads http://service.com/index.php?authCAS=CAS
        quoted_service = (
            'http%3A%2F%2Fservice.com%2Findex.php%3FauthCAS%3DCAS')
        first = self.app.get(
            'https://localhost/login?service=%s' % quoted_service)
        assert first.status == '200 OK'
        assert 'cas-tgc' not in self.app.cookies
        second = self._submit_credentials(first)
        assert second.status == '303 See Other'
        # compare parsed params, as their order in the URL is arbitrary
        redirect_query = second.headers['Location'].split('?', 1)[1]
        redirect_params = dict(parse_qsl(redirect_query))
        assert 'authCAS' in redirect_params
        assert 'ticket' in redirect_params
        assert len(redirect_params['ticket']) == 32
440
441
class CASServerHelperTests(unittest.TestCase):
    """Tests for the helper functions of the `waeup.cas.server` module.

    Section numbers in the comments refer to the CAS protocol specs.
    """

    def setUp(self):
        # provide a file-based database in a fresh working directory
        self.workdir = tempfile.mkdtemp()
        self.db_file = os.path.join(self.workdir, 'mycas.db')
        self.conn_string = 'sqlite:///%s' % self.db_file
        self.db = DB(self.conn_string)

    def tearDown(self):
        shutil.rmtree(self.workdir)

    def test_create_service_ticket(self):
        # we can create service tickets
        st = create_service_ticket(
            user='bob', service='http://www.example.com')
        assert isinstance(st, ServiceTicket)
        # 3.1.1: service not part of ticket
        assert 'example.com' not in st.ticket
        # 3.1.1: ticket must start with 'ST-'
        assert st.ticket.startswith('ST-')
        # 3.1.1: min. ticket length clients must be able to process is 32
        assert len(st.ticket) < 33
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        assert RE_ALPHABET.match(st.ticket), (
            'Ticket contains forbidden chars: %s' % st.ticket)

    def test_create_login_ticket(self):
        # we can create login tickets
        lt = create_login_ticket()
        # 3.5.1: ticket should start with 'LT-'
        assert lt.ticket.startswith('LT-')
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        assert RE_ALPHABET.match(lt.ticket), (
            'Ticket contains forbidden chars: %s' % lt.ticket)

    def test_create_login_ticket_unique(self):
        # 3.5.1: login tickets are unique (although not hard to guess)
        ticket_num = 1000  # increase to test more thoroughly
        # Compare the ticket strings, not the ticket objects: distinct
        # objects could count as different even if they carried the
        # same ticket string.
        ticket_ids = set(
            create_login_ticket().ticket for _ in range(ticket_num))
        assert len(ticket_ids) == ticket_num

    def test_create_tgc_value(self):
        # we can create ticket granting cookies
        tgc = create_tgc_value()
        assert isinstance(tgc, TicketGrantingCookie)
        # 3.6.1: cookie value should start with 'TGC-'
        assert tgc.value.startswith('TGC-')
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        assert RE_ALPHABET.match(tgc.value), (
            'Cookie value contains forbidden chars: %s' % tgc.value)

    def test_check_login_ticket(self):
        # login tickets are valid only once
        db = DB('sqlite:///')
        lt = LoginTicket('LT-123456')
        db.add(lt)
        assert check_login_ticket(db, None) is False
        assert check_login_ticket(db, 'LT-123456') is True
        # the ticket will be removed after check
        assert check_login_ticket(db, 'LT-123456') is False
        assert check_login_ticket(db, 'LT-654321') is False

    def test_set_session_cookie1(self):
        # make sure we can add session cookies to responses
        db = DB('sqlite:///')
        resp = set_session_cookie(db, Response())
        assert 'Set-Cookie' in resp.headers
        cookie = resp.headers['Set-Cookie']
        assert RE_COOKIE.match(cookie), (
            'Cookie in unexpected format: %s' % cookie)
        # the cookie is stored in database
        # (the value is the part between the first '=' and the first ';')
        value = cookie.split('=')[1].split(';')[0]
        q = db.query(TicketGrantingCookie).filter(
            TicketGrantingCookie.value == value)
        assert len(list(q)) == 1

    def test_check_session_cookie2(self):
        # stored cookie values are found, others (or `None`) are not
        db = DB('sqlite:///')
        tgc = create_tgc_value()
        db.add(tgc)
        value = tgc.value
        assert check_session_cookie(db, value) == tgc
        assert check_session_cookie(db, 'foo') is None
        assert check_session_cookie(db, b'foo') is None
        assert check_session_cookie(db, None) is None
        # values passed in as bytes are accepted as well
        value2 = value.encode('utf-8')
        assert check_session_cookie(db, value2) == tgc

    def test_get_template(self):
        # we can load templates
        assert get_template('not-existing-template') is None
        assert get_template('login.html') is not None

    def test_delete_session_cookie(self):
        # we can unset cookies
        db = DB('sqlite:///')
        tgc = create_tgc_value()
        db.add(tgc)
        value = tgc.value
        resp = delete_session_cookie(db, Response(), old_value=value)
        assert 'Set-Cookie' in resp.headers
        cookie = resp.headers['Set-Cookie']
        assert RE_COOKIE_DEL.match(cookie), (
            'Cookie in unexpected format: %s' % cookie)
        # the cookie value was deleted from database
        q = db.query(TicketGrantingCookie).filter(
            TicketGrantingCookie.value == value)
        assert len(list(q)) == 0

    def test_check_service_ticket(self):
        # service tickets are found only with matching ticket and service
        db = DB('sqlite:///')
        st = ServiceTicket(
            'ST-123456', 'someuser', 'http://myservice.com', True)
        db.add(st)
        assert check_service_ticket(db, None, 'foo') is None
        assert check_service_ticket(db, 'foo', None) is None
        assert check_service_ticket(db, 'ST-123456', 'foo') is None
        assert check_service_ticket(db, 'foo', 'http://myservice.com') is None
        result = check_service_ticket(db, 'ST-123456', 'http://myservice.com')
        assert isinstance(result, ServiceTicket)
        assert result.user == 'someuser'
        # the 4th argument decides whether this ticket (created with
        # `True` as last argument above) is accepted
        assert check_service_ticket(
            db, 'ST-123456', 'http://myservice.com', True) is None
        assert check_service_ticket(
            db, 'ST-123456', 'http://myservice.com', False) is not None

    def test_update_url(self):
        # we can create valid new urls with query string params updated
        url = 'http://sample.com/index?a=1&b=2'
        result1 = update_url(url, dict(b='3'))
        assert result1 == 'http://sample.com/index?a=1&b=3'
        result2 = update_url('http://sample.com/index?b=2', dict(b='3'))
        assert result2 == 'http://sample.com/index?b=3'
        result3 = update_url('http://sample.com/index', dict(b='3'))
        assert result3 == 'http://sample.com/index?b=3'
        result4 = update_url('http://sample.com/index?a=2', dict(b='3'))
        assert result4 == 'http://sample.com/index?a=2&b=3'
Note: See TracBrowser for help on using the repository browser.