1 from openid.association import Association
2 from openid.cryptutil import randomString
3 from openid.store.nonce import mkNonce, split
4
5 import unittest
6 import string
7 import time
8 import socket
9 import random
10 import os
11
# Host name of the database server that the MySQL and PostgreSQL store
# tests connect to.
db_host = 'dbtest'

# Every printable, non-whitespace character, joined into a single string.
# NOTE(review): nothing in the visible part of this file reads this value;
# presumably it is the alphabet for generated association handles -- confirm.
allowed_handle = ''.join(
    c for c in string.printable if c not in string.whitespace)
19
22
# Association secrets used by the store test are plain random strings, so
# the secret generator is simply an alias for randomString.
generateSecret = randomString
24
def getTmpDbName():
    """Build a name for a throw-away test database.

    The name is made of this machine's host name (with every '.' and
    '-' replaced by '_'), the current process id and a random integer,
    followed by the fixed suffix '_openid_test'.

    () -> str
    """
    hostname = socket.gethostname()
    for unsafe in ('.', '-'):
        hostname = hostname.replace(unsafe, '_')

    # The current Unix timestamp only serves as a conveniently large
    # upper bound for the random component.
    salt = random.randrange(1, int(time.time()))
    return "%s_%d_%s_openid_test" % (hostname, os.getpid(), salt)
32
def testStore(store):
    """Make sure a given store has a minimum of API compliance. Call
    this function with an empty store.

    Raises AssertionError if the store does not work as expected.

    OpenIDStore -> NoneType
    """

    ### Association functions
    now = int(time.time())

    server_url = 'http://www.myopenid.com/openid'

    def genAssoc(issued, lifetime=600):
        """Create an HMAC-SHA1 association issued at now + issued
        (a negative value places the issue time in the past)."""
        sec = generateSecret(20)
        hdl = generateHandle(128)
        return Association(hdl, sec, now + issued, lifetime, 'HMAC-SHA1')

    def checkRetrieve(url, handle=None, expected=None):
        """Assert that getAssociation(url, handle) yields expected."""
        retrieved_assoc = store.getAssociation(url, handle)
        assert retrieved_assoc == expected, (retrieved_assoc, expected)
        if expected is not None:
            if retrieved_assoc is expected:
                # Not a failure, but worth reporting.
                print ('Unexpected: retrieved a reference to the expected '
                       'value instead of a new object')
            assert retrieved_assoc.handle == expected.handle
            assert retrieved_assoc.secret == expected.secret

    def checkRemove(url, handle, expected):
        """Assert that removeAssociation(url, handle) reports, by the
        truthiness of its result, whether something was removed."""
        present = store.removeAssociation(url, handle)
        assert bool(expected) == bool(present), (url, handle, present)

    assoc = genAssoc(issued=0)

    # Nothing has been stored yet, so the lookup must find nothing.
    checkRetrieve(server_url)

    # After storing, a lookup returns an equal association ...
    store.storeAssociation(server_url, assoc)
    checkRetrieve(server_url, None, assoc)

    # ... and does so more than once.
    checkRetrieve(server_url, None, assoc)

    # Storing the same association again changes nothing.
    store.storeAssociation(server_url, assoc)
    checkRetrieve(server_url, None, assoc)

    # Removing with an unknown handle reports "not present".
    checkRemove(server_url, assoc.handle + 'x', False)

    # Removing with an unknown server URL reports "not present".
    checkRemove(server_url + 'x', assoc.handle, False)

    # Removing the stored association reports "present" ...
    checkRemove(server_url, assoc.handle, True)

    # ... but only the first time.
    checkRemove(server_url, assoc.handle, False)

    # Put assoc back into the store.
    store.storeAssociation(server_url, assoc)

    # Issued later than assoc, same lifetime.
    assoc2 = genAssoc(issued=1)
    store.storeAssociation(server_url, assoc2)

    # With no handle given, the association with the later issue time
    # is the one expected back.
    checkRetrieve(server_url, None, assoc2)

    # The older association is still retrievable by its handle.
    checkRetrieve(server_url, assoc.handle, assoc)

    # So is the newer one.
    checkRetrieve(server_url, assoc2.handle, assoc2)

    # Issued latest of all, but with a shorter lifetime, so it expires
    # before assoc and assoc2. It must still win the handle-less lookup:
    # selection goes by issue time, not by expiry.
    assoc3 = genAssoc(issued=2, lifetime=100)
    store.storeAssociation(server_url, assoc3)

    checkRetrieve(server_url, None, assoc3)
    checkRetrieve(server_url, assoc.handle, assoc)
    checkRetrieve(server_url, assoc2.handle, assoc2)
    checkRetrieve(server_url, assoc3.handle, assoc3)

    checkRemove(server_url, assoc2.handle, True)

    checkRetrieve(server_url, None, assoc3)
    checkRetrieve(server_url, assoc.handle, assoc)
    checkRetrieve(server_url, assoc2.handle, None)
    checkRetrieve(server_url, assoc3.handle, assoc3)

    checkRemove(server_url, assoc2.handle, False)
    checkRemove(server_url, assoc3.handle, True)

    checkRetrieve(server_url, None, assoc)
    checkRetrieve(server_url, assoc.handle, assoc)
    checkRetrieve(server_url, assoc2.handle, None)
    checkRetrieve(server_url, assoc3.handle, None)

    checkRemove(server_url, assoc2.handle, False)
    checkRemove(server_url, assoc.handle, True)
    checkRemove(server_url, assoc3.handle, False)

    checkRetrieve(server_url, None, None)
    checkRetrieve(server_url, assoc.handle, None)
    checkRetrieve(server_url, assoc2.handle, None)
    checkRetrieve(server_url, assoc3.handle, None)

    checkRemove(server_url, assoc2.handle, False)
    checkRemove(server_url, assoc.handle, False)
    checkRemove(server_url, assoc3.handle, False)

    ### Association cleanup
    # Two associations whose issue time + lifetime is still in the
    # future, and two whose lifetime ran out an hour ago.
    assocValid1 = genAssoc(issued=-3600, lifetime=7200)
    assocValid2 = genAssoc(issued=-5)
    assocExpired1 = genAssoc(issued=-7200, lifetime=3600)
    assocExpired2 = genAssoc(issued=-7200, lifetime=3600)

    # Start from a clean slate; the result of this first call is ignored.
    store.cleanupAssociations()
    store.storeAssociation(server_url + '1', assocValid1)
    store.storeAssociation(server_url + '1', assocExpired1)
    store.storeAssociation(server_url + '2', assocExpired2)
    store.storeAssociation(server_url + '3', assocValid2)

    # Exactly the two expired associations must be reported as cleaned.
    cleaned = store.cleanupAssociations()
    assert cleaned == 2, cleaned

    ### Nonce functions
    def checkUseNonce(nonce, expected, server_url, msg=''):
        """Assert that useNonce accepts/rejects nonce as expected."""
        stamp, salt = split(nonce)
        actual = store.useNonce(server_url, stamp, salt)
        assert bool(actual) == bool(expected), "%r != %r: %s" % (
            actual, expected, msg)

    for url in [server_url, '']:
        # A fresh nonce that the store has never seen.
        nonce1 = mkNonce()

        # Its first use must be accepted.
        checkUseNonce(nonce1, True, url)

        # Every later use of the same nonce must be rejected.
        checkUseNonce(nonce1, False, url)
        checkUseNonce(nonce1, False, url)

        # A nonce stamped 3600 must not be accepted.
        old_nonce = mkNonce(3600)
        checkUseNonce(old_nonce, False, url,
                      "Old nonce (%r) passed." % (old_nonce,))

    ### Nonce cleanup
    old_nonce1 = mkNonce(now - 20000)
    old_nonce2 = mkNonce(now - 10000)
    recent_nonce = mkNonce(now - 600)

    from openid.store import nonce as nonceModule
    orig_skew = nonceModule.SKEW
    try:
        # Clear out nonces left over from the checks above.
        nonceModule.SKEW = 0
        store.cleanupNonces()

        # With a very large allowed skew all three nonces are accepted,
        # which also records them in the store.
        nonceModule.SKEW = 100000
        assert store.useNonce(server_url, *split(old_nonce1))
        assert store.useNonce(server_url, *split(old_nonce2))
        assert store.useNonce(server_url, *split(recent_nonce))

        # With a one-hour skew, only the two older nonces are expected
        # to be cleaned up.
        nonceModule.SKEW = 3600
        cleaned = store.cleanupNonces()
        assert cleaned == 2, "Cleaned %r nonces." % (cleaned,)

        nonceModule.SKEW = 100000

        # The two cleaned-up nonces can be used again ...
        assert store.useNonce(server_url, *split(old_nonce1))
        assert store.useNonce(server_url, *split(old_nonce2))

        # ... while the recent one is still recorded and must be refused.
        assert not store.useNonce(server_url, *split(recent_nonce))
    finally:
        # Always restore the module-level setting for other tests.
        nonceModule.SKEW = orig_skew
221
222
242
254
def test_mysql():
    """Run testStore against sqlstore.MySQLStore.

    Connects to the MySQL server on db_host as user 'openid_test' with
    an empty password, creates a temporary database, runs the store
    tests in it, and drops the database again.

    The test returns without doing anything when the MySQLdb module
    cannot be imported, and prints a notice and returns when the
    connection attempt fails with client error 2005.

    () -> NoneType
    """
    from openid.store import sqlstore
    try:
        import MySQLdb
    except ImportError:
        # No MySQL driver installed: nothing to test.
        return

    db_user = 'openid_test'
    db_passwd = ''
    db_name = getTmpDbName()

    # Change this connect call to use the right user and password.
    try:
        conn = MySQLdb.connect(user=db_user, passwd=db_passwd, host=db_host)
    except MySQLdb.OperationalError as why:
        # 2005 is the MySQL client error code CR_UNKNOWN_HOST.
        if why.args[0] == 2005:
            print ('Skipping MySQL store test (cannot connect '
                   'to test server on host %r)' % (db_host,))
            return
        raise

    try:
        conn.query('CREATE DATABASE %s;' % db_name)
        try:
            conn.query('USE %s;' % db_name)

            # The environment is ready: create the store and its tables.
            store = sqlstore.MySQLStore(conn)
            store.createTables()

            # Run the actual test.
            testStore(store)
        finally:
            # Remove the database even when the test failed. Comment
            # this out to inspect the data after a failure.
            conn.query('DROP DATABASE %s;' % db_name)
    finally:
        # Release the connection whatever happened above.
        conn.close()
294
def test_postgresql():
    """
    Tests the PostgreSQLStore on a locally-hosted PostgreSQL database
    cluster, version 7.4 or later.  To run this test, you must have:

    - The 'psycopg' python module (version 1.1) installed

    - PostgreSQL running locally

    - An 'openid_test' user account in your database cluster, which
      you can create by running 'createuser -Ad openid_test' as the
      'postgres' user

    - Trust auth for the 'openid_test' account, which you can activate
      by adding the following line to your pg_hba.conf file:

      local all openid_test trust

    This test connects to the database cluster three times:

    - To the 'template1' database, to create the test database

    - To the test database, to run the store tests

    - To the 'template1' database once more, to drop the test database

    The test database is dropped even when the store tests fail.  The
    test returns without doing anything when 'psycopg' cannot be
    imported.

    NOTE(review): every connection below passes host=db_host, so the
    server contacted is whatever that name refers to; the "locally"
    wording above may be out of date -- confirm.
    """
    from openid.store import sqlstore
    try:
        import psycopg
    except ImportError:
        # No PostgreSQL driver installed: nothing to test.
        return

    db_name = getTmpDbName()
    db_user = 'openid_test'

    # Connect to the 'template1' database to create the test database.
    conn_create = psycopg.connect(database='template1', user=db_user,
                                  host=db_host)
    try:
        conn_create.autocommit()
        cursor = conn_create.cursor()
        cursor.execute('CREATE DATABASE %s;' % (db_name,))
    finally:
        conn_create.close()

    try:
        # Connect to the freshly created test database.
        conn_test = psycopg.connect(database=db_name, user=db_user,
                                    host=db_host)
        try:
            # Create the store and its tables, then run the test.
            store = sqlstore.PostgreSQLStore(conn_test)
            store.createTables()
            testStore(store)
        finally:
            # Disconnect from the test database so it can be dropped.
            conn_test.close()
    finally:
        # Presumably gives the server a moment to register that the
        # test connection is gone before the database is dropped.
        time.sleep(1)

        # Connect to 'template1' once more to remove the test database.
        conn_remove = psycopg.connect(database='template1', user=db_user,
                                      host=db_host)
        try:
            conn_remove.autocommit()
            cursor = conn_remove.cursor()
            cursor.execute('DROP DATABASE %s;' % (db_name,))
        finally:
            conn_remove.close()
371
375
# The store tests to run, in execution order. Each entry is called
# without arguments (see pyUnitTests, which wraps each one in a
# unittest.FunctionTestCase).
test_functions = [
    test_filestore,
    test_sqlite,
    test_mysql,
    test_postgresql,
    test_memstore,
    ]
383
def pyUnitTests():
    """Wrap every function listed in test_functions in a
    unittest.FunctionTestCase and return them as a single suite.

    () -> unittest.TestSuite
    """
    cases = [unittest.FunctionTestCase(func) for func in test_functions]
    return unittest.TestSuite(cases)
388
if __name__ == '__main__':
    import sys

    # Run the whole suite with the text runner; the process exit status
    # is 0 when every test passed and 1 otherwise.
    outcome = unittest.TextTestRunner().run(pyUnitTests())
    sys.exit(0 if outcome.wasSuccessful() else 1)
398