[10321] | 1 | import os |
---|
[10394] | 2 | import re |
---|
[10321] | 3 | import shutil |
---|
| 4 | import tempfile |
---|
| 5 | import unittest |
---|
| 6 | from paste.deploy import loadapp |
---|
[10494] | 7 | try: |
---|
| 8 | from urlparse import parse_qsl # Python 2.x |
---|
| 9 | except ImportError: # pragma: no cover |
---|
| 10 | from urllib.parse import parse_qsl # Python 3.x |
---|
[10394] | 11 | from webob import Request, Response |
---|
| 12 | from webtest import TestApp as WebTestApp # avoid py.test skip message |
---|
| 13 | from waeup.cas.authenticators import DummyAuthenticator |
---|
| 14 | from waeup.cas.db import DB, LoginTicket, ServiceTicket, TicketGrantingCookie |
---|
| 15 | from waeup.cas.server import ( |
---|
| 16 | CASServer, create_service_ticket, create_login_ticket, |
---|
| 17 | create_tgc_value, check_login_ticket, set_session_cookie, |
---|
[10414] | 18 | check_session_cookie, get_template, delete_session_cookie, |
---|
[10604] | 19 | check_service_ticket, update_url, set_message |
---|
[10394] | 20 | ) |
---|
[10321] | 21 | |
---|
[10394] | 22 | RE_ALPHABET = re.compile('^[a-zA-Z0-9\-]*$') |
---|
| 23 | RE_COOKIE = re.compile('^cas-tgc=[A-Za-z0-9\-]+; Path=/; secure; HttpOnly$') |
---|
[10414] | 24 | RE_COOKIE_DEL = re.compile( |
---|
| 25 | '^cas-tgc=; Max-Age=\-[0-9]+; Path=/; ' |
---|
| 26 | 'expires=Thu, 01-Jan-1970 00:00:00 GMT; secure; HttpOnly$') |
---|
[10394] | 27 | |
---|
| 28 | |
---|
[10321] | 29 | class CASServerTests(unittest.TestCase): |
---|
| 30 | |
---|
| 31 | def setUp(self): |
---|
[10348] | 32 | # Create a new location where tempfiles are created. This way |
---|
| 33 | # also temporary dirs of local CASServers can be removed on |
---|
| 34 | # tear-down. |
---|
| 35 | self._new_tmpdir = tempfile.mkdtemp() |
---|
| 36 | self._old_tmpdir = tempfile.tempdir |
---|
| 37 | tempfile.tempdir = self._new_tmpdir |
---|
| 38 | self.workdir = os.path.join(self._new_tmpdir, 'home') |
---|
| 39 | self.db_path = os.path.join(self.workdir, 'mycas.db') |
---|
[10394] | 40 | os.mkdir(self.workdir) |
---|
[10321] | 41 | self.paste_conf1 = os.path.join( |
---|
| 42 | os.path.dirname(__file__), 'sample1.ini') |
---|
[10352] | 43 | self.paste_conf2 = os.path.join( |
---|
| 44 | os.path.dirname(__file__), 'sample2.ini') |
---|
[10321] | 45 | |
---|
| 46 | def tearDown(self): |
---|
[10348] | 47 | # remove local tempfile and reset old tempdir setting |
---|
| 48 | if os.path.isdir(self._new_tmpdir): |
---|
| 49 | shutil.rmtree(self._new_tmpdir) |
---|
| 50 | tempfile.tempdir = self._old_tmpdir |
---|
[10321] | 51 | |
---|
| 52 | def test_paste_deploy_loader(self): |
---|
| 53 | # we can load the CAS server via paste.deploy plugin |
---|
| 54 | app = loadapp('config:%s' % self.paste_conf1) |
---|
| 55 | assert isinstance(app, CASServer) |
---|
[10394] | 56 | assert hasattr(app, 'db') |
---|
| 57 | assert isinstance(app.db, DB) |
---|
| 58 | assert hasattr(app, 'auth') |
---|
[10321] | 59 | |
---|
| 60 | def test_paste_deploy_options(self): |
---|
| 61 | # we can set CAS server-related options via paste.deploy config |
---|
[10352] | 62 | app = loadapp('config:%s' % self.paste_conf2) |
---|
[10321] | 63 | assert isinstance(app, CASServer) |
---|
[10394] | 64 | assert app.db_connection_string == 'sqlite:///:memory:' |
---|
| 65 | assert isinstance(app.auth, DummyAuthenticator) |
---|
[10321] | 66 | |
---|
[10351] | 67 | def test_init(self): |
---|
[10394] | 68 | # we get a `DB` instance created automatically |
---|
[10351] | 69 | app = CASServer() |
---|
[10394] | 70 | assert hasattr(app, 'db') |
---|
| 71 | assert app.db is not None |
---|
[10351] | 72 | |
---|
| 73 | def test_init_explicit_db_path(self): |
---|
| 74 | # we can set a db_path explicitly |
---|
[10394] | 75 | app = CASServer(db='sqlite:///%s' % self.db_path) |
---|
| 76 | assert hasattr(app, 'db') |
---|
| 77 | assert isinstance(app.db, DB) |
---|
| 78 | assert os.path.isfile(self.db_path) |
---|
[10351] | 79 | |
---|
[10394] | 80 | def test_get_template(self): |
---|
| 81 | app = CASServer() |
---|
| 82 | assert app._get_template('login.html') is not None |
---|
| 83 | assert app._get_template('not-existent.html') is None |
---|
[10606] | 84 | # parts of header and footer are replaced |
---|
| 85 | assert 'PART_HEADER' not in app._get_template('login.html') |
---|
| 86 | assert 'PART_FOOTER' not in app._get_template('login.html') |
---|
[10394] | 87 | |
---|
[10321] | 88 | def test_call_root(self): |
---|
[10326] | 89 | # the CAS protocol requires no root |
---|
[10321] | 90 | app = CASServer() |
---|
| 91 | req = Request.blank('http://localhost/') |
---|
| 92 | resp = app(req) |
---|
[10322] | 93 | assert resp.status == '404 Not Found' |
---|
[10326] | 94 | |
---|
| 95 | def test_first_time_login(self): |
---|
| 96 | # we can get a login page |
---|
| 97 | app = CASServer() |
---|
| 98 | req = Request.blank('http://localhost/login') |
---|
| 99 | resp = app(req) |
---|
| 100 | assert resp.status == '200 OK' |
---|
| 101 | |
---|
| 102 | def test_validate(self): |
---|
| 103 | # we can access a validation page |
---|
| 104 | app = CASServer() |
---|
[10416] | 105 | req = Request.blank('http://localhost/validate?service=foo&ticket=bar') |
---|
[10326] | 106 | resp = app(req) |
---|
[10416] | 107 | assert resp.status == '200 OK' |
---|
[10326] | 108 | |
---|
| 109 | def test_logout(self): |
---|
| 110 | # we can access a logout page |
---|
| 111 | app = CASServer() |
---|
| 112 | req = Request.blank('http://localhost/logout') |
---|
| 113 | resp = app(req) |
---|
[10415] | 114 | assert resp.status == '200 OK' |
---|
[10335] | 115 | |
---|
| 116 | def test_login_simple(self): |
---|
| 117 | # a simple login with no service will result in login screen |
---|
| 118 | # (2.1.1#service of protocol specs) |
---|
| 119 | app = CASServer() |
---|
| 120 | req = Request.blank('http://localhost/login') |
---|
| 121 | resp = app(req) |
---|
| 122 | assert resp.status == '200 OK' |
---|
| 123 | assert resp.content_type == 'text/html' |
---|
| 124 | assert b'<form ' in resp.body |
---|
[10394] | 125 | |
---|
[10401] | 126 | def test_login_cred_acceptor_sso_no_service(self): |
---|
| 127 | # 2.2.4: successful login via single sign on |
---|
| 128 | app = CASServer() |
---|
| 129 | tgc = create_tgc_value() |
---|
| 130 | app.db.add(tgc) |
---|
| 131 | value = str(tgc.value) |
---|
| 132 | req = Request.blank('https://localhost/login') |
---|
| 133 | req.headers['Cookie'] = 'cas-tgc=%s' % value |
---|
| 134 | resp = app(req) |
---|
| 135 | assert resp.status == '200 OK' |
---|
| 136 | assert b'already' in resp.body |
---|
| 137 | assert 'Set-Cookie' not in resp.headers |
---|
| 138 | return |
---|
[10394] | 139 | |
---|
[10411] | 140 | def test_login_renew_with_cookie(self): |
---|
| 141 | # 2.1.1: cookie will be ignored when renew is set |
---|
| 142 | app = CASServer() |
---|
| 143 | tgc = create_tgc_value() |
---|
| 144 | app.db.add(tgc) |
---|
| 145 | value = str(tgc.value) |
---|
| 146 | req = Request.blank('https://localhost/login?renew=true') |
---|
| 147 | req.headers['Cookie'] = 'cas-tgc=%s' % value |
---|
| 148 | resp = app(req) |
---|
| 149 | assert resp.status == '200 OK' |
---|
| 150 | assert b'username' in resp.body |
---|
| 151 | assert 'Set-Cookie' not in resp.headers |
---|
[10401] | 152 | |
---|
[10412] | 153 | def test_login_renew_without_cookie(self): |
---|
| 154 | # 2.1.1: with renew and no cookie, normal auth will happen |
---|
| 155 | app = CASServer() |
---|
| 156 | req = Request.blank('https://localhost/login?renew=true') |
---|
| 157 | resp = app(req) |
---|
| 158 | assert resp.status == '200 OK' |
---|
| 159 | assert b'username' in resp.body |
---|
| 160 | |
---|
[10411] | 161 | def test_login_renew_as_empty_string(self): |
---|
| 162 | # `renew` is handled correctly, even with empty value |
---|
| 163 | app = CASServer() |
---|
| 164 | tgc = create_tgc_value() |
---|
| 165 | app.db.add(tgc) |
---|
| 166 | value = str(tgc.value) |
---|
| 167 | req = Request.blank('https://localhost/login?renew') |
---|
| 168 | req.headers['Cookie'] = 'cas-tgc=%s' % value |
---|
| 169 | resp = app(req) |
---|
| 170 | assert resp.status == '200 OK' |
---|
| 171 | assert b'username' in resp.body |
---|
| 172 | assert 'Set-Cookie' not in resp.headers |
---|
| 173 | |
---|
[10412] | 174 | def test_login_gateway_no_cookie_with_service(self): |
---|
| 175 | # 2.1.1: with gateway but w/o cookie we will be redirected to service |
---|
| 176 | # no service ticket will be issued |
---|
| 177 | app = CASServer() |
---|
| 178 | params = 'gateway=true&service=http%3A%2F%2Fwww.service.com' |
---|
| 179 | req = Request.blank('https://localhost/login?%s' % params) |
---|
| 180 | resp = app(req) |
---|
| 181 | assert resp.status == '303 See Other' |
---|
| 182 | assert 'Location' in resp.headers |
---|
| 183 | assert resp.headers['Location'] == 'http://www.service.com' |
---|
[10411] | 184 | |
---|
[10412] | 185 | def test_login_gateway_with_cookie_and_service(self): |
---|
| 186 | # 2.1.1: with cookie and gateway we will be redirected to service |
---|
| 187 | app = CASServer() |
---|
| 188 | tgc = create_tgc_value() |
---|
| 189 | app.db.add(tgc) |
---|
| 190 | value = str(tgc.value) |
---|
| 191 | params = 'gateway=true&service=http%3A%2F%2Fwww.service.com' |
---|
| 192 | req = Request.blank('https://localhost/login?%s' % params) |
---|
| 193 | req.headers['Cookie'] = 'cas-tgc=%s' % value |
---|
| 194 | resp = app(req) |
---|
| 195 | assert resp.status == '303 See Other' |
---|
| 196 | assert 'Location' in resp.headers |
---|
| 197 | assert resp.headers['Location'].startswith( |
---|
| 198 | 'http://www.service.com?ticket=ST-') |
---|
| 199 | |
---|
| 200 | def test_login_gateway_and_renew(self): |
---|
| 201 | # 2.1.1 if both, gateway and renew are specified, only renew is valid |
---|
| 202 | app = CASServer() |
---|
| 203 | tgc = create_tgc_value() |
---|
| 204 | app.db.add(tgc) |
---|
| 205 | value = str(tgc.value) |
---|
| 206 | req = Request.blank('https://localhost/login?renew=true&gateway=true') |
---|
| 207 | req.headers['Cookie'] = 'cas-tgc=%s' % value |
---|
| 208 | resp = app(req) |
---|
| 209 | # with only gateway true, this would lead to a redirect |
---|
| 210 | assert resp.status == '200 OK' |
---|
| 211 | assert b'username' in resp.body |
---|
| 212 | assert 'Set-Cookie' not in resp.headers |
---|
| 213 | |
---|
[10413] | 214 | def test_login_warn(self): |
---|
| 215 | # 2.2.1 as a credential acceptor, with `warn` set we require confirm |
---|
| 216 | app = CASServer() |
---|
| 217 | tgc = create_tgc_value() |
---|
| 218 | app.db.add(tgc) |
---|
| 219 | value = str(tgc.value) |
---|
| 220 | params = 'warn=true&service=http%3A%2F%2Fwww.service.com' |
---|
| 221 | req = Request.blank('https://localhost/login?%s' % params) |
---|
| 222 | req.headers['Cookie'] = 'cas-tgc=%s' % value |
---|
| 223 | resp = app(req) |
---|
| 224 | # without warn, we would get a redirect |
---|
| 225 | assert resp.status == '200 OK' |
---|
[10612] | 226 | assert b'Logged in' in resp.body |
---|
[10412] | 227 | |
---|
[10415] | 228 | def test_logout_no_cookie(self): |
---|
| 229 | # 2.3 logout displays a logout page. |
---|
| 230 | app = CASServer() |
---|
| 231 | req = Request.blank('https://localhost/logout') |
---|
| 232 | resp = app(req) |
---|
| 233 | assert resp.status == '200 OK' |
---|
| 234 | assert b'logged out' in resp.body |
---|
[10413] | 235 | |
---|
[10415] | 236 | def test_logout_with_cookie(self): |
---|
| 237 | # 2.3 logout destroys any existing SSO session |
---|
| 238 | app = CASServer() |
---|
| 239 | tgc = create_tgc_value() |
---|
| 240 | app.db.add(tgc) |
---|
| 241 | value = str(tgc.value) |
---|
| 242 | req = Request.blank('https://localhost/logout') |
---|
| 243 | req.headers['Cookie'] = 'cas-tgc=%s' % value |
---|
| 244 | resp = app(req) |
---|
| 245 | assert resp.status == '200 OK' |
---|
| 246 | assert b'logged out' in resp.body |
---|
| 247 | assert 'Set-Cookie' in resp.headers |
---|
| 248 | cookie = resp.headers['Set-Cookie'] |
---|
| 249 | assert cookie.startswith('cas-tgc=;') |
---|
| 250 | assert 'expires' in cookie |
---|
| 251 | assert 'Max-Age' in cookie |
---|
| 252 | assert len(list(app.db.query(TicketGrantingCookie))) == 0 |
---|
| 253 | |
---|
| 254 | def test_logout_url(self): |
---|
| 255 | # 2.3.1 with an `url` given we provide a link on logout |
---|
| 256 | app = CASServer() |
---|
| 257 | params = 'url=http%3A%2F%2Fwww.logout.com' |
---|
| 258 | req = Request.blank('https://localhost/logout?%s' % params) |
---|
| 259 | resp = app(req) |
---|
| 260 | assert resp.status == '200 OK' |
---|
| 261 | assert b'logged out' in resp.body |
---|
| 262 | assert b'like you to' in resp.body |
---|
| 263 | assert b'http://www.logout.com' in resp.body |
---|
| 264 | |
---|
[10416] | 265 | def test_validate_invalid(self): |
---|
| 266 | # 2.4.2 validation failures is indicated by a given format |
---|
| 267 | app = CASServer() |
---|
| 268 | params = 'ticket=foo&service=bar' |
---|
| 269 | req = Request.blank('https://localhost/validate?%s' % params) |
---|
| 270 | resp = app(req) |
---|
| 271 | assert resp.body == b'no\n\n' |
---|
[10415] | 272 | |
---|
[10416] | 273 | def test_validate_valid(self): |
---|
| 274 | # 2.4 validation success is indicated by a given format |
---|
| 275 | app = CASServer() |
---|
| 276 | sticket = create_service_ticket( |
---|
| 277 | 'someuser', 'http://service.com/', sso=False) |
---|
| 278 | app.db.add(sticket) |
---|
| 279 | params = 'ticket=%s&service=%s' % ( |
---|
| 280 | sticket.ticket, sticket.service) |
---|
| 281 | req = Request.blank('https://localhost/validate?%s' % params) |
---|
| 282 | resp = app(req) |
---|
| 283 | assert resp.body == b'yes\nsomeuser\n' |
---|
| 284 | |
---|
| 285 | def test_validate_renew_invalid(self): |
---|
| 286 | # 2.4.1 with `renew` we accept only non-sso issued tickets |
---|
| 287 | app = CASServer() |
---|
| 288 | sticket = create_service_ticket( |
---|
| 289 | 'someuser', 'http://service.com/', sso=True) |
---|
| 290 | app.db.add(sticket) |
---|
| 291 | params = 'ticket=%s&service=%s&renew=true' % ( |
---|
| 292 | sticket.ticket, sticket.service) |
---|
| 293 | req = Request.blank('https://localhost/validate?%s' % params) |
---|
| 294 | resp = app(req) |
---|
| 295 | assert resp.body == b'no\n\n' |
---|
| 296 | |
---|
| 297 | def test_validate_renew_valid(self): |
---|
| 298 | # 2.4.1 with `renew` we accept only non-sso issued tickets |
---|
| 299 | app = CASServer() |
---|
| 300 | sticket = create_service_ticket( |
---|
| 301 | 'someuser', 'http://service.com/', sso=False) |
---|
| 302 | app.db.add(sticket) |
---|
| 303 | params = 'ticket=%s&service=%s&renew=true' % ( |
---|
| 304 | sticket.ticket, sticket.service) |
---|
| 305 | req = Request.blank('https://localhost/validate?%s' % params) |
---|
| 306 | resp = app(req) |
---|
| 307 | assert resp.body == b'yes\nsomeuser\n' |
---|
| 308 | |
---|
[10601] | 309 | def test_style_css(self): |
---|
| 310 | # we can get a custom style sheet |
---|
| 311 | app = CASServer() |
---|
| 312 | req = Request.blank('https://localhost/style.css') |
---|
| 313 | resp = app(req) |
---|
| 314 | assert resp.status == '200 OK' |
---|
| 315 | assert resp.content_type == 'text/css' |
---|
[10416] | 316 | |
---|
[10604] | 317 | def test_set_message(self): |
---|
| 318 | # we can set a message |
---|
| 319 | result = set_message('Hi there', '<body>MSG_TEXT</body>') |
---|
| 320 | assert result == '<body>Hi there</body>' |
---|
[10601] | 321 | |
---|
[10604] | 322 | def test_set_message_empty_or_none(self): |
---|
| 323 | # without a message, no message text will be displayed |
---|
| 324 | result = set_message('', '<body>MSG_TEXT</body>') |
---|
| 325 | assert result == '<body></body>' |
---|
| 326 | result = set_message('', '<body><div id="msg">MSG_TEXT</div></body>') |
---|
| 327 | assert result == '<body></body>' |
---|
| 328 | result = set_message(None, '<body><div id="msg">MSG_TEXT</div></body>') |
---|
| 329 | assert result == '<body></body>' |
---|
| 330 | result = set_message( |
---|
| 331 | None, |
---|
| 332 | '<body><div a="1">\n<div id="msg">MSG_TEXT</div>a\n</div></body>') |
---|
| 333 | assert result == '<body><div a="1">\na\n</div></body>' |
---|
| 334 | result = set_message( |
---|
| 335 | None, |
---|
| 336 | '<body><div a="1"><div id="msg">MSG_TEXT</div>a</div></body>') |
---|
| 337 | assert result == '<body><div a="1">a</div></body>' |
---|
| 338 | |
---|
| 339 | |
---|
[10394] | 340 | class BrowserTests(unittest.TestCase): |
---|
| 341 | |
---|
| 342 | def setUp(self): |
---|
| 343 | self.raw_app = CASServer(auth=DummyAuthenticator()) |
---|
| 344 | self.app = WebTestApp(self.raw_app) |
---|
| 345 | |
---|
| 346 | def test_login(self): |
---|
| 347 | resp = self.app.get('/login') |
---|
| 348 | assert resp.status == '200 OK' |
---|
| 349 | form = resp.forms[0] |
---|
| 350 | # 2.1.3: form must be submitted by POST |
---|
| 351 | assert form.method == 'post' |
---|
| 352 | fieldnames = form.fields.keys() |
---|
| 353 | # 2.1.3: form must contain: username, password, lt |
---|
| 354 | assert 'username' in fieldnames |
---|
| 355 | assert 'password' in fieldnames |
---|
| 356 | assert 'lt' in fieldnames |
---|
| 357 | assert RE_ALPHABET.match(form['lt'].value) |
---|
| 358 | |
---|
| 359 | def test_login_no_service(self): |
---|
| 360 | # w/o a service passed in, the form should not contain service |
---|
| 361 | # (not a strict protocol requirement, but handy) |
---|
| 362 | resp = self.app.get('/login') |
---|
| 363 | assert 'service' not in resp.forms[0].fields.keys() |
---|
| 364 | |
---|
| 365 | def test_login_service_replayed(self): |
---|
| 366 | # 2.1.3: the login form must contain the service param sent |
---|
| 367 | resp = self.app.get('/login?service=http%3A%2F%2Fwww.service.com') |
---|
| 368 | form = resp.forms[0] |
---|
| 369 | assert resp.status == '200 OK' |
---|
| 370 | assert 'service' in form.fields.keys() |
---|
| 371 | assert form['service'].value == 'http://www.service.com' |
---|
| 372 | |
---|
| 373 | def test_login_cred_acceptor_valid_no_service(self): |
---|
| 374 | # 2.2.4: successful login w/o service yields a message |
---|
| 375 | lt = create_login_ticket() |
---|
| 376 | self.raw_app.db.add(lt) |
---|
| 377 | lt_string = lt.ticket |
---|
| 378 | resp = self.app.post('/login', dict( |
---|
| 379 | username='bird', password='bebop', lt=lt_string)) |
---|
| 380 | assert resp.status == '200 OK' |
---|
| 381 | assert b'successful' in resp.body |
---|
| 382 | # single-sign-on session initiated |
---|
| 383 | assert 'Set-Cookie' in resp.headers |
---|
| 384 | cookie = resp.headers['Set-Cookie'] |
---|
| 385 | assert cookie.startswith('cas-tgc=') |
---|
| 386 | |
---|
| 387 | def test_login_cred_acceptor_valid_w_service(self): |
---|
| 388 | # 2.2.4: successful login with service makes a redirect |
---|
| 389 | # Appendix B: safe redirect |
---|
| 390 | lt = create_login_ticket() |
---|
| 391 | self.raw_app.db.add(lt) |
---|
| 392 | lt_string = lt.ticket |
---|
| 393 | resp = self.app.post('/login', dict( |
---|
| 394 | username='bird', password='bebop', lt=lt_string, |
---|
| 395 | service='http://example.com/Login')) |
---|
| 396 | assert resp.status == '303 See Other' |
---|
| 397 | assert 'Location' in resp.headers |
---|
| 398 | assert resp.headers['Location'].startswith( |
---|
| 399 | 'http://example.com/Login?ticket=ST-') |
---|
| 400 | assert 'Pragma' in resp.headers |
---|
| 401 | assert resp.headers['Pragma'] == 'no-cache' |
---|
| 402 | assert 'Cache-Control' in resp.headers |
---|
| 403 | assert resp.headers['Cache-Control'] == 'no-store' |
---|
| 404 | assert 'Expires' in resp.headers |
---|
| 405 | assert resp.headers['Expires'] == 'Thu, 01 Dec 1994 16:00:00 GMT' |
---|
| 406 | assert b'window.location.href' in resp.body |
---|
| 407 | assert b'noscript' in resp.body |
---|
| 408 | assert b'ticket=ST-' in resp.body |
---|
[10498] | 409 | q = self.raw_app.db.query(ServiceTicket) |
---|
| 410 | st = q.all()[0] |
---|
| 411 | assert st.user == 'bird' |
---|
| 412 | assert st.service == 'http://example.com/Login' |
---|
| 413 | assert st.ticket.startswith('ST-') |
---|
[10394] | 414 | |
---|
[10397] | 415 | def test_login_cred_acceptor_failed(self): |
---|
| 416 | # 2.2.4: failed login yields a message |
---|
| 417 | lt = create_login_ticket() |
---|
| 418 | self.raw_app.db.add(lt) |
---|
| 419 | lt_string = lt.ticket |
---|
| 420 | resp = self.app.post('/login', dict( |
---|
| 421 | username='bird', password='cat', lt=lt_string)) |
---|
| 422 | assert resp.status == '200 OK' |
---|
| 423 | assert b'failed' in resp.body |
---|
[10394] | 424 | |
---|
[10401] | 425 | def test_login_sso_no_service(self): |
---|
| 426 | # we can initiate single-sign-on without service |
---|
| 427 | resp1 = self.app.get('https://localhost/login') # HTTPS required! |
---|
| 428 | assert resp1.status == '200 OK' |
---|
| 429 | assert 'cas-tgc' not in self.app.cookies |
---|
| 430 | form = resp1.forms[0] |
---|
| 431 | form.set('username', 'bird') |
---|
| 432 | form.set('password', 'bebop') |
---|
| 433 | resp2 = form.submit('AUTHENTICATE') |
---|
| 434 | assert resp2.status == '200 OK' |
---|
| 435 | # we got a secure cookie |
---|
| 436 | assert 'cas-tgc' in self.app.cookies |
---|
| 437 | # when we get the login page again, the cookie will replace creds. |
---|
| 438 | resp3 = self.app.get('https://localhost/login') |
---|
| 439 | assert b'You logged in already' in resp3.body |
---|
[10397] | 440 | |
---|
[10409] | 441 | def test_login_sso_with_service(self): |
---|
| 442 | resp1 = self.app.get( |
---|
[10410] | 443 | 'https://localhost/login?service=http%3A%2F%2Fservice.com%2F') |
---|
[10409] | 444 | assert resp1.status == '200 OK' |
---|
| 445 | assert 'cas-tgc' not in self.app.cookies |
---|
| 446 | form = resp1.forms[0] |
---|
| 447 | form.set('username', 'bird') |
---|
| 448 | form.set('password', 'bebop') |
---|
| 449 | resp2 = form.submit('AUTHENTICATE') |
---|
| 450 | assert resp2.status == '303 See Other' |
---|
| 451 | assert resp2.headers['Location'].startswith( |
---|
| 452 | 'http://service.com/?ticket=ST-') |
---|
| 453 | # we got a secure cookie |
---|
| 454 | assert 'cas-tgc' in self.app.cookies |
---|
| 455 | resp3 = self.app.get( |
---|
[10410] | 456 | 'https://localhost/login?service=http%3A%2F%2Fservice.com%2F') |
---|
[10409] | 457 | assert resp3.status == '303 See Other' |
---|
| 458 | assert resp3.headers['Location'].startswith( |
---|
| 459 | 'http://service.com/?ticket=ST-') |
---|
[10401] | 460 | |
---|
[10490] | 461 | def test_login_sso_with_service_additional_params1(self): |
---|
| 462 | # we can get a service ticket also with a service providing |
---|
| 463 | # get params |
---|
| 464 | # this service url reads http://service.com/index.php?authCAS=CAS |
---|
| 465 | service_url = 'http%3A%2F%2Fservice.com%2Findex.php%3FauthCAS%3DCAS' |
---|
| 466 | resp1 = self.app.get( |
---|
| 467 | 'https://localhost/login?service=%s' % service_url) |
---|
| 468 | assert resp1.status == '200 OK' |
---|
| 469 | assert 'cas-tgc' not in self.app.cookies |
---|
| 470 | form = resp1.forms[0] |
---|
| 471 | form.set('username', 'bird') |
---|
| 472 | form.set('password', 'bebop') |
---|
| 473 | resp2 = form.submit('AUTHENTICATE') |
---|
| 474 | assert resp2.status == '303 See Other' |
---|
| 475 | location = resp2.headers['Location'] |
---|
[10492] | 476 | query_string = location.split('?', 1)[1] |
---|
| 477 | query_params = dict(parse_qsl(query_string)) |
---|
| 478 | assert 'authCAS' in query_params.keys() |
---|
| 479 | assert 'ticket' in query_params.keys() |
---|
| 480 | assert len(query_params['ticket']) == 32 |
---|
[10409] | 481 | |
---|
[10490] | 482 | |
---|
[10394] | 483 | class CASServerHelperTests(unittest.TestCase): |
---|
| 484 | |
---|
| 485 | def setUp(self): |
---|
| 486 | self.workdir = tempfile.mkdtemp() |
---|
| 487 | self.db_file = os.path.join(self.workdir, 'mycas.db') |
---|
| 488 | self.conn_string = 'sqlite:///%s' % self.db_file |
---|
| 489 | self.db = DB(self.conn_string) |
---|
| 490 | |
---|
| 491 | def tearDown(self): |
---|
| 492 | shutil.rmtree(self.workdir) |
---|
| 493 | |
---|
| 494 | def test_create_service_ticket(self): |
---|
| 495 | # we can create service tickets |
---|
| 496 | st = create_service_ticket( |
---|
| 497 | user='bob', service='http://www.example.com') |
---|
| 498 | assert isinstance(st, ServiceTicket) |
---|
| 499 | # 3.1.1: service not part of ticket |
---|
| 500 | assert 'example.com' not in st.ticket |
---|
| 501 | # 3.1.1: ticket must start with 'ST-' |
---|
| 502 | assert st.ticket.startswith('ST-') |
---|
| 503 | # 3.1.1: min. ticket length clients must be able to process is 32 |
---|
| 504 | assert len(st.ticket) < 33 |
---|
| 505 | # 3.7: allowed character set == [a-zA-Z0-9\-] |
---|
| 506 | assert RE_ALPHABET.match(st.ticket), ( |
---|
| 507 | 'Ticket contains forbidden chars: %s' % st) |
---|
[10498] | 508 | assert st.service == 'http://www.example.com' |
---|
| 509 | assert st.user == 'bob' |
---|
[10394] | 510 | |
---|
| 511 | def test_create_login_ticket(self): |
---|
| 512 | # we can create login tickets |
---|
| 513 | lt = create_login_ticket() |
---|
| 514 | # 3.5.1: ticket should start with 'LT-' |
---|
| 515 | assert lt.ticket.startswith('LT-') |
---|
| 516 | # 3.7: allowed character set == [a-zA-Z0-9\-] |
---|
| 517 | assert RE_ALPHABET.match(lt.ticket), ( |
---|
| 518 | 'Ticket contains forbidden chars: %s' % lt) |
---|
| 519 | |
---|
| 520 | def test_create_login_ticket_unique(self): |
---|
| 521 | # 3.5.1: login tickets are unique (although not hard to guess) |
---|
| 522 | ticket_num = 1000 # increase to test more thoroughly |
---|
| 523 | lt_list = [create_login_ticket() for x in range(ticket_num)] |
---|
| 524 | assert len(set(lt_list)) == ticket_num |
---|
| 525 | |
---|
| 526 | def test_create_tgc_value(self): |
---|
| 527 | # we can create ticket granting cookies |
---|
| 528 | tgc = create_tgc_value() |
---|
| 529 | assert isinstance(tgc, TicketGrantingCookie) |
---|
| 530 | # 3.6.1: cookie value should start with 'TGC-' |
---|
| 531 | assert tgc.value.startswith('TGC-') |
---|
| 532 | # 3.7: allowed character set == [a-zA-Z0-9\-] |
---|
| 533 | assert RE_ALPHABET.match(tgc.value), ( |
---|
| 534 | 'Cookie value contains forbidden chars: %s' % tgc) |
---|
| 535 | |
---|
| 536 | def test_check_login_ticket(self): |
---|
| 537 | db = DB('sqlite:///') |
---|
| 538 | lt = LoginTicket('LT-123456') |
---|
| 539 | db.add(lt) |
---|
| 540 | assert check_login_ticket(db, None) is False |
---|
| 541 | assert check_login_ticket(db, 'LT-123456') is True |
---|
| 542 | # the ticket will be removed after check |
---|
| 543 | assert check_login_ticket(db, 'LT-123456') is False |
---|
| 544 | assert check_login_ticket(db, 'LT-654321') is False |
---|
| 545 | |
---|
[10490] | 546 | def test_set_session_cookie1(self): |
---|
[10394] | 547 | # make sure we can add session cookies to responses |
---|
| 548 | db = DB('sqlite:///') |
---|
[10405] | 549 | resp = set_session_cookie(db, Response()) |
---|
[10394] | 550 | assert 'Set-Cookie' in resp.headers |
---|
| 551 | cookie = resp.headers['Set-Cookie'] |
---|
| 552 | assert RE_COOKIE.match(cookie), ( |
---|
| 553 | 'Cookie in unexpected format: %s' % cookie) |
---|
| 554 | # the cookie is stored in database |
---|
| 555 | value = cookie.split('=')[1].split(';')[0] |
---|
| 556 | q = db.query(TicketGrantingCookie).filter( |
---|
| 557 | TicketGrantingCookie.value == value) |
---|
| 558 | assert len(list(q)) == 1 |
---|
[10398] | 559 | |
---|
[10490] | 560 | def test_check_session_cookie2(self): |
---|
[10398] | 561 | db = DB('sqlite:///') |
---|
| 562 | tgc = create_tgc_value() |
---|
| 563 | db.add(tgc) |
---|
| 564 | value = tgc.value |
---|
| 565 | assert check_session_cookie(db, value) == tgc |
---|
| 566 | assert check_session_cookie(db, 'foo') is None |
---|
| 567 | assert check_session_cookie(db, b'foo') is None |
---|
[10402] | 568 | assert check_session_cookie(db, None) is None |
---|
[10398] | 569 | value2 = value.encode('utf-8') |
---|
| 570 | assert check_session_cookie(db, value2) == tgc |
---|
[10404] | 571 | |
---|
| 572 | def test_get_template(self): |
---|
| 573 | # we can load templates |
---|
| 574 | assert get_template('not-existing-template') is None |
---|
| 575 | assert get_template('login.html') is not None |
---|
[10606] | 576 | # parts of header and footer are replaced |
---|
| 577 | assert 'PART_HEADER' not in get_template('login.html') |
---|
| 578 | assert 'PART_FOOTER' not in get_template('login.html') |
---|
[10414] | 579 | |
---|
| 580 | def test_delete_session_cookie(self): |
---|
| 581 | # we can unset cookies |
---|
| 582 | db = DB('sqlite:///') |
---|
| 583 | tgc = create_tgc_value() |
---|
| 584 | db.add(tgc) |
---|
| 585 | value = tgc.value |
---|
| 586 | resp = delete_session_cookie(db, Response(), old_value=value) |
---|
| 587 | assert 'Set-Cookie' in resp.headers |
---|
| 588 | cookie = resp.headers['Set-Cookie'] |
---|
| 589 | assert RE_COOKIE_DEL.match(cookie), ( |
---|
| 590 | 'Cookie in unexpected format: %s' % cookie) |
---|
| 591 | # the cookie values was deleted from database |
---|
| 592 | q = db.query(TicketGrantingCookie).filter( |
---|
| 593 | TicketGrantingCookie.value == value) |
---|
| 594 | assert len(list(q)) == 0 |
---|
[10416] | 595 | |
---|
| 596 | def test_check_service_ticket(self): |
---|
| 597 | db = DB('sqlite:///') |
---|
| 598 | st = ServiceTicket( |
---|
| 599 | 'ST-123456', 'someuser', 'http://myservice.com', True) |
---|
| 600 | db.add(st) |
---|
| 601 | assert check_service_ticket(db, None, 'foo') is None |
---|
| 602 | assert check_service_ticket(db, 'foo', None) is None |
---|
| 603 | assert check_service_ticket(db, 'ST-123456', 'foo') is None |
---|
| 604 | assert check_service_ticket(db, 'foo', 'http://myservice.com') is None |
---|
| 605 | result = check_service_ticket(db, 'ST-123456', 'http://myservice.com') |
---|
| 606 | assert isinstance(result, ServiceTicket) |
---|
| 607 | assert result.user == 'someuser' |
---|
| 608 | assert check_service_ticket( |
---|
| 609 | db, 'ST-123456', 'http://myservice.com', True) is None |
---|
| 610 | assert check_service_ticket( |
---|
| 611 | db, 'ST-123456', 'http://myservice.com', False) is not None |
---|
[10491] | 612 | |
---|
| 613 | def test_update_url(self): |
---|
| 614 | # we can create valid new urls with query string params updated |
---|
| 615 | result1 = update_url('http://sample.com/index?a=1&b=2', dict(b='3')) |
---|
[10494] | 616 | assert result1 in ( |
---|
| 617 | 'http://sample.com/index?a=1&b=3', |
---|
| 618 | 'http://sample.com/index?b=3&a=1') |
---|
[10491] | 619 | result2 = update_url('http://sample.com/index?b=2', dict(b='3')) |
---|
| 620 | assert result2 == 'http://sample.com/index?b=3' |
---|
| 621 | result3 = update_url('http://sample.com/index', dict(b='3')) |
---|
| 622 | assert result3 == 'http://sample.com/index?b=3' |
---|
| 623 | result4 = update_url('http://sample.com/index?a=2', dict(b='3')) |
---|
[10494] | 624 | assert result4 in ( |
---|
| 625 | 'http://sample.com/index?a=2&b=3', |
---|
| 626 | 'http://sample.com/index?b=3&a=2') |
---|