Package openid :: Package test :: Module test_consumer
[frames | no frames]

Source Code for Module openid.test.test_consumer

   1  import urlparse 
   2  import cgi 
   3  import time 
   4  import warnings 
   5   
   6  from openid.message import Message, OPENID_NS, OPENID2_NS, IDENTIFIER_SELECT, \ 
   7       OPENID1_NS, BARE_NS 
   8  from openid import cryptutil, dh, oidutil, kvform 
   9  from openid.store.nonce import mkNonce, split as splitNonce 
  10  from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_2_0_TYPE, \ 
  11       OPENID_1_1_TYPE 
  12  from openid.consumer.consumer import \ 
  13       AuthRequest, GenericConsumer, SUCCESS, FAILURE, CANCEL, SETUP_NEEDED, \ 
  14       SuccessResponse, FailureResponse, SetupNeededResponse, CancelResponse, \ 
  15       DiffieHellmanSHA1ConsumerSession, Consumer, PlainTextConsumerSession, \ 
  16       SetupNeededError, DiffieHellmanSHA256ConsumerSession, ServerError, \ 
  17       ProtocolError, _httpResponseToMessage 
  18  from openid import association 
  19  from openid.server.server import \ 
  20       PlainTextServerSession, DiffieHellmanSHA1ServerSession 
  21  from openid.yadis.manager import Discovery 
  22  from openid.yadis.discover import DiscoveryFailure 
  23  from openid.dh import DiffieHellman 
  24   
  25  from openid.fetchers import HTTPResponse, HTTPFetchingError 
  26  from openid import fetchers 
  27  from openid.store import memstore 
  28   
  29  from support import CatchLogs 
  30   
  31  assocs = [ 
  32      ('another 20-byte key.', 'Snarky'), 
  33      ('\x00' * 20, 'Zeros'), 
  34      ] 
  35   
def mkSuccess(endpoint, q):
    """Build a SuccessResponse for ``endpoint`` with every argument signed.

    ``q`` maps un-prefixed OpenID argument names to their values.  Each key
    is listed as signed under its ``openid.``-prefixed name.
    """
    signed_fields = []
    for arg_name in q.keys():
        signed_fields.append('openid.' + arg_name)
    response_message = Message.fromOpenIDArgs(q)
    return SuccessResponse(endpoint, response_message, signed_fields)
41
def parseQuery(qs):
    """Parse the URL query string ``qs`` into a flat ``{name: value}`` dict.

    The tests expect every parameter to appear exactly once, so a repeated
    name is treated as a test failure: an AssertionError is raised instead of
    silently keeping one of the values.
    """
    q = {}
    # urlparse.parse_qsl replaces the deprecated cgi.parse_qsl; the ``in``
    # operator replaces dict.has_key(), which no longer exists in Python 3.
    for k, v in urlparse.parse_qsl(qs):
        assert k not in q, 'duplicate query parameter: %r' % (k,)
        q[k] = v
    return q
48
def associate(qs, assoc_secret, assoc_handle):
    """Play the server's side of an ``associate`` request.

    ``qs`` is the URL-encoded request, ``assoc_secret`` the secret handed to
    the session's ``answer`` method and ``assoc_handle`` the handle echoed in
    the reply.  Returns the reply serialized by ``kvform.dictToKV``.
    """
    request_args = parseQuery(qs)
    assert request_args['openid.mode'] == 'associate'
    assert request_args['openid.assoc_type'] == 'HMAC-SHA1'

    reply = {}
    reply['assoc_type'] = 'HMAC-SHA1'
    reply['assoc_handle'] = assoc_handle
    reply['expires_in'] = '600'

    wants_dh = request_args.get('openid.session_type') == 'DH-SHA1'
    if not wants_dh:
        # A plain-text request carries nothing beyond mode and assoc_type.
        assert len(request_args) == 2
        session = PlainTextServerSession.fromQuery(request_args)
    else:
        assert len(request_args) in (6, 4)
        dh_message = Message.fromPostArgs(request_args)
        session = DiffieHellmanSHA1ServerSession.fromMessage(dh_message)
        reply['session_type'] = 'DH-SHA1'

    reply.update(session.answer(assoc_secret))
    return kvform.dictToKV(reply)
# Signature value that GoodAssociation accepts as valid.
GOODSIG = "[A Good Signature]"


class GoodAssociation:
    """Stand-in association that approves any message signed with GOODSIG."""

    expiresIn = 3600
    handle = "-blah-"

    def getExpiresIn(self):
        """Return the ``expiresIn`` attribute unchanged."""
        remaining = self.expiresIn
        return remaining

    def checkMessageSignature(self, message):
        """Return True when the message's OpenID ``sig`` arg equals GOODSIG."""
        signature = message.getArg(OPENID_NS, 'sig')
        return signature == GOODSIG
86 87
class GoodAssocStore(memstore.MemoryStore):
    """Memory store whose association lookup always succeeds."""

    def getAssociation(self, server_url, handle=None):
        """Return a fresh GoodAssociation, ignoring both arguments."""
        always_good = GoodAssociation()
        return always_good
91 92
class TestFetcher(object):
    """Fake HTTP fetcher serving one canned page and answering associate POSTs.

    ``assoc`` is an ``(assoc_secret, assoc_handle)`` pair used to answer
    associate requests.  ``num_assocs`` counts how many such requests have
    been served.
    """

    def __init__(self, user_url, user_page, assoc):
        # The pair is unpacked here rather than in the signature: tuple
        # parameters were removed from the language by PEP 3113.  Positional
        # callers are unaffected.
        assoc_secret, assoc_handle = assoc
        self.get_responses = {
            user_url: self.response(user_url, 200, user_page),
        }
        self.assoc_secret = assoc_secret
        self.assoc_handle = assoc_handle
        self.num_assocs = 0

    def response(self, url, status, body):
        """Wrap ``body`` in an HTTPResponse for ``url`` with no headers."""
        return HTTPResponse(
            final_url=url, status=status, headers={}, body=body)

    def fetch(self, url, body=None, headers=None):
        """Serve a canned GET page or answer an associate POST, else a 404.

        A request without a body is looked up in ``get_responses``.  A body
        containing ``openid.mode=associate`` must also mention ``DH-SHA1``
        and is answered through ``associate``.  Everything else gets a 404.
        """
        if body is None:
            if url in self.get_responses:
                return self.get_responses[url]
        elif 'openid.mode=associate' in body:
            assert 'DH-SHA1' in body
            response = associate(
                body, self.assoc_secret, self.assoc_handle)
            self.num_assocs += 1
            return self.response(url, 200, response)

        return self.response(url, 404, 'Not found')
121
def makeFastConsumerSession():
    """Return a DH-SHA1 consumer session built on custom DH parameters.

    The custom DiffieHellman object is used so that the tests run quickly.
    """
    # Named to avoid shadowing the imported ``dh`` module.
    fast_dh = DiffieHellman(100389557, 2)
    session = DiffieHellmanSHA1ConsumerSession(fast_dh)
    return session
128
def setConsumerSession(con):
    """Make ``con`` create its DH-SHA1 sessions with the fast test factory."""
    session_factories = {}
    session_factories['DH-SHA1'] = makeFastConsumerSession
    con.session_types = session_factories
131
def _test_success(server_url, user_url, delegate_url, links, immediate=False):
    """Drive a full begin/complete round trip and count associations made.

    The round trip is run four times against the same store: the first run
    must create an association, the second must reuse it, the third (after
    the association is removed) must create another, and the fourth must
    reuse that one.  ``links`` is accepted but not used by this function.
    """
    store = memstore.MemoryStore()
    mode = 'checkid_immediate' if immediate else 'checkid_setup'

    endpoint = OpenIDServiceEndpoint()
    endpoint.claimed_id = user_url
    endpoint.server_url = server_url
    endpoint.local_id = delegate_url
    endpoint.type_uris = [OPENID_1_1_TYPE]

    fetcher = TestFetcher(None, None, assocs[0])
    fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)

    def run():
        # One complete authentication: build the request, check the redirect
        # URL, then feed a signed id_res response back to the consumer.
        trust_root = consumer_url

        consumer = GenericConsumer(store)
        setConsumerSession(consumer)

        request = consumer.begin(endpoint)
        return_to = consumer_url

        # The result is not inspected; the call itself is kept.
        request.getMessage(trust_root, return_to, immediate)

        redirect_url = request.redirectURL(trust_root, return_to, immediate)

        redirect_query = parseQuery(urlparse.urlparse(redirect_url)[4])
        new_return_to = redirect_query['openid.return_to']
        del redirect_query['openid.return_to']
        expected_query = {
            'openid.mode': mode,
            'openid.identity': delegate_url,
            'openid.trust_root': trust_root,
            'openid.assoc_handle': fetcher.assoc_handle,
        }
        assert redirect_query == expected_query, (
            redirect_query, user_url, delegate_url, mode)

        assert new_return_to.startswith(return_to)
        assert redirect_url.startswith(server_url)

        response_query = parseQuery(urlparse.urlparse(new_return_to)[4])
        response_query.update({
            'openid.mode': 'id_res',
            'openid.return_to': new_return_to,
            'openid.identity': delegate_url,
            'openid.assoc_handle': fetcher.assoc_handle,
        })

        assoc = store.getAssociation(server_url, fetcher.assoc_handle)

        unsigned = Message.fromPostArgs(response_query)
        signed = assoc.signMessage(unsigned)
        info = consumer.complete(signed, request.endpoint, new_return_to)
        assert info.status == SUCCESS, info.message
        assert info.identity_url == user_url

    assert fetcher.num_assocs == 0
    run()
    assert fetcher.num_assocs == 1

    # Test that doing it again uses the existing association
    run()
    assert fetcher.num_assocs == 1

    # Another association is created if we remove the existing one
    store.removeAssociation(server_url, fetcher.assoc_handle)
    run()
    assert fetcher.num_assocs == 2

    # Test that doing it again uses the existing association
    run()
    assert fetcher.num_assocs == 2


import unittest

# URLs shared by the test cases below.
http_server_url = 'http://server.example.com/'
consumer_url = 'http://consumer.example.com/'
https_server_url = 'https://server.example.com/'
class TestSuccess(unittest.TestCase, CatchLogs):
    """Run the successful round trip with and without a delegate URL."""

    server_url = http_server_url
    user_url = 'http://www.example.com/user.html'
    delegate_url = 'http://consumer.example.com/user'

    def setUp(self):
        CatchLogs.setUp(self)
        server_link = '<link rel="openid.server" href="%s" />'
        delegate_link = '<link rel="openid.delegate" href="%s" />'

        self.links = server_link % (self.server_url,)
        self.delegate_links = (server_link + delegate_link) % (
            self.server_url, self.delegate_url)

    def tearDown(self):
        CatchLogs.tearDown(self)

    def _runSuccess(self, local_id, links, immediate=False):
        # Shared driver: only the local identifier, the link markup and the
        # immediate flag differ between the four tests.
        _test_success(self.server_url, self.user_url,
                      local_id, links, immediate)

    def test_nodelegate(self):
        self._runSuccess(self.user_url, self.links)

    def test_nodelegateImmediate(self):
        self._runSuccess(self.user_url, self.links, True)

    def test_delegate(self):
        self._runSuccess(self.delegate_url, self.delegate_links)

    def test_delegateImmediate(self):
        self._runSuccess(self.delegate_url, self.delegate_links, True)
248 249
class TestSuccessHTTPS(TestSuccess):
    """Repeat every TestSuccess case against the HTTPS server URL."""

    server_url = https_server_url
252 253
class TestConstruct(unittest.TestCase):
    """Constructor behaviour of GenericConsumer."""

    def setUp(self):
        # A unique object, so identity can be checked after construction.
        self.store_sentinel = object()

    def test_construct(self):
        # The store passed to the constructor is kept as-is on ``.store``.
        consumer = GenericConsumer(self.store_sentinel)
        stored = consumer.store
        self.failUnless(stored is self.store_sentinel)

    def test_nostore(self):
        # Omitting the store argument is a TypeError.
        self.failUnlessRaises(TypeError, GenericConsumer)
264 265
266 -class TestIdRes(unittest.TestCase, CatchLogs):
267 consumer_class = GenericConsumer 268
def setUp(self):
    # Fixture: a memory store, a consumer of ``consumer_class`` on top of
    # it, and an OpenID 1.1 endpoint whose identifiers are short dummy
    # strings mirrored onto the test case itself.
    CatchLogs.setUp(self)

    self.store = memstore.MemoryStore()
    self.consumer = self.consumer_class(self.store)
    self.return_to = "nonny"

    self.endpoint = OpenIDServiceEndpoint()
    self.endpoint.claimed_id = "consu"
    self.consumer_id = "consu"
    self.endpoint.server_url = "serlie"
    self.server_url = "serlie"
    self.endpoint.local_id = "sirod"
    self.server_id = "sirod"
    self.endpoint.type_uris = [OPENID_1_1_TYPE]
280
282 """Set the discovery verification to a no-op for test cases in 283 which we don't care.""" 284 def dummyVerifyDiscover(_, endpoint): 285 return endpoint
286 self.consumer._verifyDiscoveryResults = dummyVerifyDiscover
287
def disableReturnToChecking(self):
    # Replace the consumer's return_to check with one that always passes,
    # and wrap ``complete`` so callers may omit the return_to argument
    # (None is passed in its place).
    consumer = self.consumer
    original_complete = consumer.complete

    def checkReturnTo(unused1, unused2):
        return True

    def callCompleteWithoutReturnTo(message, endpoint):
        return original_complete(message, endpoint, None)

    consumer._checkReturnTo = checkReturnTo
    consumer.complete = callCompleteWithoutReturnTo
297 -class TestIdResCheckSignature(TestIdRes):
def setUp(self):
    # Store a GoodAssociation for the endpoint's server and prepare an
    # id_res message that refers to it and carries the accepted signature.
    TestIdRes.setUp(self)

    good_assoc = GoodAssociation()
    good_assoc.handle = "{not_dumb}"
    self.assoc = good_assoc
    self.store.storeAssociation(self.endpoint.server_url, self.assoc)

    post_args = {
        'openid.mode': 'id_res',
        'openid.identity': '=example',
        'openid.sig': GOODSIG,
        'openid.assoc_handle': self.assoc.handle,
        'openid.signed': 'mode,identity,assoc_handle,signed',
        'frobboz': 'banzit',
    }
    self.message = Message.fromPostArgs(post_args)
312 313
def test_sign(self):
    # The message names the stored association and carries the good
    # signature, so the check is expected to return without raising.
    server_url = self.endpoint.server_url
    self.consumer._idResCheckSignature(self.message, server_url)
318 319
def test_signFailsWithBadSig(self):
    # Any signature other than GOODSIG must be rejected with ProtocolError.
    self.message.setArg(OPENID_NS, 'sig', 'BAD SIGNATURE')
    check_signature = self.consumer._idResCheckSignature
    self.failUnlessRaises(
        ProtocolError, check_signature,
        self.message, self.endpoint.server_url)
325 326
def test_stateless(self):
    # The handle matches no stored association.  The check_auth response
    # processing is stubbed to report success and the KV post to return an
    # empty dict, so the signature check is expected not to raise.
    self.message.setArg(OPENID_NS, "assoc_handle", "dumbHandle")

    def processCheckAuthResponse(response, server_url):
        return True

    def makeKVPost(args, server_url):
        return {}

    self.consumer._processCheckAuthResponse = processCheckAuthResponse
    self.consumer._makeKVPost = makeKVPost
    self.consumer._idResCheckSignature(self.message,
                                       self.endpoint.server_url)
335
def test_statelessRaisesError(self):
    # The handle matches no stored association and the stubbed _checkAuth
    # reports failure, so the signature check must raise ProtocolError.
    self.message.setArg(OPENID_NS, "assoc_handle", "dumbHandle")

    def checkAuth(unused1, unused2):
        return False

    self.consumer._checkAuth = checkAuth
    self.failUnlessRaises(
        ProtocolError, self.consumer._idResCheckSignature,
        self.message, self.endpoint.server_url)
343
def test_stateless_noStore(self):
    # Same as the stateless case, but the consumer has no store at all.
    # With check_auth stubbed to succeed the check is expected not to raise.
    self.message.setArg(OPENID_NS, "assoc_handle", "dumbHandle")
    self.consumer.store = None

    def processCheckAuthResponse(response, server_url):
        return True

    def makeKVPost(args, server_url):
        return {}

    self.consumer._processCheckAuthResponse = processCheckAuthResponse
    self.consumer._makeKVPost = makeKVPost
    self.consumer._idResCheckSignature(self.message,
                                       self.endpoint.server_url)
353
355 # assoc_handle missing assoc, consumer._checkAuth returns goodthings 356 self.message.setArg(OPENID_NS, "assoc_handle", "dumbHandle") 357 self.consumer._checkAuth = lambda unused1, unused2: False 358 self.consumer.store = None 359 self.failUnlessRaises( 360 ProtocolError, self.consumer._idResCheckSignature, 361 self.message, self.endpoint.server_url)
362 363
class TestQueryFormat(TestIdRes):
    """Message.fromPostArgs must reject list-valued arguments."""

    def test_notAList(self):
        # XXX: should be a Message object test, not a consumer test

        # Each value has to be a single string.  Passing a list must raise
        # a TypeError whose text mentions 'values'.
        query = {'openid.mode': ['cancel']}
        try:
            parsed = Message.fromPostArgs(query)
        except TypeError as err:
            self.failUnless('values' in str(err), err)
        else:
            self.fail("expected TypeError, got this instead: %s" % (parsed,))
377
class TestComplete(TestIdRes):
    """Tests for GenericConsumer.complete.

    More specific aspects are covered by the other TestIdRes subclasses.
    """

    def test_setupNeededIdRes(self):
        # When _checkSetupNeeded raises, complete() must turn that into a
        # SETUP_NEEDED response carrying the very same setup URL object.
        message = Message.fromOpenIDArgs({'mode': 'id_res'})
        setup_url_sentinel = object()

        def raiseSetupNeeded(msg):
            self.failUnless(msg is message)
            raise SetupNeededError(setup_url_sentinel)

        self.consumer._checkSetupNeeded = raiseSetupNeeded

        response = self.consumer.complete(message, None, None)
        self.failUnlessEqual(SETUP_NEEDED, response.status)
        self.failUnless(setup_url_sentinel is response.setup_url)

    def test_cancel(self):
        cancel_args = {'openid.mode': 'cancel'}
        message = Message.fromPostArgs(cancel_args)
        self.disableReturnToChecking()
        r = self.consumer.complete(message, self.endpoint)
        self.failUnlessEqual(r.status, CANCEL)
        self.failUnless(r.identity_url == self.endpoint.claimed_id)

    def test_cancel_with_return_to(self):
        cancel_args = {'openid.mode': 'cancel'}
        message = Message.fromPostArgs(cancel_args)
        r = self.consumer.complete(message, self.endpoint, self.return_to)
        self.failUnlessEqual(r.status, CANCEL)
        self.failUnless(r.identity_url == self.endpoint.claimed_id)

    def test_error(self):
        msg = 'an error message'
        error_args = {
            'openid.mode': 'error',
            'openid.error': msg,
        }
        message = Message.fromPostArgs(error_args)
        self.disableReturnToChecking()
        r = self.consumer.complete(message, self.endpoint)
        self.failUnlessEqual(r.status, FAILURE)
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
        self.failUnlessEqual(r.message, msg)

    def test_errorWithNoOptionalKeys(self):
        msg = 'an error message'
        contact = 'some contact info here'
        error_args = {
            'openid.mode': 'error',
            'openid.error': msg,
            'openid.contact': contact,
        }
        message = Message.fromPostArgs(error_args)
        self.disableReturnToChecking()
        r = self.consumer.complete(message, self.endpoint)
        self.failUnlessEqual(r.status, FAILURE)
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
        self.failUnless(r.contact == contact)
        self.failUnless(r.reference is None)
        self.failUnlessEqual(r.message, msg)

    def test_errorWithOptionalKeys(self):
        msg = 'an error message'
        contact = 'me'
        reference = 'support ticket'
        error_args = {
            'openid.mode': 'error',
            'openid.error': msg,
            'openid.reference': reference,
            'openid.contact': contact,
            'openid.ns': OPENID2_NS,
        }
        message = Message.fromPostArgs(error_args)
        r = self.consumer.complete(message, self.endpoint, None)
        self.failUnlessEqual(r.status, FAILURE)
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
        self.failUnless(r.contact == contact)
        self.failUnless(r.reference == reference)
        self.failUnlessEqual(r.message, msg)

    def test_noMode(self):
        # A response without any openid.mode is reported as a failure.
        message = Message.fromPostArgs({})
        r = self.consumer.complete(message, self.endpoint, None)
        self.failUnlessEqual(r.status, FAILURE)
        self.failUnless(r.identity_url == self.endpoint.claimed_id)

    def test_idResMissingField(self):
        # XXX - this test is passing, but not necessarily by what it
        # is supposed to test for.  status in FAILURE, but it's because
        # *check_auth* failed, not because it's missing an arg, exactly.
        message = Message.fromPostArgs({'openid.mode': 'id_res'})
        do_id_res = self.consumer._doIdRes
        self.failUnlessRaises(ProtocolError, do_id_res,
                              message, self.endpoint, None)

    def test_idResURLMismatch(self):
        class VerifiedError(Exception):
            pass

        def discoverAndVerify(claimed_id, _to_match_endpoints):
            raise VerifiedError

        self.consumer._discoverAndVerify = discoverAndVerify
        self.disableReturnToChecking()

        post_args = {
            'openid.mode': 'id_res',
            'openid.return_to': 'return_to (just anything)',
            'openid.identity': 'something wrong (not self.consumer_id)',
            'openid.assoc_handle': 'does not matter',
            'openid.sig': GOODSIG,
            'openid.signed': 'identity,return_to',
        }
        message = Message.fromPostArgs(post_args)
        self.consumer.store = GoodAssocStore()

        # The stubbed discovery raises, and that exception must reach us.
        self.failUnlessRaises(VerifiedError,
                              self.consumer.complete,
                              message, self.endpoint)

        self.failUnlessLogMatches('Error attempting to use stored',
                                  'Attempting discovery')
class TestCompleteMissingSig(unittest.TestCase, CatchLogs):
    """complete() must fail when required fields are left out of 'signed'."""

    def setUp(self):
        self.store = GoodAssocStore()
        self.consumer = GenericConsumer(self.store)
        self.server_url = "http://idp.unittest/"
        CatchLogs.setUp(self)

        claimed_id = 'bogus.claimed'

        openid_args = {
            'mode': 'id_res',
            'return_to': 'return_to (just anything)',
            'identity': claimed_id,
            'assoc_handle': 'does not matter',
            'sig': GOODSIG,
            'response_nonce': mkNonce(),
            'signed': 'identity,return_to,response_nonce,assoc_handle,claimed_id,op_endpoint',
            'claimed_id': claimed_id,
            'op_endpoint': self.server_url,
            'ns': OPENID2_NS,
        }
        self.message = Message.fromOpenIDArgs(openid_args)

        self.endpoint = OpenIDServiceEndpoint()
        self.endpoint.server_url = self.server_url
        self.endpoint.claimed_id = claimed_id

        # return_to verification is not under test here.
        def checkReturnTo(unused1, unused2):
            return True

        self.consumer._checkReturnTo = checkReturnTo

    def tearDown(self):
        CatchLogs.tearDown(self)

    def _failUnlessRejectedWhenSigned(self, signed):
        # Replace the 'signed' list and expect complete() to report FAILURE.
        self.message.setArg(OPENID_NS, 'signed', signed)
        r = self.consumer.complete(self.message, self.endpoint, None)
        self.failUnlessEqual(r.status, FAILURE)

    def test_idResMissingNoSigs(self):
        def _vrfy(resp_msg, endpoint=None):
            return endpoint

        self.consumer._verifyDiscoveryResults = _vrfy
        r = self.consumer.complete(self.message, self.endpoint, None)
        self.failUnlessSuccess(r)

    def test_idResNoIdentity(self):
        for arg_name in ('identity', 'claimed_id'):
            self.message.delArg(OPENID_NS, arg_name)
        self.endpoint.claimed_id = None
        self.message.setArg(OPENID_NS, 'signed', 'return_to,response_nonce,assoc_handle,op_endpoint')
        r = self.consumer.complete(self.message, self.endpoint, None)
        self.failUnlessSuccess(r)

    def test_idResMissingIdentitySig(self):
        self._failUnlessRejectedWhenSigned(
            'return_to,response_nonce,assoc_handle,claimed_id')

    def test_idResMissingReturnToSig(self):
        self._failUnlessRejectedWhenSigned(
            'identity,response_nonce,assoc_handle,claimed_id')

    def test_idResMissingAssocHandleSig(self):
        self._failUnlessRejectedWhenSigned(
            'identity,response_nonce,return_to,claimed_id')

    def test_idResMissingClaimedIDSig(self):
        self._failUnlessRejectedWhenSigned(
            'identity,response_nonce,return_to,assoc_handle')

    def failUnlessSuccess(self, response):
        # Fail the test, showing the response, unless its status is SUCCESS.
        if response.status == SUCCESS:
            return
        self.fail("Non-successful response: %s" % (response,))
569 570 571
572 -class TestCheckAuthResponse(TestIdRes, CatchLogs):
def setUp(self):
    # Log capture is set up first, then the shared TestIdRes fixture.
    for base in (CatchLogs, TestIdRes):
        base.setUp(self)
576
def tearDown(self):
    # Only the CatchLogs teardown is run.
    CatchLogs.tearDown(self)
579
def _createAssoc(self):
    # Store an HMAC-SHA1 association with handle 'handle' for the test
    # server, then confirm the store hands the same association back.
    assoc = association.Association(
        'handle', 'secret', time.time(), 1000, 'HMAC-SHA1')
    store = self.consumer.store
    store.storeAssociation(self.server_url, assoc)
    fetched = store.getAssociation(self.server_url)
    self.failUnlessEqual(assoc, fetched)
589
def test_goodResponse(self):
    """successful response to check_authentication"""
    server_reply = Message.fromOpenIDArgs({'is_valid': 'true'})
    accepted = self.consumer._processCheckAuthResponse(
        server_reply, self.server_url)
    self.failUnless(accepted)
595
def test_missingAnswer(self):
    """check_authentication returns false when the server sends no answer"""
    server_reply = Message.fromOpenIDArgs({})
    accepted = self.consumer._processCheckAuthResponse(
        server_reply, self.server_url)
    self.failIf(accepted)
601
def test_badResponse(self):
    """check_authentication returns false when is_valid is false"""
    server_reply = Message.fromOpenIDArgs({'is_valid': 'false'})
    accepted = self.consumer._processCheckAuthResponse(
        server_reply, self.server_url)
    self.failIf(accepted)
607
def test_badResponseInvalidate(self):
    """Make sure that the handle is invalidated when is_valid is false

    From "Verifying directly with the OpenID Provider"::

        If the OP responds with "is_valid" set to "true", and
        "invalidate_handle" is present, the Relying Party SHOULD
        NOT send further authentication requests with that handle.
    """
    self._createAssoc()
    reply_args = {
        'is_valid': 'false',
        'invalidate_handle': 'handle',
    }
    server_reply = Message.fromOpenIDArgs(reply_args)
    accepted = self.consumer._processCheckAuthResponse(
        server_reply, self.server_url)
    self.failIf(accepted)
    # The stored association must be gone afterwards.
    remaining = self.consumer.store.getAssociation(self.server_url)
    self.failUnless(remaining is None)
626
def test_invalidateMissing(self):
    """invalidate_handle with a handle that is not present"""
    reply_args = {
        'is_valid': 'true',
        'invalidate_handle': 'missing',
    }
    server_reply = Message.fromOpenIDArgs(reply_args)
    accepted = self.consumer._processCheckAuthResponse(
        server_reply, self.server_url)
    self.failUnless(accepted)
    self.failUnlessLogMatches('Received "invalidate_handle"')
638
640 """invalidate_handle with a handle that is not present""" 641 response = Message.fromOpenIDArgs({ 642 'is_valid':'true', 643 'invalidate_handle':'missing', 644 }) 645 self.consumer.store = None 646 r = self.consumer._processCheckAuthResponse(response, self.server_url) 647 self.failUnless(r) 648 self.failUnlessLogMatches( 649 'Received "invalidate_handle"', 650 'Unexpectedly got invalidate_handle without a store')
651
def test_invalidatePresent(self):
    """invalidate_handle with a handle that exists

    From "Verifying directly with the OpenID Provider"::

        If the OP responds with "is_valid" set to "true", and
        "invalidate_handle" is present, the Relying Party SHOULD
        NOT send further authentication requests with that handle.
    """
    self._createAssoc()
    reply_args = {
        'is_valid': 'true',
        'invalidate_handle': 'handle',
    }
    server_reply = Message.fromOpenIDArgs(reply_args)
    accepted = self.consumer._processCheckAuthResponse(
        server_reply, self.server_url)
    self.failUnless(accepted)
    # The stored association must be gone afterwards.
    remaining = self.consumer.store.getAssociation(self.server_url)
    self.failUnless(remaining is None)
670
671 -class TestSetupNeeded(TestIdRes):
def failUnlessSetupNeeded(self, expected_setup_url, message):
    # Assert that checking ``message`` raises SetupNeededError and that the
    # error carries ``expected_setup_url`` as its user_setup_url.
    check_setup_needed = self.consumer._checkSetupNeeded
    try:
        check_setup_needed(message)
    except SetupNeededError as why:
        self.failUnlessEqual(expected_setup_url, why.user_setup_url)
    else:
        self.fail("Expected to find an immediate-mode response")
679
def test_setupNeededOpenID1(self):
    """The minimum conditions necessary to trigger Setup Needed"""
    setup_url = 'http://unittest/setup-here'
    post_args = {
        'openid.mode': 'id_res',
        'openid.user_setup_url': setup_url,
    }
    message = Message.fromPostArgs(post_args)
    self.failUnless(message.isOpenID1())
    self.failUnlessSetupNeeded(setup_url, message)
689
691 """Extra stuff along with setup_url still trigger Setup Needed""" 692 setup_url = 'http://unittest/setup-here' 693 message = Message.fromPostArgs({ 694 'openid.mode': 'id_res', 695 'openid.user_setup_url': setup_url, 696 'openid.identity': 'bogus', 697 }) 698 self.failUnless(message.isOpenID1()) 699 self.failUnlessSetupNeeded(setup_url, message)
700
def test_noSetupNeededOpenID1(self):
    """When the user_setup_url is missing on an OpenID 1 message,
    we assume that it's not a cancel response to checkid_immediate"""
    openid_args = {'mode': 'id_res'}
    message = Message.fromOpenIDArgs(openid_args)
    self.failUnless(message.isOpenID1())

    # Must return normally: no SetupNeededError is expected here.
    self.consumer._checkSetupNeeded(message)
709
def test_setupNeededOpenID2(self):
    # An OpenID 2 'setup_needed' message completes with status
    # 'setup_needed' and no setup URL.
    openid_args = {
        'mode': 'setup_needed',
        'ns': OPENID2_NS,
    }
    message = Message.fromOpenIDArgs(openid_args)
    self.failUnless(message.isOpenID2())
    response = self.consumer.complete(message, None, None)
    self.failUnlessEqual('setup_needed', response.status)
    self.failUnlessEqual(None, response.setup_url)
719
721 message = Message.fromOpenIDArgs({ 722 'mode':'setup_needed', 723 }) 724 725 # No SetupNeededError raised 726 self.consumer._checkSetupNeeded(message) 727 728 response = self.consumer.complete(message, None, None) 729 self.failUnlessEqual('failure', response.status) 730 self.failUnless(response.message.startswith('Invalid openid.mode'))
731
def test_noSetupNeededOpenID2(self):
    # An OpenID 2 id_res message (with an unrelated extra arg) must not
    # be treated as a setup-needed response.
    openid_args = {
        'mode': 'id_res',
        'game': 'puerto_rico',
        'ns': OPENID2_NS,
    }
    message = Message.fromOpenIDArgs(openid_args)
    self.failUnless(message.isOpenID2())

    # Must return normally: no SetupNeededError is expected here.
    self.consumer._checkSetupNeeded(message)
742
class IdResCheckForFieldsTest(TestIdRes):
    """Table-driven tests for GenericConsumer._idResCheckForFields.

    The ``mk*Test`` functions run while the class body executes; each
    returns a test method closed over its argument table.
    """

    def setUp(self):
        # Replaces the TestIdRes fixture: only a store-less consumer is set.
        self.consumer = GenericConsumer(None)

    def mkSuccessTest(openid_args, signed_list):
        # Build a test that expects the field check to pass once 'signed'
        # has been set to the comma-joined ``signed_list``.
        def test(self):
            message = Message.fromOpenIDArgs(openid_args)
            signed_value = ','.join(signed_list)
            message.setArg(OPENID_NS, 'signed', signed_value)
            self.consumer._idResCheckForFields(message)
        return test

    test_openid1Success = mkSuccessTest(
        {
            'return_to': 'return',
            'assoc_handle': 'assoc handle',
            'sig': 'a signature',
            'identity': 'someone',
        },
        ['return_to', 'identity'])

    test_openid2Success = mkSuccessTest(
        {
            'ns': OPENID2_NS,
            'return_to': 'return',
            'assoc_handle': 'assoc handle',
            'sig': 'a signature',
            'op_endpoint': 'my favourite server',
            'response_nonce': 'use only once',
        },
        ['return_to', 'response_nonce', 'assoc_handle', 'op_endpoint'])

    test_openid2Success_identifiers = mkSuccessTest(
        {
            'ns': OPENID2_NS,
            'return_to': 'return',
            'assoc_handle': 'assoc handle',
            'sig': 'a signature',
            'claimed_id': 'i claim to be me',
            'identity': 'my server knows me as me',
            'op_endpoint': 'my favourite server',
            'response_nonce': 'use only once',
        },
        ['return_to', 'response_nonce', 'identity',
         'claimed_id', 'assoc_handle', 'op_endpoint'])

    def mkMissingFieldTest(openid_args):
        # Build a test that expects a ProtocolError whose first argument
        # starts with 'Missing required'.
        def test(self):
            message = Message.fromOpenIDArgs(openid_args)
            try:
                self.consumer._idResCheckForFields(message)
            except ProtocolError as why:
                reason = why.args[0]
                self.failUnless(reason.startswith('Missing required'))
            else:
                self.fail('Expected an error, but none occurred')
        return test

    def mkMissingSignedTest(openid_args):
        # Build a test that expects a ProtocolError whose first argument
        # ends with 'not signed'.
        def test(self):
            message = Message.fromOpenIDArgs(openid_args)
            try:
                self.consumer._idResCheckForFields(message)
            except ProtocolError as why:
                reason = why.args[0]
                self.failUnless(reason.endswith('not signed'))
            else:
                self.fail('Expected an error, but none occurred')
        return test

    test_openid1Missing_returnToSig = mkMissingSignedTest({
        'return_to': 'return',
        'assoc_handle': 'assoc handle',
        'sig': 'a signature',
        'identity': 'someone',
        'signed': 'identity',
    })

    test_openid1Missing_identitySig = mkMissingSignedTest({
        'return_to': 'return',
        'assoc_handle': 'assoc handle',
        'sig': 'a signature',
        'identity': 'someone',
        'signed': 'return_to',
    })

    test_openid2Missing_opEndpointSig = mkMissingSignedTest({
        'ns': OPENID2_NS,
        'return_to': 'return',
        'assoc_handle': 'assoc handle',
        'sig': 'a signature',
        'identity': 'someone',
        'op_endpoint': 'the endpoint',
        'signed': 'return_to,identity,assoc_handle',
    })

    test_openid1MissingReturnTo = mkMissingFieldTest({
        'assoc_handle': 'assoc handle',
        'sig': 'a signature',
        'identity': 'someone',
    })

    test_openid1MissingAssocHandle = mkMissingFieldTest({
        'return_to': 'return',
        'sig': 'a signature',
        'identity': 'someone',
    })

    # XXX: I could go on...
class CheckAuthHappened(Exception):
    """Signal raised by test consumers when _checkAuth gets invoked."""
848
class CheckNonceVerifyTest(TestIdRes, CatchLogs):
    """Tests for the consumer's nonce checks on id_res responses."""

    def setUp(self):
        CatchLogs.setUp(self)
        TestIdRes.setUp(self)
        self.consumer.openid1_nonce_query_arg_name = 'nonce'

    def tearDown(self):
        CatchLogs.tearDown(self)

    def _failUnlessNonceRejected(self):
        # The nonce check on self.response must raise ProtocolError.
        self.failUnlessRaises(ProtocolError, self.consumer._idResCheckNonce,
                              self.response, self.endpoint)

    def test_openid1Success(self):
        """use consumer-generated nonce"""
        nonce_value = mkNonce()
        self.return_to = 'http://rt.unittest/?nonce=%s' % (nonce_value,)
        self.response = Message.fromOpenIDArgs({'return_to': self.return_to})
        self.response.setArg(BARE_NS, 'nonce', nonce_value)
        self.consumer._idResCheckNonce(self.response, self.endpoint)
        self.failUnlessLogEmpty()

    def test_openid1Missing(self):
        """use consumer-generated nonce"""
        self.response = Message.fromOpenIDArgs({})
        found = self.consumer._idResGetNonceOpenID1(
            self.response, self.endpoint)
        self.failUnless(found is None, found)
        self.failUnlessLogEmpty()

    def test_consumerNonceOpenID2(self):
        """OpenID 2 does not use consumer-generated nonce"""
        self.return_to = 'http://rt.unittest/?nonce=%s' % (mkNonce(),)
        openid_args = {'return_to': self.return_to, 'ns': OPENID2_NS}
        self.response = Message.fromOpenIDArgs(openid_args)
        self._failUnlessNonceRejected()
        self.failUnlessLogEmpty()

    def test_serverNonce(self):
        """use server-generated nonce"""
        openid_args = {'ns': OPENID2_NS, 'response_nonce': mkNonce()}
        self.response = Message.fromOpenIDArgs(openid_args)
        self.consumer._idResCheckNonce(self.response, self.endpoint)
        self.failUnlessLogEmpty()

    def test_serverNonceOpenID1(self):
        """OpenID 1 does not use server-generated nonce"""
        openid_args = {
            'ns': OPENID1_NS,
            'return_to': 'http://return.to/',
            'response_nonce': mkNonce(),
        }
        self.response = Message.fromOpenIDArgs(openid_args)
        self._failUnlessNonceRejected()
        self.failUnlessLogEmpty()

    def test_badNonce(self):
        """remove the nonce from the store

        From "Checking the Nonce"::

            When the Relying Party checks the signature on an assertion, the

            Relying Party SHOULD ensure that an assertion has not yet
            been accepted with the same value for "openid.response_nonce"
            from the same OP Endpoint URL.
        """
        nonce = mkNonce()
        stamp, salt = splitNonce(nonce)
        # Mark the nonce as used before the consumer sees it.
        self.store.useNonce(self.server_url, stamp, salt)
        openid_args = {
            'response_nonce': nonce,
            'ns': OPENID2_NS,
        }
        self.response = Message.fromOpenIDArgs(openid_args)
        self._failUnlessNonceRejected()

    def test_successWithNoStore(self):
        """When there is no store, checking the nonce succeeds"""
        self.consumer.store = None
        openid_args = {
            'response_nonce': mkNonce(),
            'ns': OPENID2_NS,
        }
        self.response = Message.fromOpenIDArgs(openid_args)
        self.consumer._idResCheckNonce(self.response, self.endpoint)
        self.failUnlessLogEmpty()

    def test_tamperedNonce(self):
        """Malformed nonce"""
        openid_args = {
            'ns': OPENID2_NS,
            'response_nonce': 'malformed',
        }
        self.response = Message.fromOpenIDArgs(openid_args)
        self._failUnlessNonceRejected()

    def test_missingNonce(self):
        """no nonce parameter on the return_to"""
        openid_args = {'return_to': self.return_to}
        self.response = Message.fromOpenIDArgs(openid_args)
        self._failUnlessNonceRejected()
945
class CheckAuthDetectingConsumer(GenericConsumer):
    """GenericConsumer variant that makes calls to _checkAuth observable.

    _checkAuth raises CheckAuthHappened instead of doing any work, and
    nonce checking is short-circuited to always succeed.
    """

    def _checkAuth(self, *call_args):
        # Raise with the received arguments so a test can catch the call.
        raise CheckAuthHappened(call_args)

    def _idResCheckNonce(self, *ignored):
        """We're not testing nonce-checking here, so always report
        success."""
        return True
954
955 -class TestCheckAuthTriggered(TestIdRes, CatchLogs):
956 consumer_class = CheckAuthDetectingConsumer 957
958 - def setUp(self):
962
def test_checkAuthTriggered(self):
    """_doIdRes must reach _checkAuth for assoc_handle 'not_found'.

    The consumer class under test raises CheckAuthHappened from
    _checkAuth; any other outcome is reported as a failure.
    """
    post_args = {
        'openid.return_to': self.return_to,
        'openid.identity': self.server_id,
        'openid.assoc_handle': 'not_found',
        'openid.sig': GOODSIG,
        'openid.signed': 'identity,return_to',
    }
    message = Message.fromPostArgs(post_args)
    self.disableReturnToChecking()
    try:
        result = self.consumer._doIdRes(message, self.endpoint, None)
    except CheckAuthHappened:
        # Expected path: nothing more to verify.
        return
    self.fail('_checkAuth did not happen. Result was: %r %s' %
              (result, self.messages))
979
981 # Store an association for this server that does not match the 982 # handle that is in the message 983 issued = time.time() 984 lifetime = 1000 985 assoc = association.Association( 986 'handle', 'secret', issued, lifetime, 'HMAC-SHA1') 987 self.store.storeAssociation(self.server_url, assoc) 988 self.disableReturnToChecking() 989 message = Message.fromPostArgs({ 990 'openid.return_to':self.return_to, 991 'openid.identity':self.server_id, 992 'openid.assoc_handle':'not_found', 993 'openid.sig': GOODSIG, 994 'openid.signed': 'identity,return_to', 995 }) 996 try: 997 result = self.consumer._doIdRes(message, self.endpoint, None) 998 except CheckAuthHappened: 999 pass 1000 else: 1001 self.fail('_checkAuth did not happen. Result was: %r' % (result,))
1002
def test_expiredAssoc(self):
    """An expired stored association for the message's handle is an error.

    The association is issued ten seconds in the past with a zero
    lifetime; _doIdRes is expected to raise ProtocolError.
    """
    handle = 'handle'
    assoc = association.Association(
        handle, 'secret', time.time() - 10, 0, 'HMAC-SHA1')
    # Sanity check: the association really is expired before we store it.
    self.failUnless(assoc.expiresIn <= 0)
    self.store.storeAssociation(self.server_url, assoc)

    post_args = {
        'openid.return_to': self.return_to,
        'openid.identity': self.server_id,
        'openid.assoc_handle': handle,
        'openid.sig': GOODSIG,
        'openid.signed': 'identity,return_to',
    }
    message = Message.fromPostArgs(post_args)
    self.disableReturnToChecking()
    do_id_res = self.consumer._doIdRes
    self.failUnlessRaises(
        ProtocolError, do_id_res, message, self.endpoint, None)
1024
def test_newerAssoc(self):
    """A message signed with the older of two stored associations succeeds.

    Two associations are stored for the same server URL; the message
    names and is signed by the one issued first.
    """
    lifetime = 1000

    good_handle = 'handle'
    good_assoc = association.Association(
        good_handle, 'secret', time.time() - 10, lifetime, 'HMAC-SHA1')
    self.store.storeAssociation(self.server_url, good_assoc)

    # Second association, issued more recently, with a different handle.
    bad_assoc = association.Association(
        'handle2', 'secret', time.time() - 5, lifetime, 'HMAC-SHA1')
    self.store.storeAssociation(self.server_url, bad_assoc)

    query = {
        'return_to': self.return_to,
        'identity': self.server_id,
        'assoc_handle': good_handle,
    }
    unsigned = Message.fromOpenIDArgs(query)
    message = good_assoc.signMessage(unsigned)

    self.disableReturnToChecking()
    info = self.consumer._doIdRes(message, self.endpoint, None)
    self.failUnlessEqual(info.status, SUCCESS, info.message)
    self.failUnlessEqual(self.consumer_id, info.identity_url)
1052 1053 1054
1055 -class TestReturnToArgs(unittest.TestCase):
1056 """Verifying the Return URL paramaters. 1057 From the specification "Verifying the Return URL":: 1058 1059 To verify that the "openid.return_to" URL matches the URL that is 1060 processing this assertion: 1061 1062 - The URL scheme, authority, and path MUST be the same between the 1063 two URLs. 1064 1065 - Any query parameters that are present in the "openid.return_to" 1066 URL MUST also be present with the same values in the 1067 accepting URL. 1068 1069 XXX: So far we have only tested the second item on the list above. 1070 XXX: _verifyReturnToArgs is not invoked anywhere. 1071 """ 1072
def setUp(self):
    """Build a GenericConsumer around a bare placeholder object as store."""
    placeholder_store = object()
    self.consumer = GenericConsumer(placeholder_store)
1076
def test_returnToArgsOkay(self):
    """The return_to query argument 'foo=bar' is present in the query."""
    query = {
        'openid.mode': 'id_res',
        'openid.return_to': 'http://example.com/?foo=bar',
        'foo': 'bar',
    }
    # Nothing is returned; the call passing without an exception is
    # the success condition.
    verify = self.consumer._verifyReturnToArgs
    verify(query)
1085
1087 query = { 1088 'openid.mode': 'id_res', 1089 'openid.return_to': 'http://example.com/', 1090 'foo': 'bar', 1091 } 1092 # no return value, success is assumed if there are no exceptions. 1093 self.failUnlessRaises(ProtocolError, 1094 self.consumer._verifyReturnToArgs, query)
1095
def test_returnToMismatch(self):
    """Missing or differing return_to query arguments raise ValueError."""
    query = {
        'openid.mode': 'id_res',
        'openid.return_to': 'http://example.com/?foo=bar',
    }
    verify = self.consumer._verifyReturnToArgs

    # First case: the query has no 'foo' key at all.
    self.failUnlessRaises(ValueError, verify, query)

    # Second case: 'foo' is present but its value does not match.
    query['foo'] = 'baz'
    self.failUnlessRaises(ValueError, verify, query)
1109 1110
def test_noReturnTo(self):
    """A query without openid.return_to raises ValueError."""
    query = {'openid.mode': 'id_res'}
    verify = self.consumer._verifyReturnToArgs
    self.failUnlessRaises(ValueError, verify, query)
1115
def test_completeBadReturnTo(self):
    """Test GenericConsumer._checkReturnTo's handling of bad return_to
    values.

    Each candidate differs from the expected return_to in exactly one
    of scheme, authority, path or query arguments, and must be
    reported as not matching.
    """
    return_to = "http://some.url/path?foo=bar"

    # Scheme, authority, and path differences are checked by
    # GenericConsumer._checkReturnTo. Query args checked by
    # GenericConsumer._verifyReturnToArgs.
    bad_return_tos = [
        # Scheme only
        "https://some.url/path?foo=bar",
        # Authority only
        "http://some.url.invalid/path?foo=bar",
        # Path only
        "http://some.url/path_extra?foo=bar",
        # Query args differ
        "http://some.url/path?foo=bar2",
        "http://some.url/path?foo2=bar",
        ]

    m = Message(OPENID1_NS)
    m.setArg(OPENID_NS, 'mode', 'cancel')
    m.setArg(BARE_NS, 'foo', 'bar')

    for bad in bad_return_tos:
        m.setArg(OPENID_NS, 'return_to', bad)
        # Name the offending URL so a failure points at the right case.
        self.failIf(self.consumer._checkReturnTo(m, return_to),
                    "Bad return_to was accepted: %r" % (bad,))
1145
def test_completeGoodReturnTo(self):
    """Test GenericConsumer.complete()'s handling of good
    return_to values.

    Every variant listed is expected to yield a CancelResponse for a
    'cancel' mode message.
    """
    return_to = "http://some.url/path"
    another_arg = {(BARE_NS, 'another'): 'arg'}

    # Pairs of (return_to variant, extra message args it requires).
    good_return_tos = [
        (return_to, {}),
        (return_to + "?another=arg", another_arg),
        (return_to + "?another=arg#fragment", another_arg),
        ("HTTP" + return_to[4:], {}),
        (return_to.replace('url', 'URL'), {}),
        ("http://some.url:80/path", {}),
        ("http://some.url/p%61th", {}),
        ("http://some.url/./path", {}),
        ]

    endpoint = None

    for good, extra in good_return_tos:
        m = Message(OPENID1_NS)
        m.setArg(OPENID_NS, 'mode', 'cancel')

        for (ns, key), value in extra.items():
            m.setArg(ns, key, value)

        m.setArg(OPENID_NS, 'return_to', good)
        result = self.consumer.complete(m, endpoint, return_to)
        failure_text = "Expected CancelResponse, got %r for %s" % (
            result, good,)
        self.failUnless(isinstance(result, CancelResponse), failure_text)
1176
class MockFetcher(object):
    """Fetcher stand-in that records each fetch and returns a fixed response."""

    def __init__(self, response=None):
        # A falsy response is replaced by a default HTTPResponse.
        if not response:
            response = HTTPResponse()
        self.response = response
        # (url, body, headers) tuples, one per call to fetch().
        self.fetches = []

    def fetch(self, url, body=None, headers=None):
        """Record the request and hand back the configured response."""
        request = (url, body, headers)
        self.fetches.append(request)
        return self.response
1185
class ExceptionRaisingMockFetcher(object):
    """Fetcher stand-in whose fetch() always raises MyException."""

    class MyException(Exception):
        """Marker exception raised by every fetch."""

    def fetch(self, url, body=None, headers=None):
        """Ignore the request and raise MyException."""
        failure = self.MyException('mock fetcher exception')
        raise failure
1192
class BadArgCheckingConsumer(GenericConsumer):
    """Consumer that asserts on the arguments handed to _makeKVPost."""

    def _makeKVPost(self, args, _):
        """Assert that args equals the expected check_authentication
        request, then return None."""
        expected = {
            'openid.mode': 'check_authentication',
            'openid.signed': 'foo',
            'openid.ns': OPENID1_NS,
        }
        assert args == expected, args
        return None
1201
class TestCheckAuth(unittest.TestCase, CatchLogs):
    """Tests for GenericConsumer._checkAuth and _createCheckAuthRequest,
    run against a MockFetcher installed as the default fetcher."""

    consumer_class = GenericConsumer

    def setUp(self):
        """Start log capture, build the consumer, and swap in a mock
        fetcher, remembering the previous default."""
        CatchLogs.setUp(self)
        self.store = memstore.MemoryStore()

        self.consumer = self.consumer_class(self.store)

        self._orig_fetcher = fetchers.getDefaultFetcher()
        mock_fetcher = MockFetcher()
        self.fetcher = mock_fetcher
        fetchers.setDefaultFetcher(mock_fetcher)

    def tearDown(self):
        """Stop log capture and reinstall the saved default fetcher."""
        CatchLogs.tearDown(self)
        fetchers.setDefaultFetcher(self._orig_fetcher, wrap_exceptions=False)

    def test_error(self):
        """A 404 response makes _checkAuth return a false value and log."""
        self.fetcher.response = HTTPResponse(
            "http://some_url", 404, {'Hea': 'der'}, 'blah:blah\n')
        query = {
            'openid.signed': 'stuff',
            'openid.stuff': 'a value',
        }
        message = Message.fromPostArgs(query)
        r = self.consumer._checkAuth(message, http_server_url)
        self.failIf(r)
        self.failUnless(self.messages)

    def test_bad_args(self):
        """The argument check lives in BadArgCheckingConsumer._makeKVPost,
        which asserts on what _checkAuth passes to it."""
        query = {
            'openid.signed': 'foo',
            'closid.foo': 'something',
        }
        consumer = BadArgCheckingConsumer(self.store)
        message = Message.fromPostArgs(query)
        consumer._checkAuth(message, 'does://not.matter')

    def test_signedList(self):
        """Every field named in 'signed' appears in the check-auth request."""
        query = Message.fromOpenIDArgs({
            'mode': 'id_res',
            'sig': 'rabbits',
            'identity': '=example',
            'assoc_handle': 'munchkins',
            'ns.sreg': 'urn:sreg',
            'sreg.email': 'bogus@example.com',
            'signed': 'identity,mode,ns.sreg,sreg.email',
            'foo': 'bar',
            })
        args = self.consumer._createCheckAuthRequest(query)
        self.failUnless(args.isOpenID1())
        signed_names = query.getArg(OPENID_NS, 'signed').split(',')
        for signed_arg in signed_names:
            self.failUnless(args.getAliasedArg(signed_arg), signed_arg)

    def test_112(self):
        """The check-auth request for an OpenID 2 response equals the
        incoming arguments with the mode replaced."""
        args = {'openid.assoc_handle': 'fa1f5ff0-cde4-11dc-a183-3714bfd55ca8',
                'openid.claimed_id': 'http://binkley.lan/user/test01',
                'openid.identity': 'http://test01.binkley.lan/',
                'openid.mode': 'id_res',
                'openid.ns': 'http://specs.openid.net/auth/2.0',
                'openid.ns.pape': 'http://specs.openid.net/extensions/pape/1.0',
                'openid.op_endpoint': 'http://binkley.lan/server',
                'openid.pape.auth_policies': 'none',
                'openid.pape.auth_time': '2008-01-28T20:42:36Z',
                'openid.pape.nist_auth_level': '0',
                'openid.response_nonce': '2008-01-28T21:07:04Z99Q=',
                'openid.return_to': 'http://binkley.lan:8001/process?janrain_nonce=2008-01-28T21%3A07%3A02Z0tMIKx',
                'openid.sig': 'YJlWH4U6SroB1HoPkmEKx9AyGGg=',
                'openid.signed': 'assoc_handle,identity,response_nonce,return_to,claimed_id,op_endpoint,pape.auth_time,ns.pape,pape.nist_auth_level,pape.auth_policies'
                }
        self.failUnlessEqual(OPENID2_NS, args['openid.ns'])
        incoming = Message.fromPostArgs(args)
        self.failUnless(incoming.isOpenID2())
        car = self.consumer._createCheckAuthRequest(incoming)

        # Expected request: identical arguments, different mode.
        expected_args = dict(args)
        expected_args['openid.mode'] = 'check_authentication'
        expected = Message.fromPostArgs(expected_args)
        self.failUnless(expected.isOpenID2())
        self.failUnlessEqual(expected, car)
        self.failUnlessEqual(expected_args, car.toPostArgs())
1280 1281 1282
1283 -class TestFetchAssoc(unittest.TestCase, CatchLogs):
1284 consumer_class = GenericConsumer 1285
def setUp(self):
    """Start log capture and install a MockFetcher as the default fetcher.

    The previously installed default fetcher is remembered so that
    tearDown can put it back.
    """
    CatchLogs.setUp(self)
    self.store = memstore.MemoryStore()
    # Remember the current default before replacing it; tests in this
    # class also swap in their own fetchers.
    self._orig_fetcher = fetchers.getDefaultFetcher()
    self.fetcher = MockFetcher()
    fetchers.setDefaultFetcher(self.fetcher)
    self.consumer = self.consumer_class(self.store)

def tearDown(self):
    """Stop log capture and restore the default fetcher saved in setUp."""
    CatchLogs.tearDown(self)
    # Same restore call as TestCheckAuth.tearDown.
    fetchers.setDefaultFetcher(self._orig_fetcher, wrap_exceptions=False)
1292
def test_error_404(self):
    """404 from a kv post raises HTTPFetchingError"""
    not_found = HTTPResponse(
        "http://some_url", 404, {'Hea': 'der'}, 'blah:blah\n')
    self.fetcher.response = not_found

    make_kv_post = self.consumer._makeKVPost
    request = Message.fromPostArgs({'mode': 'associate'})
    self.failUnlessRaises(
        fetchers.HTTPFetchingError,
        make_kv_post, request, "http://server_url")
1302
1304 """Ensure that exceptions are bubbled through from fetchers 1305 when making associations 1306 """ 1307 self.fetcher = ExceptionRaisingMockFetcher() 1308 fetchers.setDefaultFetcher(self.fetcher, wrap_exceptions=False) 1309 self.failUnlessRaises(self.fetcher.MyException, 1310 self.consumer._makeKVPost, 1311 Message.fromPostArgs({'mode':'associate'}), 1312 "http://server_url") 1313 1314 # exception fetching returns no association 1315 e = OpenIDServiceEndpoint() 1316 e.server_url = 'some://url' 1317 self.failUnlessRaises(self.fetcher.MyException, 1318 self.consumer._getAssociation, e) 1319 1320 self.failUnlessRaises(self.fetcher.MyException, 1321 self.consumer._checkAuth, 1322 Message.fromPostArgs({'openid.signed':''}), 1323 'some://url')
1324
1326 """Ensure that openid.fetchers.HTTPFetchingError is caught by 1327 the association creation stuff. 1328 """ 1329 self.fetcher = ExceptionRaisingMockFetcher() 1330 # This will wrap exceptions! 1331 fetchers.setDefaultFetcher(self.fetcher) 1332 self.failUnlessRaises(fetchers.HTTPFetchingError, 1333 self.consumer._makeKVPost, 1334 Message.fromOpenIDArgs({'mode':'associate'}), 1335 "http://server_url") 1336 1337 # exception fetching returns no association 1338 e = OpenIDServiceEndpoint() 1339 e.server_url = 'some://url' 1340 self.failUnless(self.consumer._getAssociation(e) is None) 1341 1342 msg = Message.fromPostArgs({'openid.signed':''}) 1343 self.failIf(self.consumer._checkAuth(msg, 'some://url'))
1344 1345
1346 -class TestSuccessResponse(unittest.TestCase):
def setUp(self):
    """Create an endpoint whose claimed_id is 'identity_url'."""
    endpoint = OpenIDServiceEndpoint()
    endpoint.claimed_id = 'identity_url'
    self.endpoint = endpoint
1350
def test_extensionResponse(self):
    """extensionResponse(ns, False) returns each namespace's arguments."""
    resp = mkSuccess(self.endpoint, {
        'ns.sreg': 'urn:sreg',
        'ns.unittest': 'urn:unittest',
        'unittest.one': '1',
        'unittest.two': '2',
        'sreg.nickname': 'j3h',
        'return_to': 'return_to',
        })

    # (namespace URI, arguments expected back), checked in this order.
    expectations = [
        ('urn:unittest', {'one': '1', 'two': '2'}),
        ('urn:sreg', {'nickname': 'j3h'}),
        ]
    for ns_uri, expected_args in expectations:
        actual_args = resp.extensionResponse(ns_uri, False)
        self.failUnlessEqual(actual_args, expected_args)
1364
1366 args = { 1367 'ns.sreg':'urn:sreg', 1368 'ns.unittest':'urn:unittest', 1369 'unittest.one':'1', 1370 'unittest.two':'2', 1371 'sreg.nickname':'j3h', 1372 'sreg.dob':'yesterday', 1373 'return_to':'return_to', 1374 'signed': 'sreg.nickname,unittest.one,sreg.dob', 1375 } 1376 1377 signed_list = ['openid.sreg.nickname', 1378 'openid.unittest.one', 1379 'openid.sreg.dob',] 1380 1381 # Don't use mkSuccess because it creates an all-inclusive 1382 # signed list. 1383 msg = Message.fromOpenIDArgs(args) 1384 resp = SuccessResponse(self.endpoint, msg, signed_list) 1385 1386 # All args in this NS are signed, so expect all. 1387 sregargs = resp.extensionResponse('urn:sreg', True) 1388 self.failUnlessEqual(sregargs, {'nickname':'j3h', 'dob': 'yesterday'}) 1389 1390 # Not all args in this NS are signed, so expect None when 1391 # asking for them. 1392 utargs = resp.extensionResponse('urn:unittest', True) 1393 self.failUnlessEqual(utargs, None)
1394
def test_noReturnTo(self):
    """getReturnTo() is None when the response has no return_to."""
    resp = mkSuccess(self.endpoint, {})
    return_to = resp.getReturnTo()
    self.failUnless(return_to is None)
1398
def test_returnTo(self):
    """getReturnTo() returns the return_to value from the response."""
    openid_args = {'return_to': 'return_to'}
    resp = mkSuccess(self.endpoint, openid_args)
    self.failUnlessEqual(resp.getReturnTo(), 'return_to')
1402
1404 resp = mkSuccess(self.endpoint, {}) 1405 self.failUnlessEqual(resp.getDisplayIdentifier(), 1406 resp.endpoint.claimed_id)
1407
1409 self.endpoint.display_identifier = "http://input.url/" 1410 resp = mkSuccess(self.endpoint, {}) 1411 self.failUnlessEqual(resp.getDisplayIdentifier(), 1412 "http://input.url/")
1413
class StubConsumer(object):
    """Minimal stand-in for the generic consumer used by Consumer tests.

    begin() remembers the endpoint it was given, and complete() returns
    whatever has been assigned to the response attribute.
    """

    def __init__(self):
        self.assoc = object()  # opaque placeholder association
        self.response = None
        self.endpoint = None

    def begin(self, service):
        """Return an AuthRequest for service and remember the endpoint."""
        auth_req = AuthRequest(service, self.assoc)
        self.endpoint = service
        return auth_req

    def complete(self, message, endpoint, return_to):
        """Check that endpoint is the remembered one, then return the
        canned response."""
        assert endpoint is self.endpoint
        return self.response
1428
1429 -class ConsumerTest(unittest.TestCase):
1430 """Tests for high-level consumer.Consumer functions. 1431 1432 Its GenericConsumer component is stubbed out with StubConsumer. 1433 """
def setUp(self):
    """Create a Consumer over an empty session with a StubConsumer inside.

    Also prepares an endpoint and a Discovery object that share the
    same identity URL string.
    """
    self.endpoint = OpenIDServiceEndpoint()
    # claimed_id and identity_url must be the very same string object;
    # _doResp compares them with 'is'.
    self.endpoint.claimed_id = self.identity_url = 'http://identity.url/'
    self.store = None
    self.session = {}

    consumer = Consumer(self.session, self.store)
    consumer.consumer = StubConsumer()
    self.consumer = consumer

    self.discovery = Discovery(
        self.session, self.identity_url, consumer.session_key_prefix)
1444
1446 self.consumer.setAssociationPreference([]) 1447 self.failUnless(isinstance(self.consumer.consumer.negotiator, 1448 association.SessionNegotiator)) 1449 self.failUnlessEqual([], 1450 self.consumer.consumer.negotiator.allowed_types) 1451 self.consumer.setAssociationPreference([('HMAC-SHA1', 'DH-SHA1')]) 1452 self.failUnlessEqual([('HMAC-SHA1', 'DH-SHA1')], 1453 self.consumer.consumer.negotiator.allowed_types)
1454
def withDummyDiscovery(self, callable, dummy_getNextService):
    """Run callable while openid.consumer.consumer.Discovery is replaced.

    The replacement class ignores its constructor arguments and uses
    dummy_getNextService as its getNextService method. The original
    Discovery is put back afterwards, even if callable raises.
    """
    class DummyDisco(object):
        def __init__(self, *ignored):
            pass

        getNextService = dummy_getNextService

    import openid.consumer.consumer as consumer_module
    saved_discovery = consumer_module.Discovery
    try:
        consumer_module.Discovery = DummyDisco
        callable()
    finally:
        consumer_module.Discovery = saved_discovery
def test_beginHTTPError(self):
    """Make sure that the discovery HTTP failure case behaves properly

    getNextService raises HTTPFetchingError; begin() is expected to
    turn that into a DiscoveryFailure whose message starts with
    'Error fetching' and mentions the original error text.
    """
    def getNextService(self, ignored):
        raise HTTPFetchingError("Unit test")

    def test():
        try:
            self.consumer.begin('unused in this test')
        except DiscoveryFailure as why:
            # Read the message through .args rather than indexing the
            # exception object, which is deprecated.
            message = why.args[0]
            self.failUnless(message.startswith('Error fetching'))
            self.failUnless('Unit test' in message)
        else:
            self.fail('Expected DiscoveryFailure')

    self.withDummyDiscovery(test, getNextService)
def test_beginNoServices(self):
    """begin() raises DiscoveryFailure when discovery yields no service.

    The failure message is expected to start with 'No usable OpenID'
    and to contain the URL that was passed to begin().
    """
    def getNextService(self, ignored):
        return None

    url = 'http://a.user.url/'

    def test():
        try:
            self.consumer.begin(url)
        except DiscoveryFailure as why:
            # Read the message through .args rather than indexing the
            # exception object, which is deprecated.
            message = why.args[0]
            self.failUnless(message.startswith('No usable OpenID'))
            self.failUnless(url in message)
        else:
            self.fail('Expected DiscoveryFailure')

    self.withDummyDiscovery(test, getNextService)
def test_beginWithoutDiscovery(self):
    """beginWithoutDiscovery returns an AuthRequest for the given endpoint
    and stores that endpoint in the session."""
    # Does this really test anything non-trivial?
    auth_request = self.consumer.beginWithoutDiscovery(self.endpoint)

    # What comes back is an auth request.
    self.failUnless(isinstance(auth_request, AuthRequest))

    # As a side effect the session entry under the consumer's token
    # key is the request's endpoint object.
    stored_endpoint = self.session[self.consumer._token_key]
    self.failUnless(stored_endpoint is auth_request.endpoint)

    # That endpoint is the one we passed in.
    self.failUnless(auth_request.endpoint is self.endpoint)
1517
def test_completeEmptySession(self):
    """complete() on an empty session passes endpoint=None to the generic
    consumer and returns its failure response unchanged."""
    text = "failed complete"

    def checkEndpoint(message, endpoint, return_to):
        # With nothing in the session there is no endpoint to pass on.
        self.failUnless(endpoint is None)
        return FailureResponse(endpoint, text)

    stub = self.consumer.consumer
    stub.complete = checkEndpoint

    response = self.consumer.complete({}, None)
    self.failUnlessEqual(response.status, FAILURE)
    self.failUnlessEqual(response.message, text)
    self.failUnless(response.identity_url is None)
def _doResp(self, auth_req, exp_resp):
    """Complete a transaction with exp_resp as the stub's canned response.

    Checks that the session held data beforehand, that the token key
    is gone afterwards, and that the returned status matches
    exp_resp's. Returns the response from Consumer.complete().
    """
    # StubConsumer.complete hands back whatever is stored here.
    stub = self.consumer.consumer
    stub.response = exp_resp

    # The endpoint lives in the session, so it must not be empty.
    self.failUnless(self.session)
    resp = self.consumer.complete({}, None)

    # Unless the request was for identifier_select, the response must
    # carry our identity URL object.
    if self.endpoint.claimed_id != IDENTIFIER_SELECT:
        self.failUnless(resp.identity_url is self.identity_url)

    # The token must have been removed from the session.
    token_key = self.consumer._token_key
    self.failIf(token_key in self.session)

    # The status is the one we asked the stub to produce.
    self.failUnlessEqual(resp.status, exp_resp.status)

    return resp
1554
def _doRespNoDisco(self, exp_resp):
    """Run a transaction started without discovery and return its response.

    After completion the session is expected to be empty.
    """
    auth_request = self.consumer.beginWithoutDiscovery(self.endpoint)
    response = self._doResp(auth_request, exp_resp)
    # Nothing should remain in the session once we have completed.
    self.failIf(self.session)
    return response
1562
def test_noDiscoCompleteSuccessWithToken(self):
    """A success response completes cleanly without discovery."""
    success = mkSuccess(self.endpoint, {})
    self._doRespNoDisco(success)
1565
def test_noDiscoCompleteCancelWithToken(self):
    """A cancel response completes cleanly without discovery."""
    cancelled = CancelResponse(self.endpoint)
    self._doRespNoDisco(cancelled)
1568
def test_noDiscoCompleteFailure(self):
    """A failure response keeps its message object without discovery."""
    msg = 'failed!'
    failure = FailureResponse(self.endpoint, msg)
    resp = self._doRespNoDisco(failure)
    self.failUnless(resp.message is msg)
1573
def test_noDiscoCompleteSetupNeeded(self):
    """A setup-needed response keeps its setup_url without discovery."""
    setup_url = 'http://setup.url/'
    setup_needed = SetupNeededResponse(self.endpoint, setup_url)
    resp = self._doRespNoDisco(setup_needed)
    self.failUnless(resp.setup_url is setup_url)

# To test that discovery is cleaned up, we need to initialize a
# Yadis manager, and have it put its values in the session.
def _doRespDisco(self, is_clean, exp_resp):
    """Run a transaction that goes through discovery and return its response.

    is_clean says whether the discovery manager is expected to be gone
    (true) or still present (false) once the transaction completes.
    """
    self.discovery.createManager([self.endpoint], self.identity_url)
    auth_request = self.consumer.begin(self.identity_url)
    response = self._doResp(auth_request, exp_resp)

    # The manager itself is only used as the assertion message.
    manager = self.discovery.getManager()
    manager_gone = self.discovery.getManager() is None
    if is_clean:
        self.failUnless(manager_gone, manager)
    else:
        self.failIf(manager_gone, manager)

    return response

# Cancel and success DO clean up the discovery process
1595 1596 # Cancel and success DO clean up the discovery process
def test_completeSuccess(self):
    """A success response leaves no discovery manager behind."""
    success = mkSuccess(self.endpoint, {})
    self._doRespDisco(True, success)
1599
def test_completeCancel(self):
    """A cancel response leaves no discovery manager behind."""
    cancelled = CancelResponse(self.endpoint)
    self._doRespDisco(True, cancelled)

# Failure and setup_needed don't clean up the discovery process
1602 1603 # Failure and setup_needed don't clean up the discovery process
def test_completeFailure(self):
    """A failure response keeps the discovery manager and its message."""
    msg = 'failed!'
    failure = FailureResponse(self.endpoint, msg)
    resp = self._doRespDisco(False, failure)
    self.failUnless(resp.message is msg)
1608
def test_completeSetupNeeded(self):
    """A setup-needed response keeps the discovery manager and its URL."""
    setup_url = 'http://setup.url/'
    setup_needed = SetupNeededResponse(self.endpoint, setup_url)
    resp = self._doRespDisco(False, setup_needed)
    self.failUnless(resp.setup_url is setup_url)
1615
def test_successDifferentURL(self):
    """
    Be sure that the session gets cleaned up when the response is
    successful and has a different URL than the one in the
    request.
    """
    # The request endpoint describes an IDP URL.
    self.identity_url = 'http://idp.url/'
    self.endpoint.claimed_id = self.endpoint.local_id = IDENTIFIER_SELECT

    # The response endpoint carries a different URL (asserted by
    # the IDP).
    resp_endpoint = OpenIDServiceEndpoint()
    resp_endpoint.claimed_id = "http://user.url/"

    success = mkSuccess(resp_endpoint, {})
    self._doRespDisco(True, success)
    leftover = self.discovery.getManager(force=True)
    self.failUnless(leftover is None)
1635
def test_begin(self):
    """begin() returns an AuthRequest wired to the discovered endpoint
    and to the stub consumer's association."""
    self.discovery.createManager([self.endpoint], self.identity_url)
    # Should not raise an exception
    auth_req = self.consumer.begin(self.identity_url)
    stub = self.consumer.consumer

    self.failUnless(isinstance(auth_req, AuthRequest))
    self.failUnless(auth_req.endpoint is self.endpoint)
    self.failUnless(auth_req.endpoint is stub.endpoint)
    self.failUnless(auth_req.assoc is stub.assoc)
1644 1645 1646
class IDPDrivenTest(unittest.TestCase):
    """Tests for responses where the identifier is chosen by the IDP."""

    def setUp(self):
        """Create a consumer over a GoodAssocStore and an endpoint that
        only has a server URL."""
        self.store = GoodAssocStore()
        self.consumer = GenericConsumer(self.store)
        endpoint = OpenIDServiceEndpoint()
        endpoint.server_url = "http://idp.unittest/"
        self.endpoint = endpoint

    def _directedIdentityMessage(self):
        """Build the id_res message shared by the completion tests."""
        return Message.fromPostArgs({
            'openid.identity': '=directed_identifier',
            'openid.return_to': 'x',
            'openid.assoc_handle': 'z',
            'openid.signed': 'identity,return_to',
            'openid.sig': GOODSIG,
            })

    def test_idpDrivenBegin(self):
        # Testing here that the token-handling doesn't explode...
        self.consumer.begin(self.endpoint)

    def test_idpDrivenComplete(self):
        """A verified directed identifier produces a successful response."""
        identifier = '=directed_identifier'
        message = self._directedIdentityMessage()

        discovered_endpoint = OpenIDServiceEndpoint()
        discovered_endpoint.claimed_id = identifier
        discovered_endpoint.server_url = self.endpoint.server_url
        discovered_endpoint.local_id = identifier

        # Records each endpoint returned by the verification stub.
        iverified = []

        def verifyDiscoveryResults(identifier, endpoint):
            self.failUnless(endpoint is self.endpoint)
            iverified.append(discovered_endpoint)
            return discovered_endpoint

        def checkReturnTo(unused1, unused2):
            return True

        self.consumer._verifyDiscoveryResults = verifyDiscoveryResults
        self.consumer._idResCheckNonce = lambda *args: True
        self.consumer._checkReturnTo = checkReturnTo
        response = self.consumer._doIdRes(message, self.endpoint, None)

        self.failUnlessSuccess(response)
        self.failUnlessEqual(response.identity_url, "=directed_identifier")

        # assert that discovery attempt happens and returns good
        self.failUnlessEqual(iverified, [discovered_endpoint])

    def test_idpDrivenCompleteFraud(self):
        """A DiscoveryFailure raised during verification propagates out
        of _doIdRes."""
        # crap with an identifier that doesn't match discovery info
        message = self._directedIdentityMessage()

        def verifyDiscoveryResults(identifier, endpoint):
            raise DiscoveryFailure("PHREAK!", None)

        def checkReturnTo(unused1, unused2):
            return True

        self.consumer._verifyDiscoveryResults = verifyDiscoveryResults
        self.consumer._checkReturnTo = checkReturnTo
        self.failUnlessRaises(
            DiscoveryFailure,
            self.consumer._doIdRes, message, self.endpoint, None)

    def failUnlessSuccess(self, response):
        """Fail the test unless response.status is SUCCESS."""
        if response.status == SUCCESS:
            return
        self.fail("Non-successful response: %s" % (response,))
1712 1713 1714
class TestDiscoveryVerification(unittest.TestCase):
    """Tests for GenericConsumer._verifyDiscoveryResults.

    The consumer's _discover hook is replaced by discoveryFunc, which
    returns whatever the test stored in self.services.
    """

    # Endpoints that discoveryFunc reports; tests override per instance.
    services = []

    def setUp(self):
        """Build the consumer, an OpenID 2 message and a matching endpoint."""
        self.store = GoodAssocStore()
        self.consumer = GenericConsumer(self.store)

        self.consumer._discover = self.discoveryFunc

        self.identifier = "http://idp.unittest/1337"
        self.server_url = "http://endpoint.unittest/"

        self.message = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
            'openid.identity': self.identifier,
            'openid.claimed_id': self.identifier,
            'openid.op_endpoint': self.server_url,
            })

        self.endpoint = OpenIDServiceEndpoint()
        self.endpoint.server_url = self.server_url

    def test_theGoodStuff(self):
        """An endpoint that matches the message is returned as-is."""
        endpoint = OpenIDServiceEndpoint()
        endpoint.type_uris = [OPENID_2_0_TYPE]
        endpoint.claimed_id = self.identifier
        endpoint.server_url = self.server_url
        endpoint.local_id = self.identifier
        self.services = [endpoint]
        r = self.consumer._verifyDiscoveryResults(self.message, endpoint)

        self.failUnlessEqual(r, endpoint)

    def test_otherServer(self):
        """A different server_url leads to re-verification, whose
        ProtocolError reaches the caller."""
        text = "verify failed"

        def discoverAndVerify(claimed_id, to_match_endpoints):
            self.failUnlessEqual(claimed_id, self.identifier)
            for to_match in to_match_endpoints:
                self.failUnlessEqual(claimed_id, to_match.claimed_id)
            raise ProtocolError(text)

        self.consumer._discoverAndVerify = discoverAndVerify

        # a set of things without the stuff
        endpoint = OpenIDServiceEndpoint()
        endpoint.type_uris = [OPENID_2_0_TYPE]
        endpoint.claimed_id = self.identifier
        endpoint.server_url = "http://the-MOON.unittest/"
        endpoint.local_id = self.identifier
        self.services = [endpoint]
        try:
            r = self.consumer._verifyDiscoveryResults(self.message, endpoint)
        except ProtocolError as e:
            # Should we make more ProtocolError subclasses?
            # Compare the text: failUnless(str(e), text) would take
            # text as the failure message and always pass.
            self.failUnlessEqual(str(e), text)
        else:
            self.fail("expected ProtocolError, %r returned." % (r,))

    def test_foreignDelegate(self):
        """A different local_id leads to re-verification, whose
        ProtocolError reaches the caller."""
        text = "verify failed"

        def discoverAndVerify(claimed_id, to_match_endpoints):
            self.failUnlessEqual(claimed_id, self.identifier)
            for to_match in to_match_endpoints:
                self.failUnlessEqual(claimed_id, to_match.claimed_id)
            raise ProtocolError(text)

        self.consumer._discoverAndVerify = discoverAndVerify

        # a set of things with the server stuff but other delegate
        endpoint = OpenIDServiceEndpoint()
        endpoint.type_uris = [OPENID_2_0_TYPE]
        endpoint.claimed_id = self.identifier
        endpoint.server_url = self.server_url
        endpoint.local_id = "http://unittest/juan-carlos"

        try:
            r = self.consumer._verifyDiscoveryResults(self.message, endpoint)
        except ProtocolError as e:
            self.failUnlessEqual(str(e), text)
        else:
            self.fail("Exepected ProtocolError, %r returned" % (r,))

    def test_nothingDiscovered(self):
        """An empty discovery result raises DiscoveryFailure."""
        # a set of no things.
        self.services = []
        self.failUnlessRaises(DiscoveryFailure,
                              self.consumer._verifyDiscoveryResults,
                              self.message, self.endpoint)

    def discoveryFunc(self, identifier):
        """Stand-in for the consumer's _discover: return the identifier
        unchanged together with self.services."""
        return identifier, self.services
1811 1812
class TestCreateAssociationRequest(unittest.TestCase):
    """Tests for GenericConsumer._createAssociateRequest."""

    def setUp(self):
        """Create a store-less consumer and an endpoint whose
        compatibility mode can be toggled by the tests."""
        class DummyEndpoint(object):
            use_compatibility = False

            def compatibilityMode(self):
                return self.use_compatibility

        self.endpoint = DummyEndpoint()
        self.consumer = GenericConsumer(store=None)
        self.assoc_type = 'HMAC-SHA1'

    def test_noEncryptionSendsType(self):
        """Outside compatibility mode the request carries ns and
        session_type for 'no-encryption'."""
        session_type = 'no-encryption'
        session, args = self.consumer._createAssociateRequest(
            self.endpoint, self.assoc_type, session_type)

        self.failUnless(isinstance(session, PlainTextConsumerSession))
        expected_args = {
            'ns': OPENID2_NS,
            'session_type': session_type,
            'mode': 'associate',
            'assoc_type': self.assoc_type,
        }
        expected = Message.fromOpenIDArgs(expected_args)

        self.failUnlessEqual(expected, args)

    def test_noEncryptionCompatibility(self):
        """In compatibility mode 'no-encryption' sends no session_type."""
        self.endpoint.use_compatibility = True
        session_type = 'no-encryption'
        session, args = self.consumer._createAssociateRequest(
            self.endpoint, self.assoc_type, session_type)

        self.failUnless(isinstance(session, PlainTextConsumerSession))
        expected = Message.fromOpenIDArgs({
            'mode': 'associate',
            'assoc_type': self.assoc_type,
            })
        self.failUnlessEqual(expected, args)

    def test_dhSHA1Compatibility(self):
        """In compatibility mode a DH-SHA1 request still names its
        session_type and carries the DH parameters."""
        # Set the consumer's session type to a fast session since we
        # need it here.
        setConsumerSession(self.consumer)

        self.endpoint.use_compatibility = True
        session_type = 'DH-SHA1'
        session, args = self.consumer._createAssociateRequest(
            self.endpoint, self.assoc_type, session_type)

        self.failUnless(isinstance(session, DiffieHellmanSHA1ConsumerSession))

        # The public key is a random base-64 value, so only check that
        # it is present, then drop it before comparing.
        self.failUnless(args.getArg(OPENID1_NS, 'dh_consumer_public'))
        args.delArg(OPENID1_NS, 'dh_consumer_public')

        # OK, session_type is set here and not for no-encryption
        # compatibility
        expected_args = {
            'mode': 'associate',
            'session_type': 'DH-SHA1',
            'assoc_type': self.assoc_type,
            'dh_modulus': 'BfvStQ==',
            'dh_gen': 'Ag==',
        }
        expected = Message.fromOpenIDArgs(expected_args)

        self.failUnlessEqual(expected, args)
1878 1879 # XXX: test the other types 1880
class TestDiffieHellmanResponseParameters(object):
    """Shared tests for extracting the secret from a DH association response.

    Subclasses are expected to supply session_cls and message_namespace.
    """

    session_cls = None
    message_namespace = None

    def setUp(self):
        """Prepare DH key pairs, a random secret, its encrypted form, and
        an empty message in the configured namespace."""
        # Pre-compute DH with small prime so tests run quickly.
        self.server_dh = DiffieHellman(100389557, 2)
        self.consumer_dh = DiffieHellman(100389557, 2)

        # base64(btwoc(g ^ xb mod p))
        self.dh_server_public = cryptutil.longToBase64(self.server_dh.public)

        session_cls = self.session_cls
        self.secret = cryptutil.randomString(session_cls.secret_size)

        masked_secret = self.server_dh.xorSecret(
            self.consumer_dh.public, self.secret, session_cls.hash_func)
        self.enc_mac_key = oidutil.toBase64(masked_secret)

        self.consumer_session = session_cls(self.consumer_dh)

        self.msg = Message(self.message_namespace)

    def _setServerPublic(self, value):
        """Put value into the message as dh_server_public."""
        self.msg.setArg(OPENID_NS, 'dh_server_public', value)

    def _setMacKey(self, value):
        """Put value into the message as enc_mac_key."""
        self.msg.setArg(OPENID_NS, 'enc_mac_key', value)

    def _failUnlessExtractRaises(self, exc_type):
        """Assert that extractSecret on the current message raises
        exc_type."""
        self.failUnlessRaises(
            exc_type, self.consumer_session.extractSecret, self.msg)

    def testExtractSecret(self):
        """With both fields present the original secret is recovered."""
        self._setServerPublic(self.dh_server_public)
        self._setMacKey(self.enc_mac_key)

        extracted = self.consumer_session.extractSecret(self.msg)
        self.failUnlessEqual(extracted, self.secret)

    def testAbsentServerPublic(self):
        """A message without dh_server_public raises KeyError."""
        self._setMacKey(self.enc_mac_key)

        self._failUnlessExtractRaises(KeyError)

    def testAbsentMacKey(self):
        """A message without enc_mac_key raises KeyError."""
        self._setServerPublic(self.dh_server_public)

        self._failUnlessExtractRaises(KeyError)

    def testInvalidBase64Public(self):
        """A dh_server_public that is not base64 raises ValueError."""
        self._setServerPublic('n o t b a s e 6 4.')
        self._setMacKey(self.enc_mac_key)

        self._failUnlessExtractRaises(ValueError)

    def testInvalidBase64MacKey(self):
        """An enc_mac_key that is not base64 raises ValueError."""
        self._setServerPublic(self.dh_server_public)
        self._setMacKey('n o t base 64')

        self._failUnlessExtractRaises(ValueError)
1932
class TestOpenID1SHA1(TestDiffieHellmanResponseParameters, unittest.TestCase):
    """Run the DH response-parameter tests: SHA1 session, OpenID 1 message."""
    message_namespace = OPENID1_NS
    session_cls = DiffieHellmanSHA1ConsumerSession
1936
class TestOpenID2SHA1(TestDiffieHellmanResponseParameters, unittest.TestCase):
    """Run the DH response-parameter tests: SHA1 session, OpenID 2 message."""
    message_namespace = OPENID2_NS
    session_cls = DiffieHellmanSHA1ConsumerSession
# The SHA256 variant is only defined when cryptutil reports SHA256 support;
# otherwise a warning records that these tests were left out.
if not cryptutil.SHA256_AVAILABLE:
    warnings.warn("Not running SHA256 association session tests.")
else:
    class TestOpenID2SHA256(TestDiffieHellmanResponseParameters,
                            unittest.TestCase):
        """Run the DH response-parameter tests: SHA256 session, OpenID 2."""
        message_namespace = OPENID2_NS
        session_cls = DiffieHellmanSHA256ConsumerSession
class TestNoStore(unittest.TestCase):
    """Behaviour of a GenericConsumer constructed with ``None`` as its store."""

    def setUp(self):
        self.consumer = GenericConsumer(None)

    def test_completeNoGetAssoc(self):
        """_getAssociation is never called when the store is None"""
        def failIfCalled(unused):
            self.fail('This method was unexpectedly called')

        # Install the tripwire in place of the real association lookup.
        self.consumer._getAssociation = failIfCalled

        endpoint = OpenIDServiceEndpoint()
        endpoint.claimed_id = 'identity_url'

        # Returning normally from begin() means the tripwire never fired,
        # i.e. _getAssociation was not called.
        self.consumer.begin(endpoint)
class NonAnonymousAuthRequest(object):
    """Stand-in auth request whose ``setAnonymous`` always refuses."""
    # Placeholder value only.
    endpoint = 'unused'

    def setAnonymous(self, unused):
        """Raise ValueError regardless of the argument passed."""
        reason = 'Should trigger ProtocolError'
        raise ValueError(reason)
1972
class TestConsumerAnonymous(unittest.TestCase):
    """Checks error translation in ``Consumer.beginWithoutDiscovery``."""

    # The body below uses ``self``, so it has to live inside a test method;
    # without the ``def`` it would execute while the class is being defined
    # and no test would be collected.
    def test_beginWithoutDiscoveryAnonymousFail(self):
        """Make sure that ValueError for setting an auth request
        anonymous gets converted to a ProtocolError
        """
        sess = {}
        consumer = Consumer(sess, None)

        def bogusBegin(unused):
            # Hand back a request whose setAnonymous() raises ValueError.
            return NonAnonymousAuthRequest()

        # Stub out begin() on the object held in consumer.consumer so that
        # beginWithoutDiscovery receives the refusing request.
        consumer.consumer.begin = bogusBegin
        self.failUnlessRaises(
            ProtocolError,
            consumer.beginWithoutDiscovery, None)
1986 1987
class TestDiscoverAndVerify(unittest.TestCase):
    """Tests for ``GenericConsumer._discoverAndVerify`` with discovery stubbed."""

    def setUp(self):
        self.consumer = GenericConsumer(None)
        # Each test assigns the value the stubbed _discover should return.
        self.discovery_result = None
        # The attribute is read at call time, so tests can set
        # discovery_result after setUp has run.
        self.consumer._discover = (
            lambda unused_identifier: self.discovery_result)
        self.to_match = OpenIDServiceEndpoint()

    def failUnlessDiscoveryFailure(self):
        """Assert that _discoverAndVerify raises DiscoveryFailure."""
        claimed_id = 'http://claimed-id.com/'
        candidates = [self.to_match]
        self.failUnlessRaises(
            DiscoveryFailure,
            self.consumer._discoverAndVerify, claimed_id, candidates)

    def test_noServices(self):
        """Discovery returning no results results in a
        DiscoveryFailure exception"""
        self.discovery_result = (None, [])
        self.failUnlessDiscoveryFailure()

    def test_noMatches(self):
        """If verification of the only discovered endpoint fails with a
        ProtocolError, _discoverAndVerify raises DiscoveryFailure
        """
        self.discovery_result = (None, ['unused'])

        def rejectEndpoint(unused1, unused2):
            raise ProtocolError('unit test')

        self.consumer._verifyDiscoverySingle = rejectEndpoint
        self.failUnlessDiscoveryFailure()

    def test_matches(self):
        """If an endpoint matches, we return it
        """
        # Discovery returns a single "endpoint" object
        matching_endpoint = 'matching endpoint'
        self.discovery_result = (None, [matching_endpoint])

        # Make verifying discovery return True for this endpoint
        self.consumer._verifyDiscoverySingle = (
            lambda unused1, unused2: True)

        # Since _verifyDiscoverySingle returns True, the discovered
        # endpoint should come back as the result.
        verified = self.consumer._discoverAndVerify(
            'http://claimed.id/', [self.to_match])
        self.failUnlessEqual(matching_endpoint, verified)


from openid.extension import Extension
class SillyExtension(Extension):
    """Minimal Extension subclass with a fixed namespace and one argument."""
    ns_alias = 'silly'
    ns_uri = 'http://silly.example.com/'

    def getExtensionArgs(self):
        """Return the single hard-coded extension argument."""
        return dict(i_am='silly')
2045
class TestAddExtension(unittest.TestCase):
    """Checks that AuthRequest.addExtension stores the extension's args."""

    def test_SillyExtension(self):
        """Args stored under the extension's namespace URI equal the
        extension's own getExtensionArgs() result."""
        extension = SillyExtension()
        auth_request = AuthRequest(OpenIDServiceEndpoint(), None)
        auth_request.addExtension(extension)

        stored_args = auth_request.message.getArgs(extension.ns_uri)
        self.failUnlessEqual(extension.getExtensionArgs(), stored_args)
2054 2055 2056
2057 -class TestKVPost(unittest.TestCase):
2058 - def setUp(self):
2059 self.server_url = 'http://unittest/%s' % (self.id(),)
2060
2061 - def test_200(self):
2062 from openid.fetchers import HTTPResponse 2063 response = HTTPResponse() 2064 response.status = 200 2065 response.body = "foo:bar\nbaz:quux\n" 2066 r = _httpResponseToMessage(response, self.server_url) 2067 expected_msg = Message.fromOpenIDArgs({'foo':'bar','baz':'quux'}) 2068 self.failUnlessEqual(expected_msg, r)
2069 2070
2071 - def test_400(self):
2072 response = HTTPResponse() 2073 response.status = 400 2074 response.body = "error:bonk\nerror_code:7\n" 2075 try: 2076 r = _httpResponseToMessage(response, self.server_url) 2077 except ServerError, e: 2078 self.failUnlessEqual(e.error_text, 'bonk') 2079 self.failUnlessEqual(e.error_code, '7') 2080 else: 2081 self.fail("Expected ServerError, got return %r" % (r,))
2082 2083
2084 - def test_500(self):
2085 # 500 as an example of any non-200, non-400 code. 2086 response = HTTPResponse() 2087 response.status = 500 2088 response.body = "foo:bar\nbaz:quux\n" 2089 self.failUnlessRaises(fetchers.HTTPFetchingError, 2090 _httpResponseToMessage, response, 2091 self.server_url)
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()