Ignore:
Timestamp:
4 Jul 2013, 09:04:41 (12 years ago)
Author:
uli
Message:

Store work from last days.

Location:
main/waeup.cas/trunk/waeup/cas/tests
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.cas/trunk/waeup/cas/tests/sample2.ini

    r10352 r10394  
    33[app:main]
    44use = egg:waeup.cas#server
    5 # what file to use to store tickets, credentials, etc.
    6 db_path = /a/path/to/cas.db
     5# SQLAlchemy connection string
     6# By default we get an in-memory SQLite database
     7# Samples:
     8#   sqlite in-memory db:  sqlite://:memory:
     9#   sqlite file-based db: sqlite:////path/to/db
     10db = sqlite:///:memory:
     11auth = dummy
  • main/waeup.cas/trunk/waeup/cas/tests/test_db.py

    r10349 r10394  
    55import tempfile
    66import unittest
    7 from waeup.cas.db import create_db
     7from sqlalchemy.engine import Engine
     8from waeup.cas.db import (
     9    DB, LoginTicket, ServiceTicket, TicketGrantingCookie)
    810
    911
     
    1719    def test_create_db(self):
    1820        # we can create a database
    19         db_path = os.path.join(self.workdir, 'sample-cas.db')
    20         create_db(db_path)
    21         assert os.path.isfile(db_path)
    22         conn = sqlite3.connect(db_path)
    23         assert conn is not None
    24         conn.close()
     21        conn_string = 'sqlite:///'
     22        db = DB(conn_string)
     23        assert hasattr(db, 'engine')
     24        assert isinstance(db.engine, Engine)
    2525
    2626    def test_create_db_exists(self):
    2727        # an already existing db will be left untouched
    2828        db_path = os.path.join(self.workdir, 'sample-cas.db')
    29         create_db(db_path)
     29        conn_string = 'sqlite:///%s' % db_path
     30        assert os.path.isdir(self.workdir)
     31        DB(conn_string)  # create database
    3032        conn = sqlite3.connect(db_path)
    31         cursor = conn.cursor()
    32         cursor.execute('''CREATE TABLE mytest (name text, forname text)''')
    33         cursor.execute('''INSERT INTO mytest VALUES ("Foo", "Bar")''')
    34         conn.commit()
     33        with conn:
     34            conn.execute('''CREATE TABLE mytest (name text, forname text)''')
     35            conn.execute('''INSERT INTO mytest VALUES ("Foo", "Bar")''')
    3536        conn.close()
    3637
    37         create_db(db_path)
    3838        conn = sqlite3.connect(db_path)
    39         cursor = conn.cursor()
    40         cursor.execute('''SELECT * FROM mytest''')
    41         assert cursor.fetchall() == [('Foo', 'Bar')]
     39        with conn:
     40            result = [x for x in conn.execute('''SELECT * FROM mytest''')]
     41        conn.close()
     42        assert result == [('Foo', 'Bar')]
     43
     44    def test_create_db_populates_db(self):
     45        # created DBs will be populated with the required tables
     46        db_path = os.path.join(self.workdir, 'sample-cas.db')
     47        conn_string = 'sqlite:///%s' % db_path
     48        DB(conn_string)  # creates database
     49        conn = sqlite3.connect(db_path)
     50        with conn:
     51            result = [x[:2] for x in conn.execute(
     52                '''SELECT * FROM sqlite_master ORDER BY type''')]
     53        conn.close()
     54        assert ('table', 'service_tickets') in result
     55        assert ('table', 'login_tickets') in result
     56
     57
     58class TicketTests(unittest.TestCase):
     59
     60    def setUp(self):
     61        self.db = DB('sqlite:///')
     62        self.db.session()
     63
     64    def tearDown(self):
     65        self.db.session.remove()
     66
     67    def test_login_ticket_add(self):
     68        # we can add login tickets
     69        assert self.db.engine.has_table('login_tickets')
     70
     71        self.db.add(LoginTicket('foo'))
     72        assert [x.ticket for x in self.db.query(LoginTicket)] == ['foo']
     73
     74    def test_login_ticket_delete(self):
     75        # we can delete single login tickets
     76        assert self.db.engine.has_table('login_tickets')
     77
     78        self.db.add(LoginTicket('foo'))
     79        contents = [x for x in self.db.query(LoginTicket)]
     80        assert len(contents) == 1
     81        lt = contents[0]
     82        self.db.delete(lt)
     83        assert [x.ticket for x in self.db.query(LoginTicket)] == []
     84
     85    def test_login_ticket_repr(self):
     86        # we can get a proper LoginTicket representation
     87        # (i.e. one that can be fed to `eval`)
     88        ticket = LoginTicket('foo', 12.1)
     89        assert ticket.__repr__() == "LoginTicket('foo', 12.1)"
     90
     91    def test_login_ticket_timestamp(self):
     92        # we get a timestamp stored, if none is passed to init
     93        lticket = LoginTicket('foo')
     94        assert isinstance(lticket.ts, float)
     95
     96    def test_add_service_ticket(self):
     97        # we can add service tickets
     98        self.db.add(ServiceTicket('foo', 'bar', 'baz', 12.1))
     99        result = [(x.ticket, x.user, x.service, x.ts)
     100                  for x in self.db.query(ServiceTicket)]
     101        assert result == [('foo', 'bar', 'baz', 12.1)]
     102
     103    def test_service_ticket_repr(self):
     104        # we can get a proper ServiceTicket representation
     105        # (i.e. one that can be fed to `eval`)
     106        sticket = ServiceTicket('foo', 'bar', 'baz', 12.1)
     107        st_repr = sticket.__repr__()
     108        assert st_repr == "ServiceTicket('foo', 'bar', 'baz', 12.1)"
     109
     110    def test_service_ticket_timestamp(self):
     111        # we get a timestamp stored, if none is passed to init
     112        sticket = ServiceTicket('foo', 'bar', 'baz')
     113        assert isinstance(sticket.ts, float)
     114
     115    def test_ticket_granting_cookie_add(self):
     116        # we can add ticket granting cookies
     117        assert self.db.engine.has_table('ticket_granting_cookies')
     118
     119        self.db.add(TicketGrantingCookie('foo'))
     120        assert [x.value for x in self.db.query(
     121            TicketGrantingCookie)] == ['foo']
     122
     123    def test_ticket_granting_cookie_repr(self):
     124        # we can get a proper ticket-granting cookie representation
     125        # (i.e. one that can be fed to `eval`)
     126        tgc = TicketGrantingCookie('foo', 12.1)
     127        assert tgc.__repr__() == "TicketGrantingCookie('foo', 12.1)"
     128
     129    def test_ticket_granting_cookie_timestamp(self):
     130        # we get a timestamp stored, if none is passed to init
     131        tgc = TicketGrantingCookie('foo')
     132        assert isinstance(tgc.ts, float)
  • main/waeup.cas/trunk/waeup/cas/tests/test_server.py

    r10352 r10394  
    11import os
     2import re
    23import shutil
    34import tempfile
    45import unittest
    56from paste.deploy import loadapp
    6 from webob import Request
    7 from waeup.cas.server import CASServer
     7from webob import Request, Response
     8from webtest import TestApp as WebTestApp  # avoid py.test skip message
     9from waeup.cas.authenticators import DummyAuthenticator
     10from waeup.cas.db import DB, LoginTicket, ServiceTicket, TicketGrantingCookie
     11from waeup.cas.server import (
     12    CASServer, create_service_ticket, create_login_ticket,
     13    create_tgc_value, check_login_ticket, set_session_cookie,
     14    )
     15
     16
     17RE_ALPHABET = re.compile('^[a-zA-Z0-9\-]*$')
     18RE_COOKIE = re.compile('^cas-tgc=[A-Za-z0-9\-]+; Path=/; secure; HttpOnly$')
    819
    920
     
    1930        self.workdir = os.path.join(self._new_tmpdir, 'home')
    2031        self.db_path = os.path.join(self.workdir, 'mycas.db')
     32        os.mkdir(self.workdir)
    2133        self.paste_conf1 = os.path.join(
    2234            os.path.dirname(__file__), 'sample1.ini')
     
    3446        app = loadapp('config:%s' % self.paste_conf1)
    3547        assert isinstance(app, CASServer)
    36         assert hasattr(app, 'db_path')
    37         assert app.db_path is not None
     48        assert hasattr(app, 'db')
     49        assert isinstance(app.db, DB)
     50        assert hasattr(app, 'auth')
    3851
    3952    def test_paste_deploy_options(self):
     
    4154        app = loadapp('config:%s' % self.paste_conf2)
    4255        assert isinstance(app, CASServer)
    43         assert app.db_path == '/a/path/to/cas.db'
     56        assert app.db_connection_string == 'sqlite:///:memory:'
     57        assert isinstance(app.auth, DummyAuthenticator)
    4458
    4559    def test_init(self):
    46         # we get a db dir set automatically
    47         app = CASServer()
    48         assert hasattr(app, 'db_path')
    49         assert app.db_path is not None
    50         assert os.path.exists(os.path.dirname(app.db_path))
     60        # we get a `DB` instance created automatically
     61        app = CASServer()
     62        assert hasattr(app, 'db')
     63        assert app.db is not None
    5164
    5265    def test_init_explicit_db_path(self):
    5366        # we can set a db_path explicitly
    54         app = CASServer(db_path=self.db_path)
    55         assert hasattr(app, 'db_path')
    56         assert app.db_path == self.db_path
     67        app = CASServer(db='sqlite:///%s' % self.db_path)
     68        assert hasattr(app, 'db')
     69        assert isinstance(app.db, DB)
     70        assert os.path.isfile(self.db_path)
     71
     72    def test_get_template(self):
     73        app = CASServer()
     74        assert app._get_template('login.html') is not None
     75        assert app._get_template('not-existent.html') is None
    5776
    5877    def test_call_root(self):
     
    93112        assert resp.content_type == 'text/html'
    94113        assert b'<form ' in resp.body
     114
     115
     116class BrowserTests(unittest.TestCase):
     117
     118    def setUp(self):
     119        self.raw_app = CASServer(auth=DummyAuthenticator())
     120        self.app = WebTestApp(self.raw_app)
     121
     122    def test_login(self):
     123        resp = self.app.get('/login')
     124        assert resp.status == '200 OK'
     125        form = resp.forms[0]
     126        # 2.1.3: form must be submitted by POST
     127        assert form.method == 'post'
     128        fieldnames = form.fields.keys()
     129        # 2.1.3: form must contain: username, password, lt
     130        assert 'username' in fieldnames
     131        assert 'password' in fieldnames
     132        assert 'lt' in fieldnames
     133        assert RE_ALPHABET.match(form['lt'].value)
     134
     135    def test_login_no_service(self):
     136        # w/o a service passed in, the form should not contain service
     137        # (not a strict protocol requirement, but handy)
     138        resp = self.app.get('/login')
     139        assert 'service' not in resp.forms[0].fields.keys()
     140
     141    def test_login_service_replayed(self):
     142        # 2.1.3: the login form must contain the service param sent
     143        resp = self.app.get('/login?service=http%3A%2F%2Fwww.service.com')
     144        form = resp.forms[0]
     145        assert resp.status == '200 OK'
     146        assert 'service' in form.fields.keys()
     147        assert form['service'].value == 'http://www.service.com'
     148
     149    def test_login_cred_acceptor_valid_no_service(self):
     150        # 2.2.4: successful login w/o service yields a message
     151        lt = create_login_ticket()
     152        self.raw_app.db.add(lt)
     153        lt_string = lt.ticket
     154        resp = self.app.post('/login', dict(
     155            username='bird', password='bebop', lt=lt_string))
     156        assert resp.status == '200 OK'
     157        assert b'successful' in resp.body
     158        # single-sign-on session initiated
     159        assert 'Set-Cookie' in resp.headers
     160        cookie = resp.headers['Set-Cookie']
     161        assert cookie.startswith('cas-tgc=')
     162
     163    def test_login_cred_acceptor_valid_w_service(self):
     164        # 2.2.4: successful login with service makes a redirect
     165        # Appendix B: safe redirect
     166        lt = create_login_ticket()
     167        self.raw_app.db.add(lt)
     168        lt_string = lt.ticket
     169        resp = self.app.post('/login', dict(
     170            username='bird', password='bebop', lt=lt_string,
     171            service='http://example.com/Login'))
     172        assert resp.status == '303 See Other'
     173        assert 'Location' in resp.headers
     174        assert resp.headers['Location'].startswith(
     175            'http://example.com/Login?ticket=ST-')
     176        assert 'Pragma' in resp.headers
     177        assert resp.headers['Pragma'] == 'no-cache'
     178        assert 'Cache-Control' in resp.headers
     179        assert resp.headers['Cache-Control'] == 'no-store'
     180        assert 'Expires' in resp.headers
     181        assert resp.headers['Expires'] == 'Thu, 01 Dec 1994 16:00:00 GMT'
     182        assert b'window.location.href' in resp.body
     183        assert b'noscript' in resp.body
     184        assert b'ticket=ST-' in resp.body
     185
     186
     187class CASServerHelperTests(unittest.TestCase):
     188
     189    def setUp(self):
     190        self.workdir = tempfile.mkdtemp()
     191        self.db_file = os.path.join(self.workdir, 'mycas.db')
     192        self.conn_string = 'sqlite:///%s' % self.db_file
     193        self.db = DB(self.conn_string)
     194
     195    def tearDown(self):
     196        shutil.rmtree(self.workdir)
     197
     198    def test_create_service_ticket(self):
     199        # we can create service tickets
     200        st = create_service_ticket(
     201            user='bob', service='http://www.example.com')
     202        assert isinstance(st, ServiceTicket)
     203        # 3.1.1: service not part of ticket
     204        assert 'example.com' not in st.ticket
     205        # 3.1.1: ticket must start with 'ST-'
     206        assert st.ticket.startswith('ST-')
     207        # 3.1.1: min. ticket length clients must be able to process is 32
     208        assert len(st.ticket) < 33
     209        # 3.7: allowed character set == [a-zA-Z0-9\-]
     210        assert RE_ALPHABET.match(st.ticket), (
     211            'Ticket contains forbidden chars: %s' % st)
     212
     213    def test_create_login_ticket(self):
     214        # we can create login tickets
     215        lt = create_login_ticket()
     216        # 3.5.1: ticket should start with 'LT-'
     217        assert lt.ticket.startswith('LT-')
     218        # 3.7: allowed character set == [a-zA-Z0-9\-]
     219        assert RE_ALPHABET.match(lt.ticket), (
     220            'Ticket contains forbidden chars: %s' % lt)
     221
     222    def test_create_login_ticket_unique(self):
     223        # 3.5.1: login tickets are unique (although not hard to guess)
     224        ticket_num = 1000  # increase to test more thoroughly
     225        lt_list = [create_login_ticket() for x in range(ticket_num)]
     226        assert len(set(lt_list)) == ticket_num
     227
     228    def test_create_tgc_value(self):
     229        # we can create ticket granting cookies
     230        tgc = create_tgc_value()
     231        assert isinstance(tgc, TicketGrantingCookie)
     232        # 3.6.1: cookie value should start with 'TGC-'
     233        assert tgc.value.startswith('TGC-')
     234        # 3.7: allowed character set == [a-zA-Z0-9\-]
     235        assert RE_ALPHABET.match(tgc.value), (
     236            'Cookie value contains forbidden chars: %s' % tgc)
     237
     238    def test_check_login_ticket(self):
     239        db = DB('sqlite:///')
     240        lt = LoginTicket('LT-123456')
     241        db.add(lt)
     242        assert check_login_ticket(db, None) is False
     243        assert check_login_ticket(db, 'LT-123456') is True
     244        # the ticket will be removed after check
     245        assert check_login_ticket(db, 'LT-123456') is False
     246        assert check_login_ticket(db, 'LT-654321') is False
     247
     248    def test_set_session_cookie(self):
     249        # make sure we can add session cookies to responses
     250        db = DB('sqlite:///')
     251        resp = set_session_cookie(Response(), db)
     252        assert 'Set-Cookie' in resp.headers
     253        cookie = resp.headers['Set-Cookie']
     254        assert RE_COOKIE.match(cookie), (
     255            'Cookie in unexpected format: %s' % cookie)
     256        # the cookie is stored in database
     257        value = cookie.split('=')[1].split(';')[0]
     258        q = db.query(TicketGrantingCookie).filter(
     259            TicketGrantingCookie.value == value)
     260        assert len(list(q)) == 1
Note: See TracChangeset for help on using the changeset viewer.