[10321] | 1 | import os |
---|
[10394] | 2 | import re |
---|
[10321] | 3 | import shutil |
---|
| 4 | import tempfile |
---|
| 5 | import unittest |
---|
| 6 | from paste.deploy import loadapp |
---|
[10394] | 7 | from webob import Request, Response |
---|
| 8 | from webtest import TestApp as WebTestApp # avoid py.test skip message |
---|
| 9 | from waeup.cas.authenticators import DummyAuthenticator |
---|
| 10 | from waeup.cas.db import DB, LoginTicket, ServiceTicket, TicketGrantingCookie |
---|
| 11 | from waeup.cas.server import ( |
---|
| 12 | CASServer, create_service_ticket, create_login_ticket, |
---|
| 13 | create_tgc_value, check_login_ticket, set_session_cookie, |
---|
[10404] | 14 | check_session_cookie, get_template, |
---|
[10394] | 15 | ) |
---|
[10321] | 16 | |
---|
# 3.7 of the CAS protocol: allowed character set == [a-zA-Z0-9\-].
# Raw strings are used so that ``\-`` reaches the regex engine unchanged
# instead of being an invalid (and warned about) string escape sequence.
RE_ALPHABET = re.compile(r'^[a-zA-Z0-9\-]*$')
# Expected format of the ``Set-Cookie`` header value for the `cas-tgc` cookie.
RE_COOKIE = re.compile(r'^cas-tgc=[A-Za-z0-9\-]+; Path=/; secure; HttpOnly$')
---|
| 19 | |
---|
| 20 | |
---|
class CASServerTests(unittest.TestCase):
    """Tests for creating a `CASServer` and for its basic request routing."""

    def setUp(self):
        # Create a new location where tempfiles are created. This way
        # also temporary dirs of local CASServers can be removed on
        # tear-down.
        self._new_tmpdir = tempfile.mkdtemp()
        self._old_tmpdir = tempfile.tempdir
        tempfile.tempdir = self._new_tmpdir
        self.workdir = os.path.join(self._new_tmpdir, 'home')
        self.db_path = os.path.join(self.workdir, 'mycas.db')
        os.mkdir(self.workdir)
        # the sample paste.deploy configs live next to this test module
        here = os.path.dirname(__file__)
        self.paste_conf1 = os.path.join(here, 'sample1.ini')
        self.paste_conf2 = os.path.join(here, 'sample2.ini')

    def tearDown(self):
        # Remove the local tempdir and reset the old tempdir setting.
        # `tempfile.tempdir` is process-global, so it is restored even
        # if removing the directory fails; otherwise later tests would
        # keep creating tempfiles in a stale location.
        try:
            if os.path.isdir(self._new_tmpdir):
                shutil.rmtree(self._new_tmpdir)
        finally:
            tempfile.tempdir = self._old_tmpdir

    def test_paste_deploy_loader(self):
        # we can load the CAS server via paste.deploy plugin
        app = loadapp('config:%s' % self.paste_conf1)
        assert isinstance(app, CASServer)
        assert hasattr(app, 'db')
        assert isinstance(app.db, DB)
        assert hasattr(app, 'auth')

    def test_paste_deploy_options(self):
        # we can set CAS server-related options via paste.deploy config
        app = loadapp('config:%s' % self.paste_conf2)
        assert isinstance(app, CASServer)
        assert app.db_connection_string == 'sqlite:///:memory:'
        assert isinstance(app.auth, DummyAuthenticator)

    def test_init(self):
        # we get a `DB` instance created automatically
        app = CASServer()
        assert hasattr(app, 'db')
        assert app.db is not None

    def test_init_explicit_db_path(self):
        # we can set a db_path explicitly
        app = CASServer(db='sqlite:///%s' % self.db_path)
        assert hasattr(app, 'db')
        assert isinstance(app.db, DB)
        assert os.path.isfile(self.db_path)

    def test_get_template(self):
        # existing templates are returned, unknown ones yield `None`
        app = CASServer()
        assert app._get_template('login.html') is not None
        assert app._get_template('not-existent.html') is None

    def test_call_root(self):
        # the CAS protocol requires no root
        app = CASServer()
        req = Request.blank('http://localhost/')
        resp = app(req)
        assert resp.status == '404 Not Found'

    def test_first_time_login(self):
        # we can get a login page
        app = CASServer()
        req = Request.blank('http://localhost/login')
        resp = app(req)
        assert resp.status == '200 OK'

    def test_validate(self):
        # we can access a validation page
        app = CASServer()
        req = Request.blank('http://localhost/validate')
        resp = app(req)
        assert resp.status == '501 Not Implemented'

    def test_logout(self):
        # we can access a logout page
        app = CASServer()
        req = Request.blank('http://localhost/logout')
        resp = app(req)
        assert resp.status == '501 Not Implemented'

    def test_login_simple(self):
        # a simple login with no service will result in login screen
        # (2.1.1#service of protocol specs)
        app = CASServer()
        req = Request.blank('http://localhost/login')
        resp = app(req)
        assert resp.status == '200 OK'
        assert resp.content_type == 'text/html'
        assert b'<form ' in resp.body

    def test_login_cred_acceptor_sso_no_service(self):
        # 2.2.4: successful login via single sign on
        app = CASServer()
        tgc = create_tgc_value()
        app.db.add(tgc)
        value = str(tgc.value)
        req = Request.blank('https://localhost/login')
        # present the stored ticket granting cookie instead of credentials
        req.headers['Cookie'] = 'cas-tgc=%s' % value
        resp = app(req)
        assert resp.status == '200 OK'
        assert b'already' in resp.body
        assert 'Set-Cookie' not in resp.headers
---|
[10394] | 128 | |
---|
[10401] | 129 | |
---|
class BrowserTests(unittest.TestCase):
    """Exercise the login pages of a `CASServer` through a WebTest client."""

    def setUp(self):
        server = CASServer(auth=DummyAuthenticator())
        self.raw_app = server
        self.app = WebTestApp(server)

    def test_login(self):
        page = self.app.get('/login')
        assert page.status == '200 OK'
        login_form = page.forms[0]
        # 2.1.3: form must be submitted by POST
        assert login_form.method == 'post'
        # 2.1.3: form must contain: username, password, lt
        names = login_form.fields.keys()
        for required in ('username', 'password', 'lt'):
            assert required in names
        assert RE_ALPHABET.match(login_form['lt'].value)

    def test_login_no_service(self):
        # w/o a service passed in, the form should not contain service
        # (not a strict protocol requirement, but handy)
        page = self.app.get('/login')
        names = page.forms[0].fields.keys()
        assert 'service' not in names

    def test_login_service_replayed(self):
        # 2.1.3: the login form must contain the service param sent
        page = self.app.get('/login?service=http%3A%2F%2Fwww.service.com')
        login_form = page.forms[0]
        assert page.status == '200 OK'
        assert 'service' in login_form.fields.keys()
        service_field = login_form['service']
        assert service_field.value == 'http://www.service.com'

    def test_login_cred_acceptor_valid_no_service(self):
        # 2.2.4: successful login w/o service yields a message
        ticket = create_login_ticket()
        self.raw_app.db.add(ticket)
        credentials = {
            'username': 'bird',
            'password': 'bebop',
            'lt': ticket.ticket,
        }
        result = self.app.post('/login', credentials)
        assert result.status == '200 OK'
        assert b'successful' in result.body
        # single-sign-on session initiated
        assert 'Set-Cookie' in result.headers
        assert result.headers['Set-Cookie'].startswith('cas-tgc=')

    def test_login_cred_acceptor_valid_w_service(self):
        # 2.2.4: successful login with service makes a redirect
        # Appendix B: safe redirect
        ticket = create_login_ticket()
        self.raw_app.db.add(ticket)
        credentials = {
            'username': 'bird',
            'password': 'bebop',
            'lt': ticket.ticket,
            'service': 'http://example.com/Login',
        }
        result = self.app.post('/login', credentials)
        assert result.status == '303 See Other'
        assert 'Location' in result.headers
        target = result.headers['Location']
        assert target.startswith('http://example.com/Login?ticket=ST-')
        # headers that must accompany the redirect, with expected values
        expected_headers = (
            ('Pragma', 'no-cache'),
            ('Cache-Control', 'no-store'),
            ('Expires', 'Thu, 01 Dec 1994 16:00:00 GMT'),
        )
        for header_name, header_value in expected_headers:
            assert header_name in result.headers
            assert result.headers[header_name] == header_value
        # the body carries a script-based and a noscript redirect
        for marker in (b'window.location.href', b'noscript', b'ticket=ST-'):
            assert marker in result.body

    def test_login_cred_acceptor_failed(self):
        # 2.2.4: failed login yields a message
        ticket = create_login_ticket()
        self.raw_app.db.add(ticket)
        credentials = {
            'username': 'bird',
            'password': 'cat',
            'lt': ticket.ticket,
        }
        result = self.app.post('/login', credentials)
        assert result.status == '200 OK'
        assert b'failed' in result.body

    def test_login_sso_no_service(self):
        # we can initiate single-sign-on without service
        login_url = 'https://localhost/login'  # HTTPS required!
        first = self.app.get(login_url)
        assert first.status == '200 OK'
        assert 'cas-tgc' not in self.app.cookies
        login_form = first.forms[0]
        login_form.set('username', 'bird')
        login_form.set('password', 'bebop')
        second = login_form.submit('AUTHENTICATE')
        assert second.status == '200 OK'
        # we got a secure cookie
        assert 'cas-tgc' in self.app.cookies
        # when we get the login page again, the cookie will replace creds.
        third = self.app.get(login_url)
        assert b'You logged in already' in third.body

    def test_login_sso_with_service(self):
        # single-sign-on with a service redirects back to that service
        login_url = 'https://localhost/login?service=http://service.com/'
        redirect_prefix = 'http://service.com/?ticket=ST-'
        first = self.app.get(login_url)
        assert first.status == '200 OK'
        assert 'cas-tgc' not in self.app.cookies
        login_form = first.forms[0]
        login_form.set('username', 'bird')
        login_form.set('password', 'bebop')
        second = login_form.submit('AUTHENTICATE')
        assert second.status == '303 See Other'
        assert second.headers['Location'].startswith(redirect_prefix)
        # we got a secure cookie
        assert 'cas-tgc' in self.app.cookies
        third = self.app.get(login_url)
        assert third.status == '303 See Other'
        assert third.headers['Location'].startswith(redirect_prefix)
---|
[10401] | 245 | |
---|
[10409] | 246 | |
---|
class CASServerHelperTests(unittest.TestCase):
    """Tests for the module-level helper functions of the CAS server."""

    def setUp(self):
        # a file-based SQLite database inside a private working directory
        self.workdir = tempfile.mkdtemp()
        self.db_file = os.path.join(self.workdir, 'mycas.db')
        self.conn_string = 'sqlite:///%s' % self.db_file
        self.db = DB(self.conn_string)

    def tearDown(self):
        shutil.rmtree(self.workdir)

    def test_create_service_ticket(self):
        # we can create service tickets
        st = create_service_ticket(
            user='bob', service='http://www.example.com')
        assert isinstance(st, ServiceTicket)
        # 3.1.1: service not part of ticket
        assert 'example.com' not in st.ticket
        # 3.1.1: ticket must start with 'ST-'
        assert st.ticket.startswith('ST-')
        # 3.1.1: min. ticket length clients must be able to process is 32,
        # so tickets must not be longer than that
        assert len(st.ticket) <= 32
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        assert RE_ALPHABET.match(st.ticket), (
            'Ticket contains forbidden chars: %s' % st.ticket)

    def test_create_login_ticket(self):
        # we can create login tickets
        lt = create_login_ticket()
        # 3.5.1: ticket should start with 'LT-'
        assert lt.ticket.startswith('LT-')
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        assert RE_ALPHABET.match(lt.ticket), (
            'Ticket contains forbidden chars: %s' % lt.ticket)

    def test_create_login_ticket_unique(self):
        # 3.5.1: login tickets are unique (although not hard to guess)
        ticket_num = 1000  # increase to test more thoroughly
        # Compare the ticket *strings*: a set of ticket objects could be
        # distinct by object identity alone, which would make this check
        # pass even if identical ticket strings were handed out.
        tickets = set(
            create_login_ticket().ticket for _ in range(ticket_num))
        assert len(tickets) == ticket_num

    def test_create_tgc_value(self):
        # we can create ticket granting cookies
        tgc = create_tgc_value()
        assert isinstance(tgc, TicketGrantingCookie)
        # 3.6.1: cookie value should start with 'TGC-'
        assert tgc.value.startswith('TGC-')
        # 3.7: allowed character set == [a-zA-Z0-9\-]
        assert RE_ALPHABET.match(tgc.value), (
            'Cookie value contains forbidden chars: %s' % tgc.value)

    def test_check_login_ticket(self):
        db = DB('sqlite:///')
        lt = LoginTicket('LT-123456')
        db.add(lt)
        assert check_login_ticket(db, None) is False
        assert check_login_ticket(db, 'LT-123456') is True
        # the ticket will be removed after check
        assert check_login_ticket(db, 'LT-123456') is False
        assert check_login_ticket(db, 'LT-654321') is False

    def test_set_session_cookie(self):
        # make sure we can add session cookies to responses
        db = DB('sqlite:///')
        resp = set_session_cookie(db, Response())
        assert 'Set-Cookie' in resp.headers
        cookie = resp.headers['Set-Cookie']
        assert RE_COOKIE.match(cookie), (
            'Cookie in unexpected format: %s' % cookie)
        # the cookie is stored in database
        # (the value is the part between the first '=' and the first ';')
        value = cookie.split('=')[1].split(';')[0]
        q = db.query(TicketGrantingCookie).filter(
            TicketGrantingCookie.value == value)
        assert len(list(q)) == 1

    def test_check_session_cookie(self):
        db = DB('sqlite:///')
        tgc = create_tgc_value()
        db.add(tgc)
        value = tgc.value
        assert check_session_cookie(db, value) == tgc
        # unknown or missing values yield `None`
        assert check_session_cookie(db, 'foo') is None
        assert check_session_cookie(db, b'foo') is None
        assert check_session_cookie(db, None) is None
        # byte-string values are accepted as well
        value2 = value.encode('utf-8')
        assert check_session_cookie(db, value2) == tgc

    def test_get_template(self):
        # we can load templates
        assert get_template('not-existing-template') is None
        assert get_template('login.html') is not None
---|