source: main/waeup.cas/trunk/waeup/cas/tests/test_server.py @ 10490

Last change on this file since 10490 was 10490, checked in by uli, 11 years ago

Regression test: CAS must allow GET params in service URLs.

File size: 22.1 KB
RevLine 
[10321]1import os
[10394]2import re
[10321]3import shutil
4import tempfile
5import unittest
6from paste.deploy import loadapp
[10394]7from webob import Request, Response
8from webtest import TestApp as WebTestApp  # avoid py.test skip message
9from waeup.cas.authenticators import DummyAuthenticator
10from waeup.cas.db import DB, LoginTicket, ServiceTicket, TicketGrantingCookie
11from waeup.cas.server import (
12    CASServer, create_service_ticket, create_login_ticket,
13    create_tgc_value, check_login_ticket, set_session_cookie,
[10414]14    check_session_cookie, get_template, delete_session_cookie,
[10416]15    check_service_ticket,
[10394]16    )
[10321]17
[10394]18RE_ALPHABET = re.compile('^[a-zA-Z0-9\-]*$')
19RE_COOKIE = re.compile('^cas-tgc=[A-Za-z0-9\-]+; Path=/; secure; HttpOnly$')
[10414]20RE_COOKIE_DEL = re.compile(
21    '^cas-tgc=; Max-Age=\-[0-9]+; Path=/; '
22    'expires=Thu, 01-Jan-1970 00:00:00 GMT; secure; HttpOnly$')
[10394]23
24
[10321]25class CASServerTests(unittest.TestCase):
26
27    def setUp(self):
[10348]28        # Create a new location where tempfiles are created.  This way
29        # also temporary dirs of local CASServers can be removed on
30        # tear-down.
31        self._new_tmpdir = tempfile.mkdtemp()
32        self._old_tmpdir = tempfile.tempdir
33        tempfile.tempdir = self._new_tmpdir
34        self.workdir = os.path.join(self._new_tmpdir, 'home')
35        self.db_path = os.path.join(self.workdir, 'mycas.db')
[10394]36        os.mkdir(self.workdir)
[10321]37        self.paste_conf1 = os.path.join(
38            os.path.dirname(__file__), 'sample1.ini')
[10352]39        self.paste_conf2 = os.path.join(
40            os.path.dirname(__file__), 'sample2.ini')
[10321]41
42    def tearDown(self):
[10348]43        # remove local tempfile and reset old tempdir setting
44        if os.path.isdir(self._new_tmpdir):
45            shutil.rmtree(self._new_tmpdir)
46        tempfile.tempdir = self._old_tmpdir
[10321]47
48    def test_paste_deploy_loader(self):
49        # we can load the CAS server via paste.deploy plugin
50        app = loadapp('config:%s' % self.paste_conf1)
51        assert isinstance(app, CASServer)
[10394]52        assert hasattr(app, 'db')
53        assert isinstance(app.db, DB)
54        assert hasattr(app, 'auth')
[10321]55
56    def test_paste_deploy_options(self):
57        # we can set CAS server-related options via paste.deploy config
[10352]58        app = loadapp('config:%s' % self.paste_conf2)
[10321]59        assert isinstance(app, CASServer)
[10394]60        assert app.db_connection_string == 'sqlite:///:memory:'
61        assert isinstance(app.auth, DummyAuthenticator)
[10321]62
[10351]63    def test_init(self):
[10394]64        # we get a `DB` instance created automatically
[10351]65        app = CASServer()
[10394]66        assert hasattr(app, 'db')
67        assert app.db is not None
[10351]68
69    def test_init_explicit_db_path(self):
70        # we can set a db_path explicitly
[10394]71        app = CASServer(db='sqlite:///%s' % self.db_path)
72        assert hasattr(app, 'db')
73        assert isinstance(app.db, DB)
74        assert os.path.isfile(self.db_path)
[10351]75
[10394]76    def test_get_template(self):
77        app = CASServer()
78        assert app._get_template('login.html') is not None
79        assert app._get_template('not-existent.html') is None
80
[10321]81    def test_call_root(self):
[10326]82        # the CAS protocol requires no root
[10321]83        app = CASServer()
84        req = Request.blank('http://localhost/')
85        resp = app(req)
[10322]86        assert resp.status == '404 Not Found'
[10326]87
88    def test_first_time_login(self):
89        # we can get a login page
90        app = CASServer()
91        req = Request.blank('http://localhost/login')
92        resp = app(req)
93        assert resp.status == '200 OK'
94
95    def test_validate(self):
96        # we can access a validation page
97        app = CASServer()
[10416]98        req = Request.blank('http://localhost/validate?service=foo&ticket=bar')
[10326]99        resp = app(req)
[10416]100        assert resp.status == '200 OK'
[10326]101
102    def test_logout(self):
103        # we can access a logout page
104        app = CASServer()
105        req = Request.blank('http://localhost/logout')
106        resp = app(req)
[10415]107        assert resp.status == '200 OK'
[10335]108
109    def test_login_simple(self):
110        # a simple login with no service will result in login screen
111        # (2.1.1#service of protocol specs)
112        app = CASServer()
113        req = Request.blank('http://localhost/login')
114        resp = app(req)
115        assert resp.status == '200 OK'
116        assert resp.content_type == 'text/html'
117        assert b'<form ' in resp.body
[10394]118
[10401]119    def test_login_cred_acceptor_sso_no_service(self):
120        # 2.2.4: successful login via single sign on
121        app = CASServer()
122        tgc = create_tgc_value()
123        app.db.add(tgc)
124        value = str(tgc.value)
125        req = Request.blank('https://localhost/login')
126        req.headers['Cookie'] = 'cas-tgc=%s' % value
127        resp = app(req)
128        assert resp.status == '200 OK'
129        assert b'already' in resp.body
130        assert 'Set-Cookie' not in resp.headers
131        return
[10394]132
[10411]133    def test_login_renew_with_cookie(self):
134        # 2.1.1: cookie will be ignored when renew is set
135        app = CASServer()
136        tgc = create_tgc_value()
137        app.db.add(tgc)
138        value = str(tgc.value)
139        req = Request.blank('https://localhost/login?renew=true')
140        req.headers['Cookie'] = 'cas-tgc=%s' % value
141        resp = app(req)
142        assert resp.status == '200 OK'
143        assert b'username' in resp.body
144        assert 'Set-Cookie' not in resp.headers
[10401]145
[10412]146    def test_login_renew_without_cookie(self):
147        # 2.1.1: with renew and no cookie, normal auth will happen
148        app = CASServer()
149        req = Request.blank('https://localhost/login?renew=true')
150        resp = app(req)
151        assert resp.status == '200 OK'
152        assert b'username' in resp.body
153
[10411]154    def test_login_renew_as_empty_string(self):
155        # `renew` is handled correctly, even with empty value
156        app = CASServer()
157        tgc = create_tgc_value()
158        app.db.add(tgc)
159        value = str(tgc.value)
160        req = Request.blank('https://localhost/login?renew')
161        req.headers['Cookie'] = 'cas-tgc=%s' % value
162        resp = app(req)
163        assert resp.status == '200 OK'
164        assert b'username' in resp.body
165        assert 'Set-Cookie' not in resp.headers
166
[10412]167    def test_login_gateway_no_cookie_with_service(self):
168        # 2.1.1: with gateway but w/o cookie we will be redirected to service
169        # no service ticket will be issued
170        app = CASServer()
171        params = 'gateway=true&service=http%3A%2F%2Fwww.service.com'
172        req = Request.blank('https://localhost/login?%s' % params)
173        resp = app(req)
174        assert resp.status == '303 See Other'
175        assert 'Location' in resp.headers
176        assert resp.headers['Location'] == 'http://www.service.com'
[10411]177
[10412]178    def test_login_gateway_with_cookie_and_service(self):
179        # 2.1.1: with cookie and gateway we will be redirected to service
180        app = CASServer()
181        tgc = create_tgc_value()
182        app.db.add(tgc)
183        value = str(tgc.value)
184        params = 'gateway=true&service=http%3A%2F%2Fwww.service.com'
185        req = Request.blank('https://localhost/login?%s' % params)
186        req.headers['Cookie'] = 'cas-tgc=%s' % value
187        resp = app(req)
188        assert resp.status == '303 See Other'
189        assert 'Location' in resp.headers
190        assert resp.headers['Location'].startswith(
191            'http://www.service.com?ticket=ST-')
192
193    def test_login_gateway_and_renew(self):
194        # 2.1.1 if both, gateway and renew are specified, only renew is valid
195        app = CASServer()
196        tgc = create_tgc_value()
197        app.db.add(tgc)
198        value = str(tgc.value)
199        req = Request.blank('https://localhost/login?renew=true&gateway=true')
200        req.headers['Cookie'] = 'cas-tgc=%s' % value
201        resp = app(req)
202        # with only gateway true, this would lead to a redirect
203        assert resp.status == '200 OK'
204        assert b'username' in resp.body
205        assert 'Set-Cookie' not in resp.headers
206
[10413]207    def test_login_warn(self):
208        # 2.2.1 as a credential acceptor, with `warn` set we require confirm
209        app = CASServer()
210        tgc = create_tgc_value()
211        app.db.add(tgc)
212        value = str(tgc.value)
213        params = 'warn=true&service=http%3A%2F%2Fwww.service.com'
214        req = Request.blank('https://localhost/login?%s' % params)
215        req.headers['Cookie'] = 'cas-tgc=%s' % value
216        resp = app(req)
217        # without warn, we would get a redirect
218        assert resp.status == '200 OK'
219        assert b'CAS login successful' in resp.body
[10412]220
[10415]221    def test_logout_no_cookie(self):
222        # 2.3 logout displays a logout page.
223        app = CASServer()
224        req = Request.blank('https://localhost/logout')
225        resp = app(req)
226        assert resp.status == '200 OK'
227        assert b'logged out' in resp.body
[10413]228
[10415]229    def test_logout_with_cookie(self):
230        # 2.3 logout destroys any existing SSO session
231        app = CASServer()
232        tgc = create_tgc_value()
233        app.db.add(tgc)
234        value = str(tgc.value)
235        req = Request.blank('https://localhost/logout')
236        req.headers['Cookie'] = 'cas-tgc=%s' % value
237        resp = app(req)
238        assert resp.status == '200 OK'
239        assert b'logged out' in resp.body
240        assert 'Set-Cookie' in resp.headers
241        cookie = resp.headers['Set-Cookie']
242        assert cookie.startswith('cas-tgc=;')
243        assert 'expires' in cookie
244        assert 'Max-Age' in cookie
245        assert len(list(app.db.query(TicketGrantingCookie))) == 0
246
247    def test_logout_url(self):
248        # 2.3.1 with an `url` given we provide a link on logout
249        app = CASServer()
250        params = 'url=http%3A%2F%2Fwww.logout.com'
251        req = Request.blank('https://localhost/logout?%s' % params)
252        resp = app(req)
253        assert resp.status == '200 OK'
254        assert b'logged out' in resp.body
255        assert b'like you to' in resp.body
256        assert b'http://www.logout.com' in resp.body
257
[10416]258    def test_validate_invalid(self):
259        # 2.4.2 validation failures is indicated by a given format
260        app = CASServer()
261        params = 'ticket=foo&service=bar'
262        req = Request.blank('https://localhost/validate?%s' % params)
263        resp = app(req)
264        assert resp.body == b'no\n\n'
[10415]265
[10416]266    def test_validate_valid(self):
267        # 2.4 validation success is indicated by a given format
268        app = CASServer()
269        sticket = create_service_ticket(
270            'someuser', 'http://service.com/', sso=False)
271        app.db.add(sticket)
272        params = 'ticket=%s&service=%s' % (
273            sticket.ticket, sticket.service)
274        req = Request.blank('https://localhost/validate?%s' % params)
275        resp = app(req)
276        assert resp.body == b'yes\nsomeuser\n'
277
278    def test_validate_renew_invalid(self):
279        # 2.4.1 with `renew` we accept only non-sso issued tickets
280        app = CASServer()
281        sticket = create_service_ticket(
282            'someuser', 'http://service.com/', sso=True)
283        app.db.add(sticket)
284        params = 'ticket=%s&service=%s&renew=true' % (
285            sticket.ticket, sticket.service)
286        req = Request.blank('https://localhost/validate?%s' % params)
287        resp = app(req)
288        assert resp.body == b'no\n\n'
289
290    def test_validate_renew_valid(self):
291        # 2.4.1 with `renew` we accept only non-sso issued tickets
292        app = CASServer()
293        sticket = create_service_ticket(
294            'someuser', 'http://service.com/', sso=False)
295        app.db.add(sticket)
296        params = 'ticket=%s&service=%s&renew=true' % (
297            sticket.ticket, sticket.service)
298        req = Request.blank('https://localhost/validate?%s' % params)
299        resp = app(req)
300        assert resp.body == b'yes\nsomeuser\n'
301
302
[10394]303class BrowserTests(unittest.TestCase):
304
305    def setUp(self):
306        self.raw_app = CASServer(auth=DummyAuthenticator())
307        self.app = WebTestApp(self.raw_app)
308
309    def test_login(self):
310        resp = self.app.get('/login')
311        assert resp.status == '200 OK'
312        form = resp.forms[0]
313        # 2.1.3: form must be submitted by POST
314        assert form.method == 'post'
315        fieldnames = form.fields.keys()
316        # 2.1.3: form must contain: username, password, lt
317        assert 'username' in fieldnames
318        assert 'password' in fieldnames
319        assert 'lt' in fieldnames
320        assert RE_ALPHABET.match(form['lt'].value)
321
322    def test_login_no_service(self):
323        # w/o a service passed in, the form should not contain service
324        # (not a strict protocol requirement, but handy)
325        resp = self.app.get('/login')
326        assert 'service' not in resp.forms[0].fields.keys()
327
328    def test_login_service_replayed(self):
329        # 2.1.3: the login form must contain the service param sent
330        resp = self.app.get('/login?service=http%3A%2F%2Fwww.service.com')
331        form = resp.forms[0]
332        assert resp.status == '200 OK'
333        assert 'service' in form.fields.keys()
334        assert form['service'].value == 'http://www.service.com'
335
336    def test_login_cred_acceptor_valid_no_service(self):
337        # 2.2.4: successful login w/o service yields a message
338        lt = create_login_ticket()
339        self.raw_app.db.add(lt)
340        lt_string = lt.ticket
341        resp = self.app.post('/login', dict(
342            username='bird', password='bebop', lt=lt_string))
343        assert resp.status == '200 OK'
344        assert b'successful' in resp.body
345        # single-sign-on session initiated
346        assert 'Set-Cookie' in resp.headers
347        cookie = resp.headers['Set-Cookie']
348        assert cookie.startswith('cas-tgc=')
349
350    def test_login_cred_acceptor_valid_w_service(self):
351        # 2.2.4: successful login with service makes a redirect
352        # Appendix B: safe redirect
353        lt = create_login_ticket()
354        self.raw_app.db.add(lt)
355        lt_string = lt.ticket
356        resp = self.app.post('/login', dict(
357            username='bird', password='bebop', lt=lt_string,
358            service='http://example.com/Login'))
359        assert resp.status == '303 See Other'
360        assert 'Location' in resp.headers
361        assert resp.headers['Location'].startswith(
362            'http://example.com/Login?ticket=ST-')
363        assert 'Pragma' in resp.headers
364        assert resp.headers['Pragma'] == 'no-cache'
365        assert 'Cache-Control' in resp.headers
366        assert resp.headers['Cache-Control'] == 'no-store'
367        assert 'Expires' in resp.headers
368        assert resp.headers['Expires'] == 'Thu, 01 Dec 1994 16:00:00 GMT'
369        assert b'window.location.href' in resp.body
370        assert b'noscript' in resp.body
371        assert b'ticket=ST-' in resp.body
372
[10397]373    def test_login_cred_acceptor_failed(self):
374        # 2.2.4: failed login yields a message
375        lt = create_login_ticket()
376        self.raw_app.db.add(lt)
377        lt_string = lt.ticket
378        resp = self.app.post('/login', dict(
379            username='bird', password='cat', lt=lt_string))
380        assert resp.status == '200 OK'
381        assert b'failed' in resp.body
[10394]382
[10401]383    def test_login_sso_no_service(self):
384        # we can initiate single-sign-on without service
385        resp1 = self.app.get('https://localhost/login')  # HTTPS required!
386        assert resp1.status == '200 OK'
387        assert 'cas-tgc' not in self.app.cookies
388        form = resp1.forms[0]
389        form.set('username', 'bird')
390        form.set('password', 'bebop')
391        resp2 = form.submit('AUTHENTICATE')
392        assert resp2.status == '200 OK'
393        # we got a secure cookie
394        assert 'cas-tgc' in self.app.cookies
395        # when we get the login page again, the cookie will replace creds.
396        resp3 = self.app.get('https://localhost/login')
397        assert b'You logged in already' in resp3.body
[10397]398
[10409]399    def test_login_sso_with_service(self):
400        resp1 = self.app.get(
[10410]401            'https://localhost/login?service=http%3A%2F%2Fservice.com%2F')
[10409]402        assert resp1.status == '200 OK'
403        assert 'cas-tgc' not in self.app.cookies
404        form = resp1.forms[0]
405        form.set('username', 'bird')
406        form.set('password', 'bebop')
407        resp2 = form.submit('AUTHENTICATE')
408        assert resp2.status == '303 See Other'
409        assert resp2.headers['Location'].startswith(
410            'http://service.com/?ticket=ST-')
411        # we got a secure cookie
412        assert 'cas-tgc' in self.app.cookies
413        resp3 = self.app.get(
[10410]414            'https://localhost/login?service=http%3A%2F%2Fservice.com%2F')
[10409]415        assert resp3.status == '303 See Other'
416        assert resp3.headers['Location'].startswith(
417            'http://service.com/?ticket=ST-')
[10401]418
[10490]419    def test_login_sso_with_service_additional_params1(self):
420        # we can get a service ticket also with a service providing
421        # get params
422        # this service url reads http://service.com/index.php?authCAS=CAS
423        service_url = 'http%3A%2F%2Fservice.com%2Findex.php%3FauthCAS%3DCAS'
424        resp1 = self.app.get(
425            'https://localhost/login?service=%s' % service_url)
426        assert resp1.status == '200 OK'
427        assert 'cas-tgc' not in self.app.cookies
428        form = resp1.forms[0]
429        form.set('username', 'bird')
430        form.set('password', 'bebop')
431        resp2 = form.submit('AUTHENTICATE')
432        assert resp2.status == '303 See Other'
433        location = resp2.headers['Location']
434        ticket_value = location[48:]
435        exp_location = 'http://service.com/index.php'
436        exp_location += '?authCAS=CAS&ticket=' + ticket_value
437        # there is only one ticket value in response location
438        assert len(ticket_value) == 32
439        assert location == exp_location
[10409]440
[10490]441
[10394]442class CASServerHelperTests(unittest.TestCase):
443
444    def setUp(self):
445        self.workdir = tempfile.mkdtemp()
446        self.db_file = os.path.join(self.workdir, 'mycas.db')
447        self.conn_string = 'sqlite:///%s' % self.db_file
448        self.db = DB(self.conn_string)
449
450    def tearDown(self):
451        shutil.rmtree(self.workdir)
452
453    def test_create_service_ticket(self):
454        # we can create service tickets
455        st = create_service_ticket(
456            user='bob', service='http://www.example.com')
457        assert isinstance(st, ServiceTicket)
458        # 3.1.1: service not part of ticket
459        assert 'example.com' not in st.ticket
460        # 3.1.1: ticket must start with 'ST-'
461        assert st.ticket.startswith('ST-')
462        # 3.1.1: min. ticket length clients must be able to process is 32
463        assert len(st.ticket) < 33
464        # 3.7: allowed character set == [a-zA-Z0-9\-]
465        assert RE_ALPHABET.match(st.ticket), (
466            'Ticket contains forbidden chars: %s' % st)
467
468    def test_create_login_ticket(self):
469        # we can create login tickets
470        lt = create_login_ticket()
471        # 3.5.1: ticket should start with 'LT-'
472        assert lt.ticket.startswith('LT-')
473        # 3.7: allowed character set == [a-zA-Z0-9\-]
474        assert RE_ALPHABET.match(lt.ticket), (
475            'Ticket contains forbidden chars: %s' % lt)
476
477    def test_create_login_ticket_unique(self):
478        # 3.5.1: login tickets are unique (although not hard to guess)
479        ticket_num = 1000  # increase to test more thoroughly
480        lt_list = [create_login_ticket() for x in range(ticket_num)]
481        assert len(set(lt_list)) == ticket_num
482
483    def test_create_tgc_value(self):
484        # we can create ticket granting cookies
485        tgc = create_tgc_value()
486        assert isinstance(tgc, TicketGrantingCookie)
487        # 3.6.1: cookie value should start with 'TGC-'
488        assert tgc.value.startswith('TGC-')
489        # 3.7: allowed character set == [a-zA-Z0-9\-]
490        assert RE_ALPHABET.match(tgc.value), (
491            'Cookie value contains forbidden chars: %s' % tgc)
492
493    def test_check_login_ticket(self):
494        db = DB('sqlite:///')
495        lt = LoginTicket('LT-123456')
496        db.add(lt)
497        assert check_login_ticket(db, None) is False
498        assert check_login_ticket(db, 'LT-123456') is True
499        # the ticket will be removed after check
500        assert check_login_ticket(db, 'LT-123456') is False
501        assert check_login_ticket(db, 'LT-654321') is False
502
[10490]503    def test_set_session_cookie1(self):
[10394]504        # make sure we can add session cookies to responses
505        db = DB('sqlite:///')
[10405]506        resp = set_session_cookie(db, Response())
[10394]507        assert 'Set-Cookie' in resp.headers
508        cookie = resp.headers['Set-Cookie']
509        assert RE_COOKIE.match(cookie), (
510            'Cookie in unexpected format: %s' % cookie)
511        # the cookie is stored in database
512        value = cookie.split('=')[1].split(';')[0]
513        q = db.query(TicketGrantingCookie).filter(
514            TicketGrantingCookie.value == value)
515        assert len(list(q)) == 1
[10398]516
[10490]517    def test_check_session_cookie2(self):
[10398]518        db = DB('sqlite:///')
519        tgc = create_tgc_value()
520        db.add(tgc)
521        value = tgc.value
522        assert check_session_cookie(db, value) == tgc
523        assert check_session_cookie(db, 'foo') is None
524        assert check_session_cookie(db, b'foo') is None
[10402]525        assert check_session_cookie(db, None) is None
[10398]526        value2 = value.encode('utf-8')
527        assert check_session_cookie(db, value2) == tgc
[10404]528
529    def test_get_template(self):
530        # we can load templates
531        assert get_template('not-existing-template') is None
532        assert get_template('login.html') is not None
[10414]533
534    def test_delete_session_cookie(self):
535        # we can unset cookies
536        db = DB('sqlite:///')
537        tgc = create_tgc_value()
538        db.add(tgc)
539        value = tgc.value
540        resp = delete_session_cookie(db, Response(), old_value=value)
541        assert 'Set-Cookie' in resp.headers
542        cookie = resp.headers['Set-Cookie']
543        assert RE_COOKIE_DEL.match(cookie), (
544            'Cookie in unexpected format: %s' % cookie)
545        # the cookie values was deleted from database
546        q = db.query(TicketGrantingCookie).filter(
547            TicketGrantingCookie.value == value)
548        assert len(list(q)) == 0
[10416]549
550    def test_check_service_ticket(self):
551        db = DB('sqlite:///')
552        st = ServiceTicket(
553            'ST-123456', 'someuser', 'http://myservice.com', True)
554        db.add(st)
555        assert check_service_ticket(db, None, 'foo') is None
556        assert check_service_ticket(db, 'foo', None) is None
557        assert check_service_ticket(db, 'ST-123456', 'foo') is None
558        assert check_service_ticket(db, 'foo', 'http://myservice.com') is None
559        result = check_service_ticket(db, 'ST-123456', 'http://myservice.com')
560        assert isinstance(result, ServiceTicket)
561        assert result.user == 'someuser'
562        assert check_service_ticket(
563            db,  'ST-123456', 'http://myservice.com', True) is None
564        assert check_service_ticket(
565            db,  'ST-123456', 'http://myservice.com', False) is not None
Note: See TracBrowser for help on using the repository browser.