Package openid :: Package test :: Module storetest
[frames | no frames]

Source Code for Module openid.test.storetest

  1  from openid.association import Association 
  2  from openid.cryptutil import randomString 
  3  from openid.store.nonce import mkNonce, split 
  4   
  5  import unittest 
  6  import string 
  7  import time 
  8  import socket 
  9  import random 
 10  import os 
 11   
 12  db_host = 'dbtest' 
 13   
 14  allowed_handle = [] 
 15  for c in string.printable: 
 16      if c not in string.whitespace: 
 17          allowed_handle.append(c) 
 18  allowed_handle = ''.join(allowed_handle) 
 19   
def generateHandle(n):
    """Return randomString(n, allowed_handle).

    n is forwarded unchanged as the first argument of randomString
    (presumably the desired length -- confirm against
    openid.cryptutil); allowed_handle is the character set to draw from.
    """
    handle = randomString(n, allowed_handle)
    return handle


# Secrets are produced by calling randomString directly.
generateSecret = randomString
def getTmpDbName():
    """Build a name for a temporary test database.

    The result has the form '<host>_<pid>_<random>_openid_test', where
    <host> is the local host name with every '.' and '-' replaced by
    '_', <pid> is the current process id and <random> is an integer
    drawn from random.randrange(1, int(time.time())).
    """
    sanitized_host = socket.gethostname()
    for unwanted in ('.', '-'):
        sanitized_host = sanitized_host.replace(unwanted, '_')

    pid = os.getpid()
    salt = random.randrange(1, int(time.time()))
    return "%s_%d_%s_openid_test" % (sanitized_host, pid, salt)
32
def _genAssoc(now, issued, lifetime=600):
    """Build an 'HMAC-SHA1' Association with a fresh handle and secret.

    The third constructor argument is now + issued (presumably the issue
    timestamp -- confirm against openid.association); lifetime is passed
    through unchanged.
    """
    sec = generateSecret(20)
    hdl = generateHandle(128)
    return Association(hdl, sec, now + issued, lifetime, 'HMAC-SHA1')


def _checkRetrieve(store, url, handle=None, expected=None):
    """Assert that store.getAssociation(url, handle) equals expected.

    When expected is not None the handle and secret are compared as
    well, and a notice is printed if the store handed back the very same
    object instead of a new one.
    """
    retrieved_assoc = store.getAssociation(url, handle)
    assert retrieved_assoc == expected, (retrieved_assoc, expected)
    if expected is not None:
        if retrieved_assoc is expected:
            print ('Unexpected: retrieved a reference to the expected '
                   'value instead of a new object')
        assert retrieved_assoc.handle == expected.handle
        assert retrieved_assoc.secret == expected.secret


def _checkRemove(store, url, handle, expected):
    """Assert that store.removeAssociation(url, handle) is truthy iff expected is."""
    present = store.removeAssociation(url, handle)
    assert bool(expected) == bool(present)


def _checkUseNonce(store, nonce, expected, server_url, msg=''):
    """Assert that store.useNonce() on the split nonce is truthy iff expected is."""
    stamp, salt = split(nonce)
    actual = store.useNonce(server_url, stamp, salt)
    assert bool(actual) == bool(expected), "%r != %r: %s" % (actual, expected,
                                                             msg)


def _checkAssociationStorage(store, now, server_url):
    """Exercise storeAssociation / getAssociation / removeAssociation."""
    assoc = _genAssoc(now, issued=0)

    # Make sure that a missing association returns no result
    _checkRetrieve(store, server_url)

    # Check that after storage, getting returns the same result
    store.storeAssociation(server_url, assoc)
    _checkRetrieve(store, server_url, None, assoc)

    # more than once
    _checkRetrieve(store, server_url, None, assoc)

    # Storing more than once has no ill effect
    store.storeAssociation(server_url, assoc)
    _checkRetrieve(store, server_url, None, assoc)

    # Removing an association whose handle does not exist returns not present
    _checkRemove(store, server_url, assoc.handle + 'x', False)

    # Removing an association under a server URL that does not exist
    # returns not present
    _checkRemove(store, server_url + 'x', assoc.handle, False)

    # Removing an association that is present returns present
    _checkRemove(store, server_url, assoc.handle, True)

    # but not present on subsequent calls
    _checkRemove(store, server_url, assoc.handle, False)

    # Put assoc back in the store
    store.storeAssociation(server_url, assoc)

    # More recent and expires after assoc
    assoc2 = _genAssoc(now, issued=1)
    store.storeAssociation(server_url, assoc2)

    # After storing an association with a different handle, but the
    # same server_url, the handle with the later issue date is returned.
    _checkRetrieve(store, server_url, None, assoc2)

    # We can still retrieve the older association
    _checkRetrieve(store, server_url, assoc.handle, assoc)

    # Plus we can retrieve the association with the later issue date
    # explicitly
    _checkRetrieve(store, server_url, assoc2.handle, assoc2)

    # More recent, and expires earlier than assoc2 or assoc. Make sure
    # that we're picking the one with the latest issued date and not
    # taking into account the expiration.
    assoc3 = _genAssoc(now, issued=2, lifetime=100)
    store.storeAssociation(server_url, assoc3)

    _checkRetrieve(store, server_url, None, assoc3)
    _checkRetrieve(store, server_url, assoc.handle, assoc)
    _checkRetrieve(store, server_url, assoc2.handle, assoc2)
    _checkRetrieve(store, server_url, assoc3.handle, assoc3)

    _checkRemove(store, server_url, assoc2.handle, True)

    _checkRetrieve(store, server_url, None, assoc3)
    _checkRetrieve(store, server_url, assoc.handle, assoc)
    _checkRetrieve(store, server_url, assoc2.handle, None)
    _checkRetrieve(store, server_url, assoc3.handle, assoc3)

    _checkRemove(store, server_url, assoc2.handle, False)
    _checkRemove(store, server_url, assoc3.handle, True)

    _checkRetrieve(store, server_url, None, assoc)
    _checkRetrieve(store, server_url, assoc.handle, assoc)
    _checkRetrieve(store, server_url, assoc2.handle, None)
    _checkRetrieve(store, server_url, assoc3.handle, None)

    _checkRemove(store, server_url, assoc2.handle, False)
    _checkRemove(store, server_url, assoc.handle, True)
    _checkRemove(store, server_url, assoc3.handle, False)

    _checkRetrieve(store, server_url, None, None)
    _checkRetrieve(store, server_url, assoc.handle, None)
    _checkRetrieve(store, server_url, assoc2.handle, None)
    _checkRetrieve(store, server_url, assoc3.handle, None)

    _checkRemove(store, server_url, assoc2.handle, False)
    _checkRemove(store, server_url, assoc.handle, False)
    _checkRemove(store, server_url, assoc3.handle, False)


def _checkAssociationCleanup(store, now, server_url):
    """Check that cleanupAssociations() reports exactly the expired ones."""
    # assoc 1: server 1, valid
    # assoc 2: server 1, expired
    # assoc 3: server 2, expired
    # assoc 4: server 3, valid
    assocValid1 = _genAssoc(now, issued=-3600, lifetime=7200)
    assocValid2 = _genAssoc(now, issued=-5)
    assocExpired1 = _genAssoc(now, issued=-7200, lifetime=3600)
    assocExpired2 = _genAssoc(now, issued=-7200, lifetime=3600)

    # First call clears whatever is already there so the count asserted
    # below only covers the associations stored here.
    store.cleanupAssociations()
    store.storeAssociation(server_url + '1', assocValid1)
    store.storeAssociation(server_url + '1', assocExpired1)
    store.storeAssociation(server_url + '2', assocExpired2)
    store.storeAssociation(server_url + '3', assocValid2)

    cleaned = store.cleanupAssociations()
    assert cleaned == 2, cleaned


def _checkNonceUse(store, server_url):
    """Check useNonce() for fresh, repeated and very old nonces."""
    for url in [server_url, '']:
        # Random nonce (not in store)
        nonce1 = mkNonce()

        # A nonce is allowed by default
        _checkUseNonce(store, nonce1, True, url)

        # Storing once causes useNonce to return True the first, and only
        # the first, time it is called after the store.
        _checkUseNonce(store, nonce1, False, url)
        _checkUseNonce(store, nonce1, False, url)

        # Nonces from when the universe was an hour old should not pass
        # these days.
        old_nonce = mkNonce(3600)
        _checkUseNonce(store, old_nonce, False, url,
                       "Old nonce (%r) passed." % (old_nonce,))


def _checkNonceCleanup(store, now, server_url):
    """Check that cleanupNonces() drops old nonces and keeps recent ones.

    Temporarily rebinds openid.store.nonce.SKEW; the original value is
    restored in a finally clause.
    """
    old_nonce1 = mkNonce(now - 20000)
    old_nonce2 = mkNonce(now - 10000)
    recent_nonce = mkNonce(now - 600)

    from openid.store import nonce as nonceModule
    orig_skew = nonceModule.SKEW
    try:
        nonceModule.SKEW = 0
        store.cleanupNonces()
        # Set SKEW high so stores will keep our nonces.
        nonceModule.SKEW = 100000
        assert store.useNonce(server_url, *split(old_nonce1))
        assert store.useNonce(server_url, *split(old_nonce2))
        assert store.useNonce(server_url, *split(recent_nonce))

        nonceModule.SKEW = 3600
        cleaned = store.cleanupNonces()
        assert cleaned == 2, "Cleaned %r nonces." % (cleaned,)

        nonceModule.SKEW = 100000
        # A roundabout method of checking that the old nonces were cleaned is
        # to see if we're allowed to add them again.
        assert store.useNonce(server_url, *split(old_nonce1))
        assert store.useNonce(server_url, *split(old_nonce2))
        # The recent nonce wasn't cleaned, so it should still fail.
        assert not store.useNonce(server_url, *split(recent_nonce))
    finally:
        nonceModule.SKEW = orig_skew


def testStore(store):
    """Make sure a given store has a minimum of API compliance.

    Call this function with an empty store. The checks run in four
    phases, in this order: association storage and removal, cleanup of
    expired associations, nonce use, and nonce cleanup.

    Raises AssertionError if the store does not work as expected.

    OpenIDStore -> NoneType
    """
    # One timestamp is shared by every phase so that all generated
    # associations and nonces are relative to the same instant.
    now = int(time.time())
    server_url = 'http://www.myopenid.com/openid'

    ### Association functions
    _checkAssociationStorage(store, now, server_url)

    ### test expired associations
    _checkAssociationCleanup(store, now, server_url)

    ### Nonce functions
    _checkNonceUse(store, server_url)
    _checkNonceCleanup(store, now, server_url)
def test_filestore():
    """Run testStore against a FileOpenIDStore in a new temporary directory.

    The directory is removed only after testStore() and store.cleanup()
    have both returned; if either raises, the exception propagates and
    the directory is left on disk.
    """
    from openid.store import filestore
    import tempfile
    import shutil

    try:
        temp_dir = tempfile.mkdtemp()
    except AttributeError:
        # tempfile offers no mkdtemp here; fall back to os.tmpnam().
        import os
        temp_dir = os.tmpnam()
        os.mkdir(temp_dir)

    file_store = filestore.FileOpenIDStore(temp_dir)
    testStore(file_store)
    file_store.cleanup()
    shutil.rmtree(temp_dir)
242
def test_sqlite():
    """Run testStore against a SQLiteStore on an in-memory database.

    Returns without testing anything when pysqlite2 cannot be imported.
    """
    from openid.store import sqlstore
    try:
        from pysqlite2 import dbapi2 as sqlite
    except ImportError:
        return

    memory_conn = sqlite.connect(':memory:')
    sqlite_store = sqlstore.SQLiteStore(memory_conn)
    sqlite_store.createTables()
    testStore(sqlite_store)
254
def test_mysql():
    """Run testStore against a MySQLStore in a temporary database.

    Connects to the MySQL server on db_host as user 'openid_test' with an
    empty password, creates a database named by getTmpDbName(), runs the
    store tests in it and drops the database again.

    Returns without testing anything when the MySQLdb module cannot be
    imported, or when connecting fails with client error 2005.
    """
    from openid.store import sqlstore
    try:
        import MySQLdb
    except ImportError:
        return

    import sys

    db_user = 'openid_test'
    db_passwd = ''
    db_name = getTmpDbName()

    # Change this connect line to use the right user and password
    try:
        conn = MySQLdb.connect(user=db_user, passwd=db_passwd, host=db_host)
    except MySQLdb.OperationalError:
        # sys.exc_info() instead of "except E, why" / "except E as why"
        # so this parses on both old and new Python versions.
        why = sys.exc_info()[1]
        # 2005 is the MySQL client error for an unknown server host.
        if why.args[0] == 2005:
            print ('Skipping MySQL store test (cannot connect '
                   'to test server on host %r)' % (db_host,))
            return
        raise

    try:
        conn.query('CREATE DATABASE %s;' % db_name)
        try:
            conn.query('USE %s;' % db_name)

            # OK, we're in the right environment. Create store and
            # create the tables.
            store = sqlstore.MySQLStore(conn)
            store.createTables()

            # At last, we get to run the test.
            testStore(store)
        finally:
            # Remove the database. If you want to do post-mortem on a
            # failing test, comment out this line.
            conn.query('DROP DATABASE %s;' % db_name)
    finally:
        # Release the server connection whether or not the test passed.
        conn.close()
294
def test_postgresql():
    """
    Tests the PostgreSQLStore on a PostgreSQL database cluster, version
    7.4 or later.  To run this test, you must have:

      - The 'psycopg' python module (version 1.1) installed

      - PostgreSQL running and reachable on the host named by db_host

      - An 'openid_test' user account in your database cluster, which
        you can create by running 'createuser -Ad openid_test' as the
        'postgres' user

      - Trust auth for the 'openid_test' account, which you can activate
        by adding the following line to your pg_hba.conf file:

        local all openid_test trust

        (NOTE(review): every connection below passes host=db_host, so a
        'host' rule rather than a 'local' rule may be what is actually
        needed -- confirm against your server setup.)

    This test connects to the database cluster three times:

      - To the 'template1' database, to create the test database

      - To the test database, to run the store tests

      - To the 'template1' database once more, to drop the test database

    The test database is dropped even when the store tests fail.
    Returns without testing anything when psycopg cannot be imported.
    """
    from openid.store import sqlstore
    try:
        import psycopg
    except ImportError:
        return

    import time

    db_name = getTmpDbName()
    db_user = 'openid_test'

    # Connect once to create the database; reconnect to access the
    # new database.
    conn_create = psycopg.connect(database='template1', user=db_user,
                                  host=db_host)
    try:
        conn_create.autocommit()

        # Create the test database.
        cursor = conn_create.cursor()
        cursor.execute('CREATE DATABASE %s;' % (db_name,))
    finally:
        conn_create.close()

    try:
        # Connect to the test database.
        conn_test = psycopg.connect(database=db_name, user=db_user,
                                    host=db_host)
        try:
            # OK, we're in the right environment.  Create the store
            # instance and create the tables.
            store = sqlstore.PostgreSQLStore(conn_test)
            store.createTables()

            # At last, we get to run the test.
            testStore(store)
        finally:
            # Disconnect.
            conn_test.close()

            # It takes a little time for the close() call above to take
            # effect, so we'll wait for a second before trying to remove
            # the database.  (Maybe this is because we're using a UNIX
            # socket to connect to postgres rather than TCP?)
            time.sleep(1)
    finally:
        # Remove the database now that the test is over, whether it
        # passed or failed.
        conn_remove = psycopg.connect(database='template1', user=db_user,
                                      host=db_host)
        try:
            conn_remove.autocommit()

            cursor = conn_remove.cursor()
            cursor.execute('DROP DATABASE %s;' % (db_name,))
        finally:
            conn_remove.close()
371
def test_memstore():
    """Run testStore against a newly created MemoryStore."""
    from openid.store import memstore
    memory_store = memstore.MemoryStore()
    testStore(memory_store)
# The store test functions that pyUnitTests() wraps into a suite.
test_functions = [
    test_filestore,
    test_sqlite,
    test_mysql,
    test_postgresql,
    test_memstore,
    ]


def pyUnitTests():
    """Return a unittest.TestSuite holding one FunctionTestCase per
    entry of test_functions, in list order."""
    tests = [unittest.FunctionTestCase(func) for func in test_functions]
    return unittest.TestSuite(tests)
if __name__ == '__main__':
    import sys

    suite = pyUnitTests()
    outcome = unittest.TextTestRunner().run(suite)
    # Exit status 0 when every test succeeded, 1 otherwise.
    sys.exit(int(not outcome.wasSuccessful()))