Changeset 10394 for main/waeup.cas/trunk/waeup/cas/tests
- Timestamp:
- 4 Jul 2013, 09:04:41 (12 years ago)
- Location:
- main/waeup.cas/trunk/waeup/cas/tests
- Files:
-
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.cas/trunk/waeup/cas/tests/sample2.ini
r10352 r10394 3 3 [app:main] 4 4 use = egg:waeup.cas#server 5 # what file to use to store tickets, credentials, etc. 6 db_path = /a/path/to/cas.db 5 # SQLAlchemy connection string 6 # By default we get an in-memory SQLite database 7 # Samples: 8 # sqlite in-memory db: sqlite://:memory: 9 # sqlite file-based db: sqlite:////path/to/db 10 db = sqlite:///:memory: 11 auth = dummy -
main/waeup.cas/trunk/waeup/cas/tests/test_db.py
r10349 r10394 5 5 import tempfile 6 6 import unittest 7 from waeup.cas.db import create_db 7 from sqlalchemy.engine import Engine 8 from waeup.cas.db import ( 9 DB, LoginTicket, ServiceTicket, TicketGrantingCookie) 8 10 9 11 … … 17 19 def test_create_db(self): 18 20 # we can create a database 19 db_path = os.path.join(self.workdir, 'sample-cas.db') 20 create_db(db_path) 21 assert os.path.isfile(db_path) 22 conn = sqlite3.connect(db_path) 23 assert conn is not None 24 conn.close() 21 conn_string = 'sqlite:///' 22 db = DB(conn_string) 23 assert hasattr(db, 'engine') 24 assert isinstance(db.engine, Engine) 25 25 26 26 def test_create_db_exists(self): 27 27 # an already existing db will be left untouched 28 28 db_path = os.path.join(self.workdir, 'sample-cas.db') 29 create_db(db_path) 29 conn_string = 'sqlite:///%s' % db_path 30 assert os.path.isdir(self.workdir) 31 DB(conn_string) # create database 30 32 conn = sqlite3.connect(db_path) 31 cursor = conn.cursor() 32 cursor.execute('''CREATE TABLE mytest (name text, forname text)''') 33 cursor.execute('''INSERT INTO mytest VALUES ("Foo", "Bar")''') 34 conn.commit() 33 with conn: 34 conn.execute('''CREATE TABLE mytest (name text, forname text)''') 35 conn.execute('''INSERT INTO mytest VALUES ("Foo", "Bar")''') 35 36 conn.close() 36 37 37 create_db(db_path)38 38 conn = sqlite3.connect(db_path) 39 cursor = conn.cursor() 40 cursor.execute('''SELECT * FROM mytest''') 41 assert cursor.fetchall() == [('Foo', 'Bar')] 39 with conn: 40 result = [x for x in conn.execute('''SELECT * FROM mytest''')] 41 conn.close() 42 assert result == [('Foo', 'Bar')] 43 44 def test_create_db_populates_db(self): 45 # created DBs will be populated with the required tables 46 db_path = os.path.join(self.workdir, 'sample-cas.db') 47 conn_string = 'sqlite:///%s' % db_path 48 DB(conn_string) # creates database 49 conn = sqlite3.connect(db_path) 50 with conn: 51 result = [x[:2] for x in conn.execute( 52 '''SELECT * FROM sqlite_master ORDER BY type''')] 53 conn.close() 54 assert ('table', 'service_tickets') in result 55 assert ('table', 'login_tickets') in result 56 57 58 class TicketTests(unittest.TestCase): 59 60 def setUp(self): 61 self.db = DB('sqlite:///') 62 self.db.session() 63 64 def tearDown(self): 65 self.db.session.remove() 66 67 def test_login_ticket_add(self): 68 # we can add login tickets 69 assert self.db.engine.has_table('login_tickets') 70 71 self.db.add(LoginTicket('foo')) 72 assert [x.ticket for x in self.db.query(LoginTicket)] == ['foo'] 73 74 def test_login_ticket_delete(self): 75 # we can delete single login tickets 76 assert self.db.engine.has_table('login_tickets') 77 78 self.db.add(LoginTicket('foo')) 79 contents = [x for x in self.db.query(LoginTicket)] 80 assert len(contents) == 1 81 lt = contents[0] 82 self.db.delete(lt) 83 assert [x.ticket for x in self.db.query(LoginTicket)] == [] 84 85 def test_login_ticket_repr(self): 86 # we can get a proper LoginTicket representation 87 # (i.e. one that can be fed to `eval`) 88 ticket = LoginTicket('foo', 12.1) 89 assert ticket.__repr__() == "LoginTicket('foo', 12.1)" 90 91 def test_login_ticket_timestamp(self): 92 # we get a timestamp stored, if none is passed to init 93 lticket = LoginTicket('foo') 94 assert isinstance(lticket.ts, float) 95 96 def test_add_service_ticket(self): 97 # we can add service tickets 98 self.db.add(ServiceTicket('foo', 'bar', 'baz', 12.1)) 99 result = [(x.ticket, x.user, x.service, x.ts) 100 for x in self.db.query(ServiceTicket)] 101 assert result == [('foo', 'bar', 'baz', 12.1)] 102 103 def test_service_ticket_repr(self): 104 # we can get a proper ServiceTicket representation 105 # (i.e. one that can be fed to `eval`) 106 sticket = ServiceTicket('foo', 'bar', 'baz', 12.1) 107 st_repr = sticket.__repr__() 108 assert st_repr == "ServiceTicket('foo', 'bar', 'baz', 12.1)" 109 110 def test_service_ticket_timestamp(self): 111 # we get a timestamp stored, if none is passed to init 112 sticket = ServiceTicket('foo', 'bar', 'baz') 113 assert isinstance(sticket.ts, float) 114 115 def test_ticket_granting_cookie_add(self): 116 # we can add ticket granting cookies 117 assert self.db.engine.has_table('ticket_granting_cookies') 118 119 self.db.add(TicketGrantingCookie('foo')) 120 assert [x.value for x in self.db.query( 121 TicketGrantingCookie)] == ['foo'] 122 123 def test_ticket_granting_cookie_repr(self): 124 # we can get a proper ticket-granting cookie representation 125 # (i.e. one that can be fed to `eval`) 126 tgc = TicketGrantingCookie('foo', 12.1) 127 assert tgc.__repr__() == "TicketGrantingCookie('foo', 12.1)" 128 129 def test_ticket_granting_cookie_timestamp(self): 130 # we get a timestamp stored, if none is passed to init 131 tgc = TicketGrantingCookie('foo') 132 assert isinstance(tgc.ts, float) -
main/waeup.cas/trunk/waeup/cas/tests/test_server.py
r10352 r10394 1 1 import os 2 import re 2 3 import shutil 3 4 import tempfile 4 5 import unittest 5 6 from paste.deploy import loadapp 6 from webob import Request 7 from waeup.cas.server import CASServer 7 from webob import Request, Response 8 from webtest import TestApp as WebTestApp # avoid py.test skip message 9 from waeup.cas.authenticators import DummyAuthenticator 10 from waeup.cas.db import DB, LoginTicket, ServiceTicket, TicketGrantingCookie 11 from waeup.cas.server import ( 12 CASServer, create_service_ticket, create_login_ticket, 13 create_tgc_value, check_login_ticket, set_session_cookie, 14 ) 15 16 17 RE_ALPHABET = re.compile('^[a-zA-Z0-9\-]*$') 18 RE_COOKIE = re.compile('^cas-tgc=[A-Za-z0-9\-]+; Path=/; secure; HttpOnly$') 8 19 9 20 … … 19 30 self.workdir = os.path.join(self._new_tmpdir, 'home') 20 31 self.db_path = os.path.join(self.workdir, 'mycas.db') 32 os.mkdir(self.workdir) 21 33 self.paste_conf1 = os.path.join( 22 34 os.path.dirname(__file__), 'sample1.ini') … … 34 46 app = loadapp('config:%s' % self.paste_conf1) 35 47 assert isinstance(app, CASServer) 36 assert hasattr(app, 'db_path') 37 assert app.db_path is not None 48 assert hasattr(app, 'db') 49 assert isinstance(app.db, DB) 50 assert hasattr(app, 'auth') 38 51 39 52 def test_paste_deploy_options(self): … … 41 54 app = loadapp('config:%s' % self.paste_conf2) 42 55 assert isinstance(app, CASServer) 43 assert app.db_path == '/a/path/to/cas.db' 56 assert app.db_connection_string == 'sqlite:///:memory:' 57 assert isinstance(app.auth, DummyAuthenticator) 44 58 45 59 def test_init(self): 46 # we get a db dir set automatically 47 app = CASServer() 48 assert hasattr(app, 'db_path') 49 assert app.db_path is not None 50 assert os.path.exists(os.path.dirname(app.db_path)) 60 # we get a `DB` instance created automatically 61 app = CASServer() 62 assert hasattr(app, 'db') 63 assert app.db is not None 51 64 52 65 def test_init_explicit_db_path(self): 53 66 # we can set a db_path explicitly 54 app = CASServer(db_path=self.db_path) 55 assert hasattr(app, 'db_path') 56 assert app.db_path == self.db_path 67 app = CASServer(db='sqlite:///%s' % self.db_path) 68 assert hasattr(app, 'db') 69 assert isinstance(app.db, DB) 70 assert os.path.isfile(self.db_path) 71 72 def test_get_template(self): 73 app = CASServer() 74 assert app._get_template('login.html') is not None 75 assert app._get_template('not-existent.html') is None 57 76 58 77 def test_call_root(self): … … 93 112 assert resp.content_type == 'text/html' 94 113 assert b'<form ' in resp.body 114 115 116 class BrowserTests(unittest.TestCase): 117 118 def setUp(self): 119 self.raw_app = CASServer(auth=DummyAuthenticator()) 120 self.app = WebTestApp(self.raw_app) 121 122 def test_login(self): 123 resp = self.app.get('/login') 124 assert resp.status == '200 OK' 125 form = resp.forms[0] 126 # 2.1.3: form must be submitted by POST 127 assert form.method == 'post' 128 fieldnames = form.fields.keys() 129 # 2.1.3: form must contain: username, password, lt 130 assert 'username' in fieldnames 131 assert 'password' in fieldnames 132 assert 'lt' in fieldnames 133 assert RE_ALPHABET.match(form['lt'].value) 134 135 def test_login_no_service(self): 136 # w/o a service passed in, the form should not contain service 137 # (not a strict protocol requirement, but handy) 138 resp = self.app.get('/login') 139 assert 'service' not in resp.forms[0].fields.keys() 140 141 def test_login_service_replayed(self): 142 # 2.1.3: the login form must contain the service param sent 143 resp = self.app.get('/login?service=http%3A%2F%2Fwww.service.com') 144 form = resp.forms[0] 145 assert resp.status == '200 OK' 146 assert 'service' in form.fields.keys() 147 assert form['service'].value == 'http://www.service.com' 148 149 def test_login_cred_acceptor_valid_no_service(self): 150 # 2.2.4: successful login w/o service yields a message 151 lt = create_login_ticket() 152 self.raw_app.db.add(lt) 153 lt_string = lt.ticket 154 resp = self.app.post('/login', dict( 155 username='bird', password='bebop', lt=lt_string)) 156 assert resp.status == '200 OK' 157 assert b'successful' in resp.body 158 # single-sign-on session initiated 159 assert 'Set-Cookie' in resp.headers 160 cookie = resp.headers['Set-Cookie'] 161 assert cookie.startswith('cas-tgc=') 162 163 def test_login_cred_acceptor_valid_w_service(self): 164 # 2.2.4: successful login with service makes a redirect 165 # Appendix B: safe redirect 166 lt = create_login_ticket() 167 self.raw_app.db.add(lt) 168 lt_string = lt.ticket 169 resp = self.app.post('/login', dict( 170 username='bird', password='bebop', lt=lt_string, 171 service='http://example.com/Login')) 172 assert resp.status == '303 See Other' 173 assert 'Location' in resp.headers 174 assert resp.headers['Location'].startswith( 175 'http://example.com/Login?ticket=ST-') 176 assert 'Pragma' in resp.headers 177 assert resp.headers['Pragma'] == 'no-cache' 178 assert 'Cache-Control' in resp.headers 179 assert resp.headers['Cache-Control'] == 'no-store' 180 assert 'Expires' in resp.headers 181 assert resp.headers['Expires'] == 'Thu, 01 Dec 1994 16:00:00 GMT' 182 assert b'window.location.href' in resp.body 183 assert b'noscript' in resp.body 184 assert b'ticket=ST-' in resp.body 185 186 187 class CASServerHelperTests(unittest.TestCase): 188 189 def setUp(self): 190 self.workdir = tempfile.mkdtemp() 191 self.db_file = os.path.join(self.workdir, 'mycas.db') 192 self.conn_string = 'sqlite:///%s' % self.db_file 193 self.db = DB(self.conn_string) 194 195 def tearDown(self): 196 shutil.rmtree(self.workdir) 197 198 def test_create_service_ticket(self): 199 # we can create service tickets 200 st = create_service_ticket( 201 user='bob', service='http://www.example.com') 202 assert isinstance(st, ServiceTicket) 203 # 3.1.1: service not part of ticket 204 assert 'example.com' not in st.ticket 205 # 3.1.1: ticket must start with 'ST-' 206 assert st.ticket.startswith('ST-') 207 # 3.1.1: min. ticket length clients must be able to process is 32 208 assert len(st.ticket) < 33 209 # 3.7: allowed character set == [a-zA-Z0-9\-] 210 assert RE_ALPHABET.match(st.ticket), ( 211 'Ticket contains forbidden chars: %s' % st) 212 213 def test_create_login_ticket(self): 214 # we can create login tickets 215 lt = create_login_ticket() 216 # 3.5.1: ticket should start with 'LT-' 217 assert lt.ticket.startswith('LT-') 218 # 3.7: allowed character set == [a-zA-Z0-9\-] 219 assert RE_ALPHABET.match(lt.ticket), ( 220 'Ticket contains forbidden chars: %s' % lt) 221 222 def test_create_login_ticket_unique(self): 223 # 3.5.1: login tickets are unique (although not hard to guess) 224 ticket_num = 1000 # increase to test more thoroughly 225 lt_list = [create_login_ticket() for x in range(ticket_num)] 226 assert len(set(lt_list)) == ticket_num 227 228 def test_create_tgc_value(self): 229 # we can create ticket granting cookies 230 tgc = create_tgc_value() 231 assert isinstance(tgc, TicketGrantingCookie) 232 # 3.6.1: cookie value should start with 'TGC-' 233 assert tgc.value.startswith('TGC-') 234 # 3.7: allowed character set == [a-zA-Z0-9\-] 235 assert RE_ALPHABET.match(tgc.value), ( 236 'Cookie value contains forbidden chars: %s' % tgc) 237 238 def test_check_login_ticket(self): 239 db = DB('sqlite:///') 240 lt = LoginTicket('LT-123456') 241 db.add(lt) 242 assert check_login_ticket(db, None) is False 243 assert check_login_ticket(db, 'LT-123456') is True 244 # the ticket will be removed after check 245 assert check_login_ticket(db, 'LT-123456') is False 246 assert check_login_ticket(db, 'LT-654321') is False 247 248 def test_set_session_cookie(self): 249 # make sure we can add session cookies to responses 250 db = DB('sqlite:///') 251 resp = set_session_cookie(Response(), db) 252 assert 'Set-Cookie' in resp.headers 253 cookie = resp.headers['Set-Cookie'] 254 assert RE_COOKIE.match(cookie), ( 255 'Cookie in unexpected format: %s' % cookie) 256 # the cookie is stored in database 257 value = cookie.split('=')[1].split(';')[0] 258 q = db.query(TicketGrantingCookie).filter( 259 TicketGrantingCookie.value == value) 260 assert len(list(q)) == 1
Note: See TracChangeset for help on using the changeset viewer.