Package openid :: Package test :: Module test_server
[frames] | no frames]

Source Code for Module openid.test.test_server

   1  """Tests for openid.server. 
   2  """ 
   3  from openid.server import server 
   4  from openid import association, cryptutil, oidutil 
   5  from openid.message import Message, OPENID_NS, OPENID2_NS, OPENID1_NS, \ 
   6       IDENTIFIER_SELECT, no_default, OPENID1_URL_LIMIT 
   7  from openid.store import memstore 
   8  import cgi 
   9   
  10  import unittest 
  11  import warnings 
  12   
  13  from urlparse import urlparse 
  14   
  15  # In general, if you edit or add tests here, try to move in the direction 
  16  # of testing smaller units.  For testing the external interfaces, we'll be 
  17  # developing an implementation-agnostic testing suite. 
  18   
  19  # for more, see /etc/ssh/moduli 
  20   
  21  ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7 
  22  ALT_GEN = 5 
  23   
class CatchLogs(object):
    """Helper that captures messages sent to ``oidutil.log``.

    ``setUp`` replaces ``oidutil.log`` with ``gotLogMessage`` so every
    logged message is appended to ``self.messages``; ``tearDown`` puts
    the previous logger back.
    """

    def setUp(self):
        """Install the capturing logger and start with an empty message list."""
        self.old_logger = oidutil.log
        oidutil.log = self.gotLogMessage
        self.messages = []

    def gotLogMessage(self, message):
        """Record a single logged ``message``."""
        self.messages.append(message)

    def tearDown(self):
        """Restore the logger that was active before ``setUp`` ran."""
        oidutil.log = self.old_logger
35
36 -class TestProtocolError(unittest.TestCase):
37 - def test_browserWithReturnTo(self):
38 return_to = "http://rp.unittest/consumer" 39 # will be a ProtocolError raised by Decode or CheckIDRequest.answer 40 args = Message.fromPostArgs({ 41 'openid.mode': 'monkeydance', 42 'openid.identity': 'http://wagu.unittest/', 43 'openid.return_to': return_to, 44 }) 45 e = server.ProtocolError(args, "plucky") 46 self.failUnless(e.hasReturnTo()) 47 expected_args = { 48 'openid.mode': ['error'], 49 'openid.error': ['plucky'], 50 } 51 52 rt_base, result_args = e.encodeToURL().split('?', 1) 53 result_args = cgi.parse_qs(result_args) 54 self.failUnlessEqual(result_args, expected_args)
55
57 return_to = "http://rp.unittest/consumer" 58 # will be a ProtocolError raised by Decode or CheckIDRequest.answer 59 args = Message.fromPostArgs({ 60 'openid.ns': OPENID2_NS, 61 'openid.mode': 'monkeydance', 62 'openid.identity': 'http://wagu.unittest/', 63 'openid.claimed_id': 'http://wagu.unittest/', 64 'openid.return_to': return_to, 65 }) 66 e = server.ProtocolError(args, "plucky") 67 self.failUnless(e.hasReturnTo()) 68 expected_args = { 69 'openid.ns': [OPENID2_NS], 70 'openid.mode': ['error'], 71 'openid.error': ['plucky'], 72 } 73 74 rt_base, result_args = e.encodeToURL().split('?', 1) 75 result_args = cgi.parse_qs(result_args) 76 self.failUnlessEqual(result_args, expected_args)
77
78 - def test_browserWithReturnTo_OpenID2_POST(self):
79 return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT) 80 # will be a ProtocolError raised by Decode or CheckIDRequest.answer 81 args = Message.fromPostArgs({ 82 'openid.ns': OPENID2_NS, 83 'openid.mode': 'monkeydance', 84 'openid.identity': 'http://wagu.unittest/', 85 'openid.claimed_id': 'http://wagu.unittest/', 86 'openid.return_to': return_to, 87 }) 88 e = server.ProtocolError(args, "plucky") 89 self.failUnless(e.hasReturnTo()) 90 expected_args = { 91 'openid.ns': [OPENID2_NS], 92 'openid.mode': ['error'], 93 'openid.error': ['plucky'], 94 } 95 96 self.failUnless(e.whichEncoding() == server.ENCODE_HTML_FORM) 97 self.failUnless(e.toFormMarkup() == e.toMessage().toFormMarkup( 98 args.getArg(OPENID_NS, 'return_to')))
99
101 return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT) 102 # will be a ProtocolError raised by Decode or CheckIDRequest.answer 103 args = Message.fromPostArgs({ 104 'openid.mode': 'monkeydance', 105 'openid.identity': 'http://wagu.unittest/', 106 'openid.return_to': return_to, 107 }) 108 e = server.ProtocolError(args, "plucky") 109 self.failUnless(e.hasReturnTo()) 110 expected_args = { 111 'openid.mode': ['error'], 112 'openid.error': ['plucky'], 113 } 114 115 self.failUnless(e.whichEncoding() == server.ENCODE_URL) 116 117 rt_base, result_args = e.encodeToURL().split('?', 1) 118 result_args = cgi.parse_qs(result_args) 119 self.failUnlessEqual(result_args, expected_args)
120
121 - def test_noReturnTo(self):
122 # will be a ProtocolError raised by Decode or CheckIDRequest.answer 123 args = Message.fromPostArgs({ 124 'openid.mode': 'zebradance', 125 'openid.identity': 'http://wagu.unittest/', 126 }) 127 e = server.ProtocolError(args, "waffles") 128 self.failIf(e.hasReturnTo()) 129 expected = """error:waffles 130 mode:error 131 """ 132 self.failUnlessEqual(e.encodeToKVForm(), expected)
133 134
135 - def test_noMessage(self):
136 e = server.ProtocolError(None, "no moar pancakes") 137 self.failIf(e.hasReturnTo()) 138 self.failUnlessEqual(e.whichEncoding(), None)
139 140
141 -class TestDecode(unittest.TestCase):
142 - def setUp(self):
143 self.claimed_id = 'http://de.legating.de.coder.unittest/' 144 self.id_url = "http://decoder.am.unittest/" 145 self.rt_url = "http://rp.unittest/foobot/?qux=zam" 146 self.tr_url = "http://rp.unittest/" 147 self.assoc_handle = "{assoc}{handle}" 148 self.op_endpoint = 'http://endpoint.unittest/encode' 149 self.store = memstore.MemoryStore() 150 self.server = server.Server(self.store, self.op_endpoint) 151 self.decode = self.server.decoder.decode 152 self.decode = server.Decoder(self.server).decode
153
154 - def test_none(self):
155 args = {} 156 r = self.decode(args) 157 self.failUnlessEqual(r, None)
158
159 - def test_irrelevant(self):
160 args = { 161 'pony': 'spotted', 162 'sreg.mutant_power': 'decaffinator', 163 } 164 self.failUnlessRaises(server.ProtocolError, self.decode, args)
165
166 - def test_bad(self):
167 args = { 168 'openid.mode': 'twos-compliment', 169 'openid.pants': 'zippered', 170 } 171 self.failUnlessRaises(server.ProtocolError, self.decode, args)
172
173 - def test_dictOfLists(self):
174 args = { 175 'openid.mode': ['checkid_setup'], 176 'openid.identity': self.id_url, 177 'openid.assoc_handle': self.assoc_handle, 178 'openid.return_to': self.rt_url, 179 'openid.trust_root': self.tr_url, 180 } 181 try: 182 result = self.decode(args) 183 except TypeError, err: 184 self.failUnless(str(err).find('values') != -1, err) 185 else: 186 self.fail("Expected TypeError, but got result %s" % (result,))
187
188 - def test_checkidImmediate(self):
189 args = { 190 'openid.mode': 'checkid_immediate', 191 'openid.identity': self.id_url, 192 'openid.assoc_handle': self.assoc_handle, 193 'openid.return_to': self.rt_url, 194 'openid.trust_root': self.tr_url, 195 # should be ignored 196 'openid.some.extension': 'junk', 197 } 198 r = self.decode(args) 199 self.failUnless(isinstance(r, server.CheckIDRequest)) 200 self.failUnlessEqual(r.mode, "checkid_immediate") 201 self.failUnlessEqual(r.immediate, True) 202 self.failUnlessEqual(r.identity, self.id_url) 203 self.failUnlessEqual(r.trust_root, self.tr_url) 204 self.failUnlessEqual(r.return_to, self.rt_url) 205 self.failUnlessEqual(r.assoc_handle, self.assoc_handle)
206
207 - def test_checkidSetup(self):
208 args = { 209 'openid.mode': 'checkid_setup', 210 'openid.identity': self.id_url, 211 'openid.assoc_handle': self.assoc_handle, 212 'openid.return_to': self.rt_url, 213 'openid.trust_root': self.tr_url, 214 } 215 r = self.decode(args) 216 self.failUnless(isinstance(r, server.CheckIDRequest)) 217 self.failUnlessEqual(r.mode, "checkid_setup") 218 self.failUnlessEqual(r.immediate, False) 219 self.failUnlessEqual(r.identity, self.id_url) 220 self.failUnlessEqual(r.trust_root, self.tr_url) 221 self.failUnlessEqual(r.return_to, self.rt_url)
222
223 - def test_checkidSetupOpenID2(self):
224 args = { 225 'openid.ns': OPENID2_NS, 226 'openid.mode': 'checkid_setup', 227 'openid.identity': self.id_url, 228 'openid.claimed_id': self.claimed_id, 229 'openid.assoc_handle': self.assoc_handle, 230 'openid.return_to': self.rt_url, 231 'openid.realm': self.tr_url, 232 } 233 r = self.decode(args) 234 self.failUnless(isinstance(r, server.CheckIDRequest)) 235 self.failUnlessEqual(r.mode, "checkid_setup") 236 self.failUnlessEqual(r.immediate, False) 237 self.failUnlessEqual(r.identity, self.id_url) 238 self.failUnlessEqual(r.claimed_id, self.claimed_id) 239 self.failUnlessEqual(r.trust_root, self.tr_url) 240 self.failUnlessEqual(r.return_to, self.rt_url)
241
243 args = { 244 'openid.ns': OPENID2_NS, 245 'openid.mode': 'checkid_setup', 246 'openid.identity': self.id_url, 247 'openid.assoc_handle': self.assoc_handle, 248 'openid.return_to': self.rt_url, 249 'openid.realm': self.tr_url, 250 } 251 self.failUnlessRaises(server.ProtocolError, self.decode, args)
252
254 args = { 255 'openid.ns': OPENID2_NS, 256 'openid.mode': 'checkid_setup', 257 'openid.assoc_handle': self.assoc_handle, 258 'openid.return_to': self.rt_url, 259 'openid.realm': self.tr_url, 260 } 261 r = self.decode(args) 262 self.failUnless(isinstance(r, server.CheckIDRequest)) 263 self.failUnlessEqual(r.mode, "checkid_setup") 264 self.failUnlessEqual(r.immediate, False) 265 self.failUnlessEqual(r.identity, None) 266 self.failUnlessEqual(r.trust_root, self.tr_url) 267 self.failUnlessEqual(r.return_to, self.rt_url)
268
270 """Make sure an OpenID 1 request cannot be decoded if it lacks 271 a return_to. 272 """ 273 args = { 274 'openid.mode': 'checkid_setup', 275 'openid.identity': self.id_url, 276 'openid.assoc_handle': self.assoc_handle, 277 'openid.trust_root': self.tr_url, 278 } 279 self.failUnlessRaises(server.ProtocolError, self.decode, args)
280
282 """Make sure an OpenID 2 request with no return_to can be 283 decoded, and make sure a response to such a request raises 284 NoReturnToError. 285 """ 286 args = { 287 'openid.ns': OPENID2_NS, 288 'openid.mode': 'checkid_setup', 289 'openid.identity': self.id_url, 290 'openid.claimed_id': self.id_url, 291 'openid.assoc_handle': self.assoc_handle, 292 'openid.realm': self.tr_url, 293 } 294 self.failUnless(isinstance(self.decode(args), server.CheckIDRequest)) 295 296 req = self.decode(args) 297 self.assertRaises(server.NoReturnToError, req.answer, False) 298 self.assertRaises(server.NoReturnToError, req.encodeToURL, 'bogus') 299 self.assertRaises(server.NoReturnToError, req.getCancelURL)
300
302 """Make sure that an OpenID 2 request which lacks return_to 303 cannot be decoded if it lacks a realm. Spec: This value 304 (openid.realm) MUST be sent if openid.return_to is omitted. 305 """ 306 args = { 307 'openid.ns': OPENID2_NS, 308 'openid.mode': 'checkid_setup', 309 'openid.identity': self.id_url, 310 'openid.assoc_handle': self.assoc_handle, 311 } 312 self.failUnlessRaises(server.ProtocolError, self.decode, args)
313
314 - def test_checkidSetupBadReturn(self):
315 args = { 316 'openid.mode': 'checkid_setup', 317 'openid.identity': self.id_url, 318 'openid.assoc_handle': self.assoc_handle, 319 'openid.return_to': 'not a url', 320 } 321 try: 322 result = self.decode(args) 323 except server.ProtocolError, err: 324 self.failUnless(err.openid_message) 325 else: 326 self.fail("Expected ProtocolError, instead returned with %s" % 327 (result,))
328
330 args = { 331 'openid.mode': 'checkid_setup', 332 'openid.identity': self.id_url, 333 'openid.assoc_handle': self.assoc_handle, 334 'openid.return_to': self.rt_url, 335 'openid.trust_root': 'http://not-the-return-place.unittest/', 336 } 337 try: 338 result = self.decode(args) 339 except server.UntrustedReturnURL, err: 340 self.failUnless(err.openid_message) 341 else: 342 self.fail("Expected UntrustedReturnURL, instead returned with %s" % 343 (result,))
344
345 - def test_checkAuth(self):
346 args = { 347 'openid.mode': 'check_authentication', 348 'openid.assoc_handle': '{dumb}{handle}', 349 'openid.sig': 'sigblob', 350 'openid.signed': 'identity,return_to,response_nonce,mode', 351 'openid.identity': 'signedval1', 352 'openid.return_to': 'signedval2', 353 'openid.response_nonce': 'signedval3', 354 'openid.baz': 'unsigned', 355 } 356 r = self.decode(args) 357 self.failUnless(isinstance(r, server.CheckAuthRequest)) 358 self.failUnlessEqual(r.mode, 'check_authentication') 359 self.failUnlessEqual(r.sig, 'sigblob')
360 361
363 args = { 364 'openid.mode': 'check_authentication', 365 'openid.assoc_handle': '{dumb}{handle}', 366 'openid.signed': 'foo,bar,mode', 367 'openid.foo': 'signedval1', 368 'openid.bar': 'signedval2', 369 'openid.baz': 'unsigned', 370 } 371 self.failUnlessRaises(server.ProtocolError, self.decode, args)
372 373
375 args = { 376 'openid.mode': 'check_authentication', 377 'openid.assoc_handle': '{dumb}{handle}', 378 'openid.invalidate_handle': '[[SMART_handle]]', 379 'openid.sig': 'sigblob', 380 'openid.signed': 'identity,return_to,response_nonce,mode', 381 'openid.identity': 'signedval1', 382 'openid.return_to': 'signedval2', 383 'openid.response_nonce': 'signedval3', 384 'openid.baz': 'unsigned', 385 } 386 r = self.decode(args) 387 self.failUnless(isinstance(r, server.CheckAuthRequest)) 388 self.failUnlessEqual(r.invalidate_handle, '[[SMART_handle]]')
389 390
391 - def test_associateDH(self):
392 args = { 393 'openid.mode': 'associate', 394 'openid.session_type': 'DH-SHA1', 395 'openid.dh_consumer_public': "Rzup9265tw==", 396 } 397 r = self.decode(args) 398 self.failUnless(isinstance(r, server.AssociateRequest)) 399 self.failUnlessEqual(r.mode, "associate") 400 self.failUnlessEqual(r.session.session_type, "DH-SHA1") 401 self.failUnlessEqual(r.assoc_type, "HMAC-SHA1") 402 self.failUnless(r.session.consumer_pubkey)
403
404 - def test_associateDHMissingKey(self):
405 """Trying DH assoc w/o public key""" 406 args = { 407 'openid.mode': 'associate', 408 'openid.session_type': 'DH-SHA1', 409 } 410 # Using DH-SHA1 without supplying dh_consumer_public is an error. 411 self.failUnlessRaises(server.ProtocolError, self.decode, args)
412 413
415 args = { 416 'openid.mode': 'associate', 417 'openid.session_type': 'DH-SHA1', 418 'openid.dh_consumer_public': "donkeydonkeydonkey", 419 } 420 self.failUnlessRaises(server.ProtocolError, self.decode, args)
421 422
423 - def test_associateDHModGen(self):
424 # test dh with non-default but valid values for dh_modulus and dh_gen 425 args = { 426 'openid.mode': 'associate', 427 'openid.session_type': 'DH-SHA1', 428 'openid.dh_consumer_public': "Rzup9265tw==", 429 'openid.dh_modulus': cryptutil.longToBase64(ALT_MODULUS), 430 'openid.dh_gen': cryptutil.longToBase64(ALT_GEN) , 431 } 432 r = self.decode(args) 433 self.failUnless(isinstance(r, server.AssociateRequest)) 434 self.failUnlessEqual(r.mode, "associate") 435 self.failUnlessEqual(r.session.session_type, "DH-SHA1") 436 self.failUnlessEqual(r.assoc_type, "HMAC-SHA1") 437 self.failUnlessEqual(r.session.dh.modulus, ALT_MODULUS) 438 self.failUnlessEqual(r.session.dh.generator, ALT_GEN) 439 self.failUnless(r.session.consumer_pubkey)
440 441
443 # test dh with non-default but valid values for dh_modulus and dh_gen 444 args = { 445 'openid.mode': 'associate', 446 'openid.session_type': 'DH-SHA1', 447 'openid.dh_consumer_public': "Rzup9265tw==", 448 'openid.dh_modulus': 'pizza', 449 'openid.dh_gen': 'gnocchi', 450 } 451 self.failUnlessRaises(server.ProtocolError, self.decode, args)
452 453
455 # test dh with non-default but valid values for dh_modulus and dh_gen 456 args = { 457 'openid.mode': 'associate', 458 'openid.session_type': 'DH-SHA1', 459 'openid.dh_consumer_public': "Rzup9265tw==", 460 'openid.dh_modulus': 'pizza', 461 } 462 self.failUnlessRaises(server.ProtocolError, self.decode, args)
463 464 465 # def test_associateDHInvalidModGen(self): 466 # # test dh with properly encoded values that are not a valid 467 # # modulus/generator combination. 468 # args = { 469 # 'openid.mode': 'associate', 470 # 'openid.session_type': 'DH-SHA1', 471 # 'openid.dh_consumer_public': "Rzup9265tw==", 472 # 'openid.dh_modulus': cryptutil.longToBase64(9), 473 # 'openid.dh_gen': cryptutil.longToBase64(27) , 474 # } 475 # self.failUnlessRaises(server.ProtocolError, self.decode, args) 476 # test_associateDHInvalidModGen.todo = "low-priority feature" 477 478
479 - def test_associateWeirdSession(self):
480 args = { 481 'openid.mode': 'associate', 482 'openid.session_type': 'FLCL6', 483 'openid.dh_consumer_public': "YQ==\n", 484 } 485 self.failUnlessRaises(server.ProtocolError, self.decode, args)
486 487
488 - def test_associatePlain(self):
489 args = { 490 'openid.mode': 'associate', 491 } 492 r = self.decode(args) 493 self.failUnless(isinstance(r, server.AssociateRequest)) 494 self.failUnlessEqual(r.mode, "associate") 495 self.failUnlessEqual(r.session.session_type, "no-encryption") 496 self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
497
498 - def test_nomode(self):
499 args = { 500 'openid.session_type': 'DH-SHA1', 501 'openid.dh_consumer_public': "my public keeey", 502 } 503 self.failUnlessRaises(server.ProtocolError, self.decode, args)
504
505 - def test_invalidns(self):
506 args = {'openid.ns': 'Tuesday', 507 'openid.mode': 'associate'} 508 509 try: 510 r = self.decode(args) 511 except server.ProtocolError, err: 512 # Assert that the ProtocolError does have a Message attached 513 # to it, even though the request wasn't a well-formed Message. 514 self.failUnless(err.openid_message) 515 # The error message contains the bad openid.ns. 516 self.failUnless('Tuesday' in str(err), str(err)) 517 else: 518 self.fail("Expected ProtocolError but returned with %r" % (r,))
519 520
521 -class TestEncode(unittest.TestCase):
522 - def setUp(self):
523 self.encoder = server.Encoder() 524 self.encode = self.encoder.encode 525 self.op_endpoint = 'http://endpoint.unittest/encode' 526 self.store = memstore.MemoryStore() 527 self.server = server.Server(self.store, self.op_endpoint)
528
529 - def test_id_res_OpenID2_GET(self):
530 """ 531 Check that when an OpenID 2 response does not exceed the 532 OpenID 1 message size, a GET response (i.e., redirect) is 533 issued. 534 """ 535 request = server.CheckIDRequest( 536 identity = 'http://bombom.unittest/', 537 trust_root = 'http://burr.unittest/', 538 return_to = 'http://burr.unittest/999', 539 immediate = False, 540 op_endpoint = self.server.op_endpoint, 541 ) 542 request.message = Message(OPENID2_NS) 543 response = server.OpenIDResponse(request) 544 response.fields = Message.fromOpenIDArgs({ 545 'ns': OPENID2_NS, 546 'mode': 'id_res', 547 'identity': request.identity, 548 'claimed_id': request.identity, 549 'return_to': request.return_to, 550 }) 551 552 self.failIf(response.renderAsForm()) 553 self.failUnless(response.whichEncoding() == server.ENCODE_URL) 554 webresponse = self.encode(response) 555 self.failUnless(webresponse.headers.has_key('location'))
556
557 - def test_id_res_OpenID2_POST(self):
558 """ 559 Check that when an OpenID 2 response exceeds the OpenID 1 560 message size, a POST response (i.e., an HTML form) is 561 returned. 562 """ 563 request = server.CheckIDRequest( 564 identity = 'http://bombom.unittest/', 565 trust_root = 'http://burr.unittest/', 566 return_to = 'http://burr.unittest/999', 567 immediate = False, 568 op_endpoint = self.server.op_endpoint, 569 ) 570 request.message = Message(OPENID2_NS) 571 response = server.OpenIDResponse(request) 572 response.fields = Message.fromOpenIDArgs({ 573 'ns': OPENID2_NS, 574 'mode': 'id_res', 575 'identity': request.identity, 576 'claimed_id': request.identity, 577 'return_to': 'x' * OPENID1_URL_LIMIT, 578 }) 579 580 self.failUnless(response.renderAsForm()) 581 self.failUnless(len(response.encodeToURL()) > OPENID1_URL_LIMIT) 582 self.failUnless(response.whichEncoding() == server.ENCODE_HTML_FORM) 583 webresponse = self.encode(response) 584 self.failUnlessEqual(webresponse.body, response.toFormMarkup())
585
586 - def test_toFormMarkup(self):
587 request = server.CheckIDRequest( 588 identity = 'http://bombom.unittest/', 589 trust_root = 'http://burr.unittest/', 590 return_to = 'http://burr.unittest/999', 591 immediate = False, 592 op_endpoint = self.server.op_endpoint, 593 ) 594 request.message = Message(OPENID2_NS) 595 response = server.OpenIDResponse(request) 596 response.fields = Message.fromOpenIDArgs({ 597 'ns': OPENID2_NS, 598 'mode': 'id_res', 599 'identity': request.identity, 600 'claimed_id': request.identity, 601 'return_to': 'x' * OPENID1_URL_LIMIT, 602 }) 603 604 form_markup = response.toFormMarkup({'foo':'bar'}) 605 self.failUnless(' foo="bar"' in form_markup)
606
607 - def test_toHTML(self):
608 request = server.CheckIDRequest( 609 identity = 'http://bombom.unittest/', 610 trust_root = 'http://burr.unittest/', 611 return_to = 'http://burr.unittest/999', 612 immediate = False, 613 op_endpoint = self.server.op_endpoint, 614 ) 615 request.message = Message(OPENID2_NS) 616 response = server.OpenIDResponse(request) 617 response.fields = Message.fromOpenIDArgs({ 618 'ns': OPENID2_NS, 619 'mode': 'id_res', 620 'identity': request.identity, 621 'claimed_id': request.identity, 622 'return_to': 'x' * OPENID1_URL_LIMIT, 623 }) 624 html = response.toHTML() 625 self.failUnless('<html>' in html) 626 self.failUnless('</html>' in html) 627 self.failUnless('<body onload=' in html) 628 self.failUnless('<form' in html) 629 self.failUnless('http://bombom.unittest/' in html)
630
632 """ 633 Check that when an OpenID 1 response exceeds the OpenID 1 634 message size, a GET response is issued. Technically, this 635 shouldn't be permitted by the library, but this test is in 636 place to preserve the status quo for OpenID 1. 637 """ 638 request = server.CheckIDRequest( 639 identity = 'http://bombom.unittest/', 640 trust_root = 'http://burr.unittest/', 641 return_to = 'http://burr.unittest/999', 642 immediate = False, 643 op_endpoint = self.server.op_endpoint, 644 ) 645 request.message = Message(OPENID2_NS) 646 response = server.OpenIDResponse(request) 647 response.fields = Message.fromOpenIDArgs({ 648 'mode': 'id_res', 649 'identity': request.identity, 650 'return_to': 'x' * OPENID1_URL_LIMIT, 651 }) 652 653 self.failIf(response.renderAsForm()) 654 self.failUnless(len(response.encodeToURL()) > OPENID1_URL_LIMIT) 655 self.failUnless(response.whichEncoding() == server.ENCODE_URL) 656 webresponse = self.encode(response) 657 self.failUnlessEqual(webresponse.headers['location'], response.encodeToURL())
658
659 - def test_id_res(self):
660 request = server.CheckIDRequest( 661 identity = 'http://bombom.unittest/', 662 trust_root = 'http://burr.unittest/', 663 return_to = 'http://burr.unittest/999', 664 immediate = False, 665 op_endpoint = self.server.op_endpoint, 666 ) 667 request.message = Message(OPENID2_NS) 668 response = server.OpenIDResponse(request) 669 response.fields = Message.fromOpenIDArgs({ 670 'mode': 'id_res', 671 'identity': request.identity, 672 'return_to': request.return_to, 673 }) 674 webresponse = self.encode(response) 675 self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT) 676 self.failUnless(webresponse.headers.has_key('location')) 677 678 location = webresponse.headers['location'] 679 self.failUnless(location.startswith(request.return_to), 680 "%s does not start with %s" % (location, 681 request.return_to)) 682 # argh. 683 q2 = dict(cgi.parse_qsl(urlparse(location)[4])) 684 expected = response.fields.toPostArgs() 685 self.failUnlessEqual(q2, expected)
686
687 - def test_cancel(self):
688 request = server.CheckIDRequest( 689 identity = 'http://bombom.unittest/', 690 trust_root = 'http://burr.unittest/', 691 return_to = 'http://burr.unittest/999', 692 immediate = False, 693 op_endpoint = self.server.op_endpoint, 694 ) 695 request.message = Message(OPENID2_NS) 696 response = server.OpenIDResponse(request) 697 response.fields = Message.fromOpenIDArgs({ 698 'mode': 'cancel', 699 }) 700 webresponse = self.encode(response) 701 self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT) 702 self.failUnless(webresponse.headers.has_key('location'))
703
704 - def test_cancelToForm(self):
705 request = server.CheckIDRequest( 706 identity = 'http://bombom.unittest/', 707 trust_root = 'http://burr.unittest/', 708 return_to = 'http://burr.unittest/999', 709 immediate = False, 710 op_endpoint = self.server.op_endpoint, 711 ) 712 request.message = Message(OPENID2_NS) 713 response = server.OpenIDResponse(request) 714 response.fields = Message.fromOpenIDArgs({ 715 'mode': 'cancel', 716 }) 717 form = response.toFormMarkup() 718 self.failUnless(form)
719
720 - def test_assocReply(self):
721 msg = Message(OPENID2_NS) 722 msg.setArg(OPENID2_NS, 'session_type', 'no-encryption') 723 request = server.AssociateRequest.fromMessage(msg) 724 response = server.OpenIDResponse(request) 725 response.fields = Message.fromPostArgs( 726 {'openid.assoc_handle': "every-zig"}) 727 webresponse = self.encode(response) 728 body = """assoc_handle:every-zig 729 """ 730 self.failUnlessEqual(webresponse.code, server.HTTP_OK) 731 self.failUnlessEqual(webresponse.headers, {}) 732 self.failUnlessEqual(webresponse.body, body)
733
734 - def test_checkauthReply(self):
735 request = server.CheckAuthRequest('a_sock_monkey', 736 'siggggg', 737 []) 738 response = server.OpenIDResponse(request) 739 response.fields = Message.fromOpenIDArgs({ 740 'is_valid': 'true', 741 'invalidate_handle': 'xXxX:xXXx' 742 }) 743 body = """invalidate_handle:xXxX:xXXx 744 is_valid:true 745 """ 746 webresponse = self.encode(response) 747 self.failUnlessEqual(webresponse.code, server.HTTP_OK) 748 self.failUnlessEqual(webresponse.headers, {}) 749 self.failUnlessEqual(webresponse.body, body)
750
751 - def test_unencodableError(self):
752 args = Message.fromPostArgs({ 753 'openid.identity': 'http://limu.unittest/', 754 }) 755 e = server.ProtocolError(args, "wet paint") 756 self.failUnlessRaises(server.EncodingError, self.encode, e)
757
758 - def test_encodableError(self):
759 args = Message.fromPostArgs({ 760 'openid.mode': 'associate', 761 'openid.identity': 'http://limu.unittest/', 762 }) 763 body="error:snoot\nmode:error\n" 764 webresponse = self.encode(server.ProtocolError(args, "snoot")) 765 self.failUnlessEqual(webresponse.code, server.HTTP_ERROR) 766 self.failUnlessEqual(webresponse.headers, {}) 767 self.failUnlessEqual(webresponse.body, body)
768 769 770
class TestSigningEncode(unittest.TestCase):
    """Tests for server.SigningEncoder: which responses get signed and how
    they are encoded."""

    def setUp(self):
        """Build an id_res response to a CheckIDRequest plus a
        SigningEncoder whose Signatory shares the test's memory store."""
        self._dumb_key = server.Signatory._dumb_key
        self._normal_key = server.Signatory._normal_key
        self.store = memstore.MemoryStore()
        self.server = server.Server(self.store, "http://signing.unittest/enc")
        self.request = server.CheckIDRequest(
            identity='http://bombom.unittest/',
            trust_root='http://burr.unittest/',
            return_to='http://burr.unittest/999',
            immediate=False,
            op_endpoint=self.server.op_endpoint,
        )
        self.request.message = Message(OPENID2_NS)
        self.response = server.OpenIDResponse(self.request)
        self.response.fields = Message.fromOpenIDArgs({
            'mode': 'id_res',
            'identity': self.request.identity,
            'return_to': self.request.return_to,
        })
        self.signatory = server.Signatory(self.store)
        self.encoder = server.SigningEncoder(self.signatory)
        self.encode = self.encoder.encode

    def test_idres(self):
        """With a stored association for the request's handle, the redirect
        carries sig, assoc_handle and signed."""
        assoc_handle = '{bicycle}{shed}'
        self.store.storeAssociation(
            self._normal_key,
            association.Association.fromExpiresIn(60, assoc_handle,
                                                  'sekrit', 'HMAC-SHA1'))
        self.request.assoc_handle = assoc_handle
        webresponse = self.encode(self.response)
        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
        self.failUnless('location' in webresponse.headers)

        location = webresponse.headers['location']
        # Index 4 of the urlparse() result is the query string.
        query = cgi.parse_qs(urlparse(location)[4])
        self.failUnless('openid.sig' in query)
        self.failUnless('openid.assoc_handle' in query)
        self.failUnless('openid.signed' in query)

    def test_idresDumb(self):
        """Without a stored association the redirect still carries sig,
        assoc_handle and signed."""
        webresponse = self.encode(self.response)
        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
        self.failUnless('location' in webresponse.headers)

        location = webresponse.headers['location']
        query = cgi.parse_qs(urlparse(location)[4])
        self.failUnless('openid.sig' in query)
        self.failUnless('openid.assoc_handle' in query)
        self.failUnless('openid.signed' in query)

    def test_forgotStore(self):
        """Encoding with no signatory set raises ValueError."""
        self.encoder.signatory = None
        self.failUnlessRaises(ValueError, self.encode, self.response)

    def test_cancel(self):
        """A cancel response is redirected without a signature."""
        request = server.CheckIDRequest(
            identity='http://bombom.unittest/',
            trust_root='http://burr.unittest/',
            return_to='http://burr.unittest/999',
            immediate=False,
            op_endpoint=self.server.op_endpoint,
        )
        request.message = Message(OPENID2_NS)
        response = server.OpenIDResponse(request)
        response.fields.setArg(OPENID_NS, 'mode', 'cancel')
        webresponse = self.encode(response)
        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
        self.failUnless('location' in webresponse.headers)
        location = webresponse.headers['location']
        query = cgi.parse_qs(urlparse(location)[4])
        self.failIf('openid.sig' in query, response.fields.toPostArgs())

    def test_assocReply(self):
        """An associate response encodes as HTTP_OK with a key-value body
        and no headers."""
        msg = Message(OPENID2_NS)
        msg.setArg(OPENID2_NS, 'session_type', 'no-encryption')
        request = server.AssociateRequest.fromMessage(msg)
        response = server.OpenIDResponse(request)
        response.fields = Message.fromOpenIDArgs({'assoc_handle': "every-zig"})
        webresponse = self.encode(response)
        body = "assoc_handle:every-zig\n"
        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
        self.failUnlessEqual(webresponse.headers, {})
        self.failUnlessEqual(webresponse.body, body)

    def test_alreadySigned(self):
        """Encoding a response that already has a sig raises AlreadySigned."""
        self.response.fields.setArg(OPENID_NS, 'sig', 'priorSig==')
        self.failUnlessRaises(server.AlreadySigned, self.encode, self.response)
861
862 -class TestCheckID(unittest.TestCase):
863 - def setUp(self):
864 self.op_endpoint = 'http://endpoint.unittest/' 865 self.store = memstore.MemoryStore() 866 self.server = server.Server(self.store, self.op_endpoint) 867 self.request = server.CheckIDRequest( 868 identity = 'http://bambam.unittest/', 869 trust_root = 'http://bar.unittest/', 870 return_to = 'http://bar.unittest/999', 871 immediate = False, 872 op_endpoint = self.server.op_endpoint, 873 ) 874 self.request.message = Message(OPENID2_NS)
875
876 - def test_trustRootInvalid(self):
877 self.request.trust_root = "http://foo.unittest/17" 878 self.request.return_to = "http://foo.unittest/39" 879 self.failIf(self.request.trustRootValid())
880
881 - def test_trustRootValid(self):
882 self.request.trust_root = "http://foo.unittest/" 883 self.request.return_to = "http://foo.unittest/39" 884 self.failUnless(self.request.trustRootValid())
885
886 - def test_malformedTrustRoot(self):
887 self.request.trust_root = "invalid://trust*root/" 888 self.request.return_to = "http://foo.unittest/39" 889 sentinel = object() 890 self.request.message = sentinel 891 try: 892 result = self.request.trustRootValid() 893 except server.MalformedTrustRoot, why: 894 self.failUnless(sentinel is why.openid_message) 895 else: 896 self.fail('Expected MalformedTrustRoot exception. Got %r' 897 % (result,))
898
900 request = server.CheckIDRequest( 901 identity = 'http://bambam.unittest/', 902 trust_root = 'http://bar.unittest/', 903 return_to = None, 904 immediate = False, 905 op_endpoint = self.server.op_endpoint, 906 ) 907 908 self.failUnless(request.trustRootValid())
909
911 """Make sure that verifyReturnTo is calling the trustroot 912 function verifyReturnTo 913 """ 914 def withVerifyReturnTo(new_verify, callable): 915 old_verify = server.verifyReturnTo 916 try: 917 server.verifyReturnTo = new_verify 918 return callable() 919 finally: 920 server.verifyReturnTo = old_verify
921 922 # Ensure that exceptions are passed through 923 sentinel = Exception() 924 def vrfyExc(trust_root, return_to): 925 self.failUnlessEqual(self.request.trust_root, trust_root) 926 self.failUnlessEqual(self.request.return_to, return_to) 927 raise sentinel
928 929 try: 930 withVerifyReturnTo(vrfyExc, self.request.returnToVerified) 931 except Exception, e: 932 self.failUnless(e is sentinel, e) 933 934 # Ensure that True and False are passed through unchanged 935 def constVerify(val): 936 def verify(trust_root, return_to): 937 self.failUnlessEqual(self.request.trust_root, trust_root) 938 self.failUnlessEqual(self.request.return_to, return_to) 939 return val 940 return verify 941 942 for val in [True, False]: 943 self.failUnlessEqual( 944 val, 945 withVerifyReturnTo(constVerify(val), 946 self.request.returnToVerified)) 947
def _expectAnswer(self, answer, identity=None, claimed_id=None):
    """Assert that ``answer`` looks like an OpenID 2 positive assertion.

    Checks mode, return_to and op_endpoint, plus identity/claimed_id when
    ``identity`` is given (``claimed_id`` falls back to ``identity`` when
    it is not supplied), the presence of a response nonce, the OpenID 2
    namespace, and that no unexpected fields are present.
    """
    expected_list = [
        ('mode', 'id_res'),
        ('return_to', self.request.return_to),
        ('op_endpoint', self.op_endpoint),
    ]
    if identity:
        expected_list.extend([
            ('identity', identity),
            ('claimed_id', claimed_id or identity),
        ])

    fields = answer.fields
    for key, expected in expected_list:
        actual = fields.getArg(OPENID_NS, key)
        self.failUnlessEqual(
            actual, expected,
            "%s: expected %s, got %s" % (key, expected, actual))

    self.failUnless(fields.hasKey(OPENID_NS, 'response_nonce'))
    self.failUnless(fields.getOpenIDNamespace() == OPENID2_NS)

    # One for nonce, one for ns
    post_args = fields.toPostArgs()
    self.failUnlessEqual(len(post_args), len(expected_list) + 2, post_args)
972
def test_answerAllow(self):
    """Check the fields specified by "Positive Assertions".

    Covers mode=id_res, identity, claimed_id, op_endpoint and return_to,
    and that the response points back at the request it answers.
    """
    response = self.request.answer(True)
    self.failUnlessEqual(response.request, self.request)
    self._expectAnswer(response, identity=self.request.identity)
981
def test_answerAllowDelegatedIdentity(self):
    """A claimed_id set on the request is echoed in the positive answer."""
    delegating = 'http://delegating.unittest/'
    self.request.claimed_id = delegating
    response = self.request.answer(True)
    self._expectAnswer(response,
                       identity=self.request.identity,
                       claimed_id=self.request.claimed_id)
987
def test_answerAllowDelegatedIdentity2(self):
    """Like test_answerAllowDelegatedIdentity, but the identity argument
    is also passed explicitly to answer()."""
    self.request.claimed_id = 'http://delegating.unittest/'
    explicit_identity = 'http://bambam.unittest/'
    response = self.request.answer(True, identity=explicit_identity)
    self._expectAnswer(response,
                       identity=self.request.identity,
                       claimed_id=self.request.claimed_id)
995
def test_answerAllowWithoutIdentityReally(self):
    """A request with no identity can be answered positively; the answer
    then carries neither identity nor claimed_id."""
    self.request.identity = None
    response = self.request.answer(True)
    self.failUnlessEqual(response.request, self.request)
    # No identity/claimed_id arguments: only the base fields are expected.
    self._expectAnswer(response)
1001
def test_answerAllowAnonymousFail(self):
    """Supplying an identity to answer() for a request that had none
    raises ValueError."""
    self.request.identity = None
    # XXX - Check on this, I think this behavior is legal in OpenID 2.0?
    answer = self.request.answer
    self.failUnlessRaises(ValueError, answer, True, identity="=V")
1007
def test_answerAllowWithIdentity(self):
    """An IDENTIFIER_SELECT request is answered with the identity chosen
    by the caller of answer()."""
    chosen = 'http://anon.unittest/9861'
    self.request.identity = IDENTIFIER_SELECT
    response = self.request.answer(True, identity=chosen)
    self._expectAnswer(response, identity=chosen)
1013
def test_answerAllowWithDelegatedIdentityOpenID2(self):
    """Answer an IDENTIFIER_SELECT case with a delegated identifier.

    Both the selected identity and a distinct claimed_id are passed to
    answer() and must appear in the response.
    """
    # claimed_id delegates to selected_id here.
    selected_id = 'http://anon.unittest/9861'
    claimed_id = 'http://monkeyhat.unittest/'
    self.request.identity = IDENTIFIER_SELECT
    response = self.request.answer(
        True, identity=selected_id, claimed_id=claimed_id)
    self._expectAnswer(response, identity=selected_id,
                       claimed_id=claimed_id)
1024
def test_answerAllowWithDelegatedIdentityOpenID1(self):
    """claimed_id parameter doesn't exist in OpenID 1.

    Passing claimed_id to answer() on an OpenID 1 request must raise
    server.VersionError.
    """
    self.request.message = Message(OPENID1_NS)
    # claimed_id delegates to selected_id here.
    self.request.identity = IDENTIFIER_SELECT
    answer_kwargs = {
        'identity': 'http://anon.unittest/9861',
        'claimed_id': 'http://monkeyhat.unittest/',
    }
    self.failUnlessRaises(server.VersionError,
                          self.request.answer, True, **answer_kwargs)
1037
def test_answerAllowWithAnotherIdentity(self):
    """Answering with an identity different from the one set up on the
    request raises ValueError."""
    # XXX - Check on this, I think this behavior is legal in OpenID 2.0?
    other_identity = "http://pebbles.unittest/"
    self.failUnlessRaises(ValueError, self.request.answer, True,
                          identity=other_identity)
1042
def test_answerAllowWithIdentityNormalization(self):
    """A normalized identity passed to answer() is accepted for a request
    that carried the non-normalized form, and the response echoes the
    values as they appeared in the request."""
    # The RP has sent us a non-normalized value for openid.identity,
    # and the library user is passing an explicit value for identity
    # to CheckIDRequest.answer.
    as_sent = 'http://bambam.unittest'
    with_slash = as_sent + '/'

    self.request.claimed_id = as_sent
    self.request.identity = as_sent

    response = self.request.answer(True, identity=with_slash)

    # Expect the values that were sent in the request, even though
    # they're not normalized.
    self._expectAnswer(response, identity=as_sent, claimed_id=as_sent)
1059
def test_answerAllowNoIdentityOpenID1(self):
    """An OpenID 1 request without any identity cannot be answered
    positively: answer() raises ValueError."""
    self.request.identity = None
    self.request.message = Message(OPENID1_NS)
    self.failUnlessRaises(ValueError,
                          self.request.answer, True, identity=None)
1065
def test_answerAllowForgotEndpoint(self):
    """Answering positively with no op_endpoint on the request raises
    RuntimeError."""
    self.request.op_endpoint = None
    answer = self.request.answer
    self.failUnlessRaises(RuntimeError, answer, True)
1069
def test_checkIDWithNoIdentityOpenID1(self):
    """An OpenID 1 checkid_setup message without an identity is rejected.

    CheckIDRequest.fromMessage must raise server.ProtocolError for a
    message that carries return_to, trust_root, mode and assoc_handle but
    no identity.
    """
    msg = Message(OPENID1_NS)
    msg.setArg(OPENID_NS, 'return_to', 'bogus')
    msg.setArg(OPENID_NS, 'trust_root', 'bogus')
    msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
    msg.setArg(OPENID_NS, 'assoc_handle', 'bogus')

    # Pass the endpoint URL (not the Server object) as the second
    # argument, matching every other fromMessage call in these tests.
    self.failUnlessRaises(server.ProtocolError,
                          server.CheckIDRequest.fromMessage,
                          msg, self.server.op_endpoint)
1080
def test_fromMessageClaimedIDWithoutIdentityOpenID2(self):
    """An OpenID 2 message with claimed_id but no identity is rejected
    with server.ProtocolError."""
    name = 'https://example.myopenid.com'

    msg = Message(OPENID2_NS)
    msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
    msg.setArg(OPENID_NS, 'return_to', 'http://invalid:8000/rt')
    msg.setArg(OPENID_NS, 'claimed_id', name)

    # Pass the endpoint URL (not the Server object) as the second
    # argument, matching every other fromMessage call in these tests.
    self.failUnlessRaises(server.ProtocolError,
                          server.CheckIDRequest.fromMessage,
                          msg, self.server.op_endpoint)
1092
def test_fromMessageIdentityWithoutClaimedIDOpenID2(self):
    """An OpenID 2 message with identity but no claimed_id is rejected
    with server.ProtocolError."""
    name = 'https://example.myopenid.com'

    msg = Message(OPENID2_NS)
    msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
    msg.setArg(OPENID_NS, 'return_to', 'http://invalid:8000/rt')
    msg.setArg(OPENID_NS, 'identity', name)

    # Pass the endpoint URL (not the Server object) as the second
    # argument, matching every other fromMessage call in these tests.
    self.failUnlessRaises(server.ProtocolError,
                          server.CheckIDRequest.fromMessage,
                          msg, self.server.op_endpoint)
1104
def test_trustRootOpenID1(self):
    """Ignore openid.realm in OpenID 1.

    When both trust_root and realm are present in an OpenID 1 message,
    the parsed request's trust_root comes from trust_root.
    """
    msg = Message(OPENID1_NS)
    openid_args = [
        ('mode', 'checkid_setup'),
        ('trust_root', 'http://real_trust_root/'),
        ('realm', 'http://fake_trust_root/'),
        ('return_to', 'http://real_trust_root/foo'),
        ('assoc_handle', 'bogus'),
        ('identity', 'george'),
    ]
    for key, value in openid_args:
        msg.setArg(OPENID_NS, key, value)

    parsed = server.CheckIDRequest.fromMessage(
        msg, self.server.op_endpoint)

    self.failUnless(parsed.trust_root == 'http://real_trust_root/')
1118
def test_trustRootOpenID2(self):
    """Ignore openid.trust_root in OpenID 2.

    When both realm and trust_root are present in an OpenID 2 message,
    the parsed request's trust_root comes from realm.
    """
    msg = Message(OPENID2_NS)
    openid_args = [
        ('mode', 'checkid_setup'),
        ('realm', 'http://real_trust_root/'),
        ('trust_root', 'http://fake_trust_root/'),
        ('return_to', 'http://real_trust_root/foo'),
        ('assoc_handle', 'bogus'),
        ('identity', 'george'),
        ('claimed_id', 'george'),
    ]
    for key, value in openid_args:
        msg.setArg(OPENID_NS, key, value)

    parsed = server.CheckIDRequest.fromMessage(
        msg, self.server.op_endpoint)

    self.failUnless(parsed.trust_root == 'http://real_trust_root/')
1133
def test_answerAllowNoTrustRoot(self):
    """A request whose trust_root is None can still be answered
    positively with the usual fields."""
    self.request.trust_root = None
    response = self.request.answer(True)
    self.failUnlessEqual(response.request, self.request)
    self._expectAnswer(response, identity=self.request.identity)
1139
def test_fromMessageWithoutTrustRoot(self):
    """With no realm/trust_root in the message, the parsed request's
    trust_root equals the return_to URL."""
    return_to = 'http://real_trust_root/foo'
    msg = Message(OPENID2_NS)
    openid_args = [
        ('mode', 'checkid_setup'),
        ('return_to', return_to),
        ('assoc_handle', 'bogus'),
        ('identity', 'george'),
        ('claimed_id', 'george'),
    ]
    for key, value in openid_args:
        msg.setArg(OPENID_NS, key, value)

    parsed = server.CheckIDRequest.fromMessage(
        msg, self.server.op_endpoint)

    self.failUnlessEqual(parsed.trust_root, 'http://real_trust_root/foo')
1151
def test_fromMessageWithEmptyTrustRoot(self):
    """An empty openid.trust_root is treated like a missing one: the
    parsed request's trust_root equals return_to."""
    return_to = u'http://someplace.invalid/?go=thing'
    post_args = {
        u'openid.assoc_handle': u'{blah}{blah}{OZivdQ==}',
        u'openid.claimed_id': u'http://delegated.invalid/',
        u'openid.identity': u'http://op-local.example.com/',
        u'openid.mode': u'checkid_setup',
        u'openid.ns': u'http://openid.net/signon/1.0',
        u'openid.return_to': return_to,
        u'openid.trust_root': u'',
    }
    msg = Message.fromPostArgs(post_args)

    parsed = server.CheckIDRequest.fromMessage(
        msg, self.server.op_endpoint)

    self.failUnlessEqual(parsed.trust_root, return_to)
1166
def test_fromMessageWithoutTrustRootOrReturnTo(self):
    """A message with neither trust root nor return_to is rejected with
    server.ProtocolError."""
    msg = Message(OPENID2_NS)
    openid_args = [
        ('mode', 'checkid_setup'),
        ('assoc_handle', 'bogus'),
        ('identity', 'george'),
        ('claimed_id', 'george'),
    ]
    for key, value in openid_args:
        msg.setArg(OPENID_NS, key, value)

    self.failUnlessRaises(server.ProtocolError,
                          server.CheckIDRequest.fromMessage,
                          msg, self.server.op_endpoint)
1177
def test_answerAllowNoEndpointOpenID1(self):
    """Test .allow() with an OpenID 1.x Message on a CheckIDRequest
    built without an op_endpoint parameter.

    The answer must carry mode, return_to, identity and a response nonce,
    use the implicit OpenID 1 namespace, and contain nothing else.
    """
    identity = 'http://bambam.unittest/'
    request_message = Message.fromOpenIDArgs({
        'identity': identity,
        'trust_root': 'http://bar.unittest/',
        'return_to': 'http://bar.unittest/999',
    })
    self.request = server.CheckIDRequest.fromMessage(request_message, None)
    response = self.request.answer(True)
    fields = response.fields

    expected_list = [
        ('mode', 'id_res'),
        ('return_to', self.request.return_to),
        ('identity', identity),
    ]

    for key, expected in expected_list:
        actual = fields.getArg(OPENID_NS, key)
        self.failUnlessEqual(
            expected, actual,
            "%s: expected %s, got %s" % (key, expected, actual))

    self.failUnless(fields.hasKey(OPENID_NS, 'response_nonce'))
    self.failUnlessEqual(fields.getOpenIDNamespace(), OPENID1_NS)
    self.failUnless(fields.namespaces.isImplicit(OPENID1_NS))

    # One for nonce (OpenID v1 namespace is implicit)
    post_args = fields.toPostArgs()
    self.failUnlessEqual(len(post_args), len(expected_list) + 1, post_args)
1211
def test_answerImmediateDenyOpenID2(self):
    """Look for mode=setup_needed in checkid_immediate negative
    response in OpenID 2 case.

    See specification Responding to Authentication Requests /
    Negative Assertions / In Response to Immediate Requests.
    """
    self.request.mode = 'checkid_immediate'
    self.request.immediate = True
    self.request.claimed_id = 'http://claimed-id.test/'
    setup_server = "http://setup-url.unittest/"
    # crappiting setup_url, you dirty my interface with your presence!
    response = self.request.answer(False, server_url=setup_server)
    fields = response.fields

    self.failUnlessEqual(response.request, self.request)
    self.failUnlessEqual(len(fields.toPostArgs()), 3, fields)
    self.failUnlessEqual(fields.getOpenIDNamespace(), OPENID2_NS)
    self.failUnlessEqual(fields.getArg(OPENID_NS, 'mode'),
                         'setup_needed')

    # The setup URL must carry the URL-encoded claimed_id of the request.
    usu = fields.getArg(OPENID_NS, 'user_setup_url')
    expected_substr = 'openid.claimed_id=http%3A%2F%2Fclaimed-id.test%2F'
    self.failUnless(expected_substr in usu, usu)
1234
def test_answerImmediateDenyOpenID1(self):
    """Look for user_setup_url in checkid_immediate negative
    response in OpenID 1 case."""
    self.request.message = Message(OPENID1_NS)
    self.request.mode = 'checkid_immediate'
    self.request.immediate = True
    setup_server = "http://setup-url.unittest/"
    # crappiting setup_url, you dirty my interface with your presence!
    response = self.request.answer(False, server_url=setup_server)
    fields = response.fields

    self.failUnlessEqual(response.request, self.request)
    self.failUnlessEqual(len(fields.toPostArgs()), 2, fields)
    self.failUnlessEqual(fields.getOpenIDNamespace(), OPENID1_NS)
    self.failUnless(fields.namespaces.isImplicit(OPENID1_NS))
    self.failUnlessEqual(fields.getArg(OPENID_NS, 'mode'), 'id_res')

    setup_url = fields.getArg(OPENID_NS, 'user_setup_url', '')
    self.failUnless(setup_url.startswith(setup_server))
1251
def test_answerSetupDeny(self):
    """A negative answer to a non-immediate request contains only
    mode=cancel in the OpenID namespace."""
    response = self.request.answer(False)
    openid_args = response.fields.getArgs(OPENID_NS)
    self.failUnlessEqual(openid_args, {
        'mode': 'cancel',
    })
1257
def test_encodeToURL(self):
    """Round-trip check: a request encoded to a URL and parsed back
    yields a request with the same attributes."""
    server_url = 'http://openid-server.unittest/'
    encoded = self.request.encodeToURL(server_url)

    # How to check?  How about a round-trip test.
    base, query_string = encoded.split('?', 1)
    post_args = dict(cgi.parse_qsl(query_string))
    message = Message.fromPostArgs(post_args)
    rebuilt_request = server.CheckIDRequest.fromMessage(
        message, self.server.op_endpoint)
    # argh, lousy hack: the original request's message differs from the
    # re-parsed one, so align it before comparing attribute dicts.
    self.request.message = message
    self.failUnlessEqual(rebuilt_request.__dict__, self.request.__dict__)
1271
def test_getCancelURL(self):
    """getCancelURL() is the request's return_to plus a query carrying
    only openid.mode=cancel and the OpenID 2 namespace."""
    cancel_url = self.request.getCancelURL()
    rt, query_string = cancel_url.split('?')
    self.failUnlessEqual(self.request.return_to, rt)

    expected_query = {
        'openid.mode': 'cancel',
        'openid.ns': OPENID2_NS,
    }
    actual_query = dict(cgi.parse_qsl(query_string))
    self.failUnlessEqual(actual_query, expected_query)
1279
def test_getCancelURLimmed(self):
    """getCancelURL() raises ValueError for an immediate-mode request."""
    self.request.immediate = True
    self.request.mode = 'checkid_immediate'
    self.failUnlessRaises(ValueError, self.request.getCancelURL)
1284 1285 1286
class TestCheckIDExtension(unittest.TestCase):
    """Extension (non-OpenID namespace) arguments on a response stay
    separate from the core OpenID arguments."""

    def setUp(self):
        """Build a CheckIDRequest and an id_res response holding one extra
        OpenID-namespace field ('blue')."""
        self.op_endpoint = 'http://endpoint.unittest/ext'
        self.store = memstore.MemoryStore()
        self.server = server.Server(self.store, self.op_endpoint)
        self.request = server.CheckIDRequest(
            identity='http://bambam.unittest/',
            trust_root='http://bar.unittest/',
            return_to='http://bar.unittest/999',
            immediate=False,
            op_endpoint=self.server.op_endpoint,
        )
        self.request.message = Message(OPENID2_NS)
        self.response = server.OpenIDResponse(self.request)
        for key, value in (('mode', 'id_res'), ('blue', 'star')):
            self.response.fields.setArg(OPENID_NS, key, value)

    def test_addField(self):
        """Setting one argument in another namespace leaves the OpenID
        arguments untouched."""
        namespace = 'something:'
        fields = self.response.fields
        fields.setArg(namespace, 'bright', 'potato')

        openid_args = fields.getArgs(OPENID_NS)
        self.failUnlessEqual(openid_args,
                             {'blue': 'star',
                              'mode': 'id_res',
                              })

        extension_args = fields.getArgs(namespace)
        self.failUnlessEqual(extension_args, {'bright': 'potato'})

    def test_addFields(self):
        """Bulk-updating another namespace leaves the OpenID arguments
        untouched and stores exactly the given mapping."""
        namespace = 'mi5:'
        args = {'tangy': 'suspenders',
                'bravo': 'inclusion'}
        fields = self.response.fields
        fields.updateArgs(namespace, args)

        openid_args = fields.getArgs(OPENID_NS)
        self.failUnlessEqual(openid_args,
                             {'blue': 'star',
                              'mode': 'id_res',
                              })
        self.failUnlessEqual(fields.getArgs(namespace), args)
1328 1329 1330
class MockSignatory(object):
    """Minimal stand-in for a signatory, backed by a list of
    ``(dumb, assoc_handle)`` pairs."""

    # Result verify() reports for a known dumb-mode handle; tests flip
    # this to simulate a bad signature.
    isValid = True

    def __init__(self, assoc):
        """Start out knowing the single ``(dumb, assoc_handle)`` pair
        ``assoc``."""
        self.assocs = [assoc]

    def verify(self, assoc_handle, message):
        """Return ``isValid`` when ``assoc_handle`` is a known dumb-mode
        handle, otherwise False.  The message must carry a signature."""
        assert message.hasKey(OPENID_NS, "sig")
        known = (True, assoc_handle) in self.assocs
        return known and self.isValid

    def getAssociation(self, assoc_handle, dumb):
        """Return True when the ``(dumb, assoc_handle)`` pair is known,
        otherwise None."""
        # This isn't a valid implementation for many uses of this
        # function, mind you.
        if (dumb, assoc_handle) not in self.assocs:
            return None
        return True

    def invalidate(self, assoc_handle, dumb):
        """Forget the ``(dumb, assoc_handle)`` pair if it is known."""
        pair = (dumb, assoc_handle)
        try:
            self.assocs.remove(pair)
        except ValueError:
            # Unknown pair: nothing to forget.
            pass
1355 1356
class TestCheckAuth(unittest.TestCase):
    """CheckAuthRequest.answer() against a MockSignatory."""

    def setUp(self):
        """Create a signed message, a CheckAuthRequest for it, and a mock
        signatory that knows the request's handle in dumb mode."""
        self.assoc_handle = 'mooooooooo'
        self.message = Message.fromPostArgs({
            'openid.sig': 'signarture',
            'one': 'alpha',
            'two': 'beta',
        })
        self.request = server.CheckAuthRequest(
            self.assoc_handle, self.message)

        self.signatory = MockSignatory((True, self.assoc_handle))

    def test_valid(self):
        """A valid signature yields is_valid=true and a response tied to
        the request."""
        response = self.request.answer(self.signatory)
        openid_args = response.fields.getArgs(OPENID_NS)
        self.failUnlessEqual(openid_args, {'is_valid': 'true'})
        self.failUnlessEqual(response.request, self.request)

    def test_invalid(self):
        """When the signatory reports an invalid signature the response
        says is_valid=false."""
        self.signatory.isValid = False
        response = self.request.answer(self.signatory)
        openid_args = response.fields.getArgs(OPENID_NS)
        self.failUnlessEqual(openid_args,
                             {'is_valid': 'false'})

    def test_replay(self):
        """Don't validate the same response twice.

        From "Checking the Nonce"::

            When using "check_authentication", the OP MUST ensure that an
            assertion has not yet been accepted with the same value for
            "openid.response_nonce".

        In this implementation, the assoc_handle is only valid once.  And
        nonces are a signed component of the message, so they can't be used
        with another handle without breaking the sig.
        """
        # The first answer's content is irrelevant here; only the second
        # answer for the same request is inspected.
        self.request.answer(self.signatory)
        second = self.request.answer(self.signatory)
        self.failUnlessEqual(second.fields.getArgs(OPENID_NS),
                             {'is_valid': 'false'})

    def test_invalidatehandle(self):
        """An invalidate_handle the signatory does not know is echoed back
        in the response."""
        self.request.invalidate_handle = "bogusHandle"
        response = self.request.answer(self.signatory)
        expected = {'is_valid': 'true',
                    'invalidate_handle': "bogusHandle"}
        self.failUnlessEqual(response.fields.getArgs(OPENID_NS), expected)
        self.failUnlessEqual(response.request, self.request)

    def test_invalidatehandleNo(self):
        """An invalidate_handle the signatory still knows (as a non-dumb
        association) is not echoed back."""
        assoc_handle = 'goodhandle'
        self.signatory.assocs.append((False, 'goodhandle'))
        self.request.invalidate_handle = assoc_handle
        response = self.request.answer(self.signatory)
        openid_args = response.fields.getArgs(OPENID_NS)
        self.failUnlessEqual(openid_args, {'is_valid': 'true'})
1413 1414
class TestAssociate(unittest.TestCase):
    """AssociateRequest: answers for plaintext and Diffie-Hellman
    sessions, rejection of inconsistent requests, error-message fields and
    unsupported-type responses."""
    # TODO: test DH with non-default values for modulus and gen.
    # (important to do because we actually had it broken for a while.)

    def setUp(self):
        """Parse an empty associate request and create a Signatory over an
        in-memory store."""
        self.store = memstore.MemoryStore()
        self.signatory = server.Signatory(self.store)
        self.request = server.AssociateRequest.fromMessage(
            Message.fromPostArgs({}))

    def _checkDHAssociation(self, assoc_type, session_type, session_class,
                            hash_func):
        """Answer a DH associate request and check that the consumer side
        can recover the association secret from the encrypted MAC key."""
        from openid.dh import DiffieHellman

        self.assoc = self.signatory.createAssociation(
            dumb=False, assoc_type=assoc_type)
        consumer_dh = DiffieHellman.fromDefaults()
        cpub = consumer_dh.public
        server_dh = DiffieHellman.fromDefaults()
        session = session_class(server_dh, cpub)
        self.request = server.AssociateRequest(session, assoc_type)
        response = self.request.answer(self.assoc)

        def rfg(field):
            return response.fields.getArg(OPENID_NS, field)

        self.failUnlessEqual(rfg("assoc_type"), assoc_type)
        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
        self.failIf(rfg("mac_key"))
        self.failUnlessEqual(rfg("session_type"), session_type)
        self.failUnless(rfg("enc_mac_key"))
        self.failUnless(rfg("dh_server_public"))

        enc_key = rfg("enc_mac_key").decode('base64')
        spub = cryptutil.base64ToLong(rfg("dh_server_public"))
        secret = consumer_dh.xorSecret(spub, enc_key, hash_func)
        self.failUnlessEqual(secret, self.assoc.secret)

    def _failUnlessAllRejected(self, bad_request_argss):
        """Assert that parsing each post-args dict raises ProtocolError."""
        for request_args in bad_request_argss:
            message = Message.fromPostArgs(request_args)
            self.failUnlessRaises(server.ProtocolError,
                                  server.AssociateRequest.fromMessage,
                                  message)

    def test_dhSHA1(self):
        """HMAC-SHA1 association over a DH-SHA1 session."""
        self._checkDHAssociation(
            'HMAC-SHA1', 'DH-SHA1',
            server.DiffieHellmanSHA1ServerSession, cryptutil.sha1)

    if not cryptutil.SHA256_AVAILABLE:
        warnings.warn("Not running SHA256 tests.")
    else:
        def test_dhSHA256(self):
            """HMAC-SHA256 association over a DH-SHA256 session."""
            self._checkDHAssociation(
                'HMAC-SHA256', 'DH-SHA256',
                server.DiffieHellmanSHA256ServerSession, cryptutil.sha256)

        def test_protoError256(self):
            """DH-SHA256 requests with a mismatched or unknown assoc_type
            are rejected with ProtocolError."""
            from openid.consumer.consumer import \
                DiffieHellmanSHA256ConsumerSession

            s256_session = DiffieHellmanSHA256ConsumerSession()

            bad_request_argss = []
            for bad_assoc_type in ('HMAC-SHA1', 'MONKEY-PIRATE'):
                request_args = {'openid.assoc_type': bad_assoc_type,
                                'openid.session_type': 'DH-SHA256'}
                request_args.update(s256_session.getRequest())
                bad_request_argss.append(request_args)

            self._failUnlessAllRejected(bad_request_argss)

    def test_protoError(self):
        """An unknown assoc_type, and DH-SHA1 requests with a mismatched
        or unknown assoc_type, are rejected with ProtocolError."""
        from openid.consumer.consumer import DiffieHellmanSHA1ConsumerSession

        s1_session = DiffieHellmanSHA1ConsumerSession()

        bad_request_argss = [{'openid.assoc_type': 'Wha?'}]
        for bad_assoc_type in ('HMAC-SHA256', 'ROBOT-NINJA'):
            request_args = {'openid.assoc_type': bad_assoc_type,
                            'openid.session_type': 'DH-SHA1'}
            request_args.update(s1_session.getRequest())
            bad_request_argss.append(request_args)

        self._failUnlessAllRejected(bad_request_argss)

    def test_protoErrorFields(self):
        """ProtocolError.toMessage() carries the optional contact and
        reference fields for both OpenID 1 and OpenID 2 messages."""
        contact = 'user@example.invalid'
        reference = 'Trac ticket number MAX_INT'
        error = 'poltergeist'

        openid1_args = {
            'openid.identitiy': 'invalid',
            'openid.mode': 'checkid_setup',
        }

        openid2_args = dict(openid1_args)
        openid2_args.update({'openid.ns': OPENID2_NS})

        # Check presence of optional fields in both protocol versions
        for post_args in (openid1_args, openid2_args):
            request_msg = Message.fromPostArgs(post_args)
            p = server.ProtocolError(request_msg, error,
                                     contact=contact, reference=reference)
            reply = p.toMessage()

            self.failUnlessEqual(reply.getArg(OPENID_NS, 'reference'),
                                 reference)
            self.failUnlessEqual(reply.getArg(OPENID_NS, 'contact'),
                                 contact)

    def failUnlessExpiresInMatches(self, msg, expected_expires_in):
        """Assert that msg's "expires_in" is at most one second below
        ``expected_expires_in`` (and never above it)."""
        expires_in = int(msg.getArg(OPENID_NS, 'expires_in', no_default))

        # Slop is necessary because the tests can sometimes get run
        # right on a second boundary
        slop = 1  # second
        difference = expected_expires_in - expires_in

        error_message = ('"expires_in" value not within %s of expected: '
                         'expected=%s, actual=%s' %
                         (slop, expected_expires_in, expires_in))
        self.failUnless(0 <= difference <= slop, error_message)

    def _checkPlaintextOpenID1(self, created_assoc_type):
        """Answer the default (empty) associate request with an
        association of ``created_assoc_type`` and check the plaintext
        response fields."""
        self.assoc = self.signatory.createAssociation(
            dumb=False, assoc_type=created_assoc_type)
        response = self.request.answer(self.assoc)

        def rfg(field):
            return response.fields.getArg(OPENID_NS, field)

        # NOTE(review): HMAC-SHA1 is expected whatever type the
        # association was created with -- presumably answer() reports the
        # request's assoc_type rather than the association's; confirm.
        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)

        self.failUnlessExpiresInMatches(
            response.fields, self.signatory.SECRET_LIFETIME)

        self.failUnlessEqual(
            rfg("mac_key"), oidutil.toBase64(self.assoc.secret))
        self.failIf(rfg("session_type"))
        self.failIf(rfg("enc_mac_key"))
        self.failIf(rfg("dh_server_public"))

    def test_plaintext(self):
        """Plaintext answer with an HMAC-SHA1 association."""
        self._checkPlaintextOpenID1('HMAC-SHA1')

    def test_plaintext_v2(self):
        """Plaintext answer to an explicit OpenID 2 no-encryption
        request."""
        # The main difference between this and the v1 test is that
        # session_type is always returned in v2.
        args = {
            'openid.ns': OPENID2_NS,
            'openid.mode': 'associate',
            'openid.assoc_type': 'HMAC-SHA1',
            'openid.session_type': 'no-encryption',
        }
        self.request = server.AssociateRequest.fromMessage(
            Message.fromPostArgs(args))

        self.failIf(self.request.message.isOpenID1())

        self.assoc = self.signatory.createAssociation(
            dumb=False, assoc_type='HMAC-SHA1')
        response = self.request.answer(self.assoc)

        def rfg(field):
            return response.fields.getArg(OPENID_NS, field)

        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)

        self.failUnlessExpiresInMatches(
            response.fields, self.signatory.SECRET_LIFETIME)

        self.failUnlessEqual(
            rfg("mac_key"), oidutil.toBase64(self.assoc.secret))

        self.failUnlessEqual(rfg("session_type"), "no-encryption")
        self.failIf(rfg("enc_mac_key"))
        self.failIf(rfg("dh_server_public"))

    def test_plaintext256(self):
        """Plaintext answer with an HMAC-SHA256 association."""
        self._checkPlaintextOpenID1('HMAC-SHA256')

    def test_unsupportedPrefer(self):
        """answerUnsupported() with preferred types reports them back
        alongside the unsupported-type error code."""
        allowed_assoc = 'COLD-PET-RAT'
        allowed_sess = 'FROG-BONES'
        message = 'This is a unit test'

        # Set an OpenID 2 message so answerUnsupported doesn't raise
        # ProtocolError.
        self.request.message = Message(OPENID2_NS)

        response = self.request.answerUnsupported(
            message=message,
            preferred_session_type=allowed_sess,
            preferred_association_type=allowed_assoc,
        )

        expected_fields = [
            ('error_code', 'unsupported-type'),
            ('assoc_type', allowed_assoc),
            ('error', message),
            ('session_type', allowed_sess),
        ]
        for field, expected in expected_fields:
            self.failUnlessEqual(
                response.fields.getArg(OPENID_NS, field), expected)

    def test_unsupported(self):
        """answerUnsupported() without preferred types leaves assoc_type
        and session_type unset."""
        message = 'This is a unit test'

        # Set an OpenID 2 message so answerUnsupported doesn't raise
        # ProtocolError.
        self.request.message = Message(OPENID2_NS)

        response = self.request.answerUnsupported(message)

        expected_fields = [
            ('error_code', 'unsupported-type'),
            ('assoc_type', None),
            ('error', message),
            ('session_type', None),
        ]
        for field, expected in expected_fields:
            self.failUnlessEqual(
                response.fields.getArg(OPENID_NS, field), expected)
1671
class Counter(object):
    """Simple call counter: ``count`` starts at zero and grows by one on
    each ``inc()``."""

    def __init__(self):
        """Start counting from zero."""
        self.count = 0

    def inc(self):
        """Record one more occurrence."""
        self.count = self.count + 1
1678
class TestServer(unittest.TestCase, CatchLogs):
    """Server-level dispatch and the associate / check_authentication
    handlers."""

    def setUp(self):
        """Create a Server over an in-memory store and start capturing
        log messages."""
        self.store = memstore.MemoryStore()
        self.server = server.Server(self.store, "http://server.unittest/endpt")
        CatchLogs.setUp(self)

    def test_dispatch(self):
        """handleRequest() routes a request to the ``openid_<mode>``
        handler exactly once."""
        monkeycalled = Counter()

        def monkeyDo(request):
            monkeycalled.inc()
            return server.OpenIDResponse(request)

        self.server.openid_monkeymode = monkeyDo
        request = server.OpenIDRequest()
        request.mode = "monkeymode"
        request.namespace = OPENID1_NS
        # Only the dispatch side effect matters; the result is not checked.
        self.server.handleRequest(request)
        self.failUnlessEqual(monkeycalled.count, 1)

    def test_associate(self):
        """A default associate request yields a response with an
        assoc_handle."""
        request = server.AssociateRequest.fromMessage(Message.fromPostArgs({}))
        response = self.server.openid_associate(request)
        has_handle = response.fields.hasKey(OPENID_NS, "assoc_handle")
        self.failUnless(has_handle,
                        "No assoc_handle here: %s" % (response.fields,))

    def test_associate2(self):
        """Associate when the server has no allowed association types

        Gives back an error with error_code and no fallback session or
        assoc types."""
        self.server.negotiator.setAllowedTypes([])

        # Set an OpenID 2 message so answerUnsupported doesn't raise
        # ProtocolError.
        msg = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
            'openid.session_type': 'no-encryption',
        })

        request = server.AssociateRequest.fromMessage(msg)

        response = self.server.openid_associate(request)
        fields = response.fields
        for present in ("error", "error_code"):
            self.failUnless(fields.hasKey(OPENID_NS, present))
        for absent in ("assoc_handle", "assoc_type", "session_type"):
            self.failIf(fields.hasKey(OPENID_NS, absent))

    def test_associate3(self):
        """Request an assoc type that is not supported when there are
        supported types.

        Should give back an error message with a fallback type.
        """
        self.server.negotiator.setAllowedTypes([('HMAC-SHA256', 'DH-SHA256')])

        msg = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
            'openid.session_type': 'no-encryption',
        })

        request = server.AssociateRequest.fromMessage(msg)
        response = self.server.openid_associate(request)
        fields = response.fields

        self.failUnless(fields.hasKey(OPENID_NS, "error"))
        self.failUnless(fields.hasKey(OPENID_NS, "error_code"))
        self.failIf(fields.hasKey(OPENID_NS, "assoc_handle"))
        self.failUnlessEqual(fields.getArg(OPENID_NS, "assoc_type"),
                             'HMAC-SHA256')
        self.failUnlessEqual(fields.getArg(OPENID_NS, "session_type"),
                             'DH-SHA256')

    if not cryptutil.SHA256_AVAILABLE:
        warnings.warn("Not running SHA256 tests.")
    else:
        def test_associate4(self):
            """DH-SHA256 association session"""
            self.server.negotiator.setAllowedTypes(
                [('HMAC-SHA256', 'DH-SHA256')])
            query = {
                'openid.dh_consumer_public':
                'ALZgnx8N5Lgd7pCj8K86T/DDMFjJXSss1SKoLmxE72kJTzOtG6I2PaYrHX'
                'xku4jMQWSsGfLJxwCZ6280uYjUST/9NWmuAfcrBfmDHIBc3H8xh6RBnlXJ'
                '1WxJY3jHd5k1/ZReyRZOxZTKdF/dnIqwF8ZXUwI6peV0TyS/K1fOfF/s',
                'openid.assoc_type': 'HMAC-SHA256',
                'openid.session_type': 'DH-SHA256',
            }
            request = server.AssociateRequest.fromMessage(
                Message.fromPostArgs(query))
            response = self.server.openid_associate(request)
            self.failUnless(response.fields.hasKey(OPENID_NS, "assoc_handle"))

    def test_missingSessionTypeOpenID2(self):
        """Make sure session_type is required in OpenID 2"""
        msg = Message.fromPostArgs({
            'openid.ns': OPENID2_NS,
        })

        self.assertRaises(server.ProtocolError,
                          server.AssociateRequest.fromMessage, msg)

    def test_checkAuth(self):
        """The check_authentication handler always answers with an
        is_valid field."""
        request = server.CheckAuthRequest('arrrrrf', '0x3999', [])
        response = self.server.openid_check_authentication(request)
        self.failUnless(response.fields.hasKey(OPENID_NS, "is_valid"))
1784
class TestSignatory(unittest.TestCase, CatchLogs):
    """Tests for server.Signatory: signing, verifying and association lookup.

    While a test runs, messages sent through oidutil.log are collected in
    self.messages (see CatchLogs), so each test can assert whether or not
    anything was logged.
    """

    def setUp(self):
        """Create a Signatory over a fresh MemoryStore and capture log output."""
        self.store = memstore.MemoryStore()
        self.signatory = server.Signatory(self.store)
        self._dumb_key = self.signatory._dumb_key
        self._normal_key = self.signatory._normal_key
        CatchLogs.setUp(self)

    def tearDown(self):
        """Restore the oidutil.log function that CatchLogs.setUp replaced.

        unittest.TestCase is listed before CatchLogs in the bases, so without
        this override TestCase's no-op tearDown would be picked and
        oidutil.log would stay patched after every test.
        """
        CatchLogs.tearDown(self)

    def test_sign(self):
        """Sign a response with an unexpired association stored under the normal key."""
        request = server.OpenIDRequest()
        assoc_handle = '{assoc}{lookatme}'
        self.store.storeAssociation(
            self._normal_key,
            association.Association.fromExpiresIn(60, assoc_handle,
                                                  'sekrit', 'HMAC-SHA1'))
        request.assoc_handle = assoc_handle
        request.namespace = OPENID1_NS
        response = server.OpenIDResponse(request)
        response.fields = Message.fromOpenIDArgs({
            'foo': 'amsigned',
            'bar': 'notsigned',
            'azu': 'alsosigned',
            })
        sresponse = self.signatory.sign(response)

        # The handle supplied with the request is the one used for signing.
        self.failUnlessEqual(
            sresponse.fields.getArg(OPENID_NS, 'assoc_handle'),
            assoc_handle)
        self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'),
                             'assoc_handle,azu,bar,foo,signed')
        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
        self.failIf(self.messages, self.messages)

    def test_signDumb(self):
        """Sign a response to a request that carried no association handle."""
        request = server.OpenIDRequest()
        request.assoc_handle = None
        request.namespace = OPENID2_NS
        response = server.OpenIDResponse(request)
        response.fields = Message.fromOpenIDArgs({
            'foo': 'amsigned',
            'bar': 'notsigned',
            'azu': 'alsosigned',
            'ns': OPENID2_NS,
            })
        sresponse = self.signatory.sign(response)

        # A handle must have been generated, and it must be retrievable as a
        # dumb-mode association.
        assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
        self.failUnless(assoc_handle)
        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
        self.failUnless(assoc)
        self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'),
                             'assoc_handle,azu,bar,foo,ns,signed')
        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
        self.failIf(self.messages, self.messages)

    def test_signExpired(self):
        """Sign a response to a message with an expired handle (using invalidate_handle).

        From "Verifying with an Association"::

            If an authentication request included an association handle for an
            association between the OP and the Relying party, and the OP no
            longer wishes to use that handle (because it has expired or the
            secret has been compromised, for instance), the OP will send a
            response that must be verified directly with the OP, as specified
            in Section 11.3.2. In that instance, the OP will include the field
            "openid.invalidate_handle" set to the association handle that the
            Relying Party included with the original request.
        """
        request = server.OpenIDRequest()
        request.namespace = OPENID2_NS
        assoc_handle = '{assoc}{lookatme}'
        # A negative lifetime makes the association already expired.
        self.store.storeAssociation(
            self._normal_key,
            association.Association.fromExpiresIn(-10, assoc_handle,
                                                  'sekrit', 'HMAC-SHA1'))
        self.failUnless(
            self.store.getAssociation(self._normal_key, assoc_handle))

        request.assoc_handle = assoc_handle
        response = server.OpenIDResponse(request)
        response.fields = Message.fromOpenIDArgs({
            'foo': 'amsigned',
            'bar': 'notsigned',
            'azu': 'alsosigned',
            })
        sresponse = self.signatory.sign(response)

        new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
        self.failUnless(new_assoc_handle)
        self.failIfEqual(new_assoc_handle, assoc_handle)

        self.failUnlessEqual(
            sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'),
            assoc_handle)

        self.failUnlessEqual(
            sresponse.fields.getArg(OPENID_NS, 'signed'),
            'assoc_handle,azu,bar,foo,invalidate_handle,signed')
        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))

        # make sure the expired association is gone
        self.failIf(self.store.getAssociation(self._normal_key, assoc_handle),
                    "expired association is still retrievable.")

        # make sure the new key is a dumb mode association
        self.failUnless(
            self.store.getAssociation(self._dumb_key, new_assoc_handle))
        self.failIf(
            self.store.getAssociation(self._normal_key, new_assoc_handle))
        # Something must have been logged along the way.
        self.failUnless(self.messages)

    def test_signInvalidHandle(self):
        """Sign a response to a request whose handle matches no stored association."""
        request = server.OpenIDRequest()
        request.namespace = OPENID2_NS
        assoc_handle = '{bogus-assoc}{notvalid}'

        request.assoc_handle = assoc_handle
        response = server.OpenIDResponse(request)
        response.fields = Message.fromOpenIDArgs({
            'foo': 'amsigned',
            'bar': 'notsigned',
            'azu': 'alsosigned',
            })
        sresponse = self.signatory.sign(response)

        new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
        self.failUnless(new_assoc_handle)
        self.failIfEqual(new_assoc_handle, assoc_handle)

        self.failUnlessEqual(
            sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'),
            assoc_handle)

        self.failUnlessEqual(
            sresponse.fields.getArg(OPENID_NS, 'signed'),
            'assoc_handle,azu,bar,foo,invalidate_handle,signed')
        self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))

        # make sure the new key is a dumb mode association
        self.failUnless(
            self.store.getAssociation(self._dumb_key, new_assoc_handle))
        self.failIf(
            self.store.getAssociation(self._normal_key, new_assoc_handle))
        self.failIf(self.messages, self.messages)

    def test_verify(self):
        """verify() accepts a message signed with a stored dumb-mode association."""
        assoc_handle = '{vroom}{zoom}'
        assoc = association.Association.fromExpiresIn(
            60, assoc_handle, 'sekrit', 'HMAC-SHA1')

        self.store.storeAssociation(self._dumb_key, assoc)

        signed = Message.fromPostArgs({
            'openid.foo': 'bar',
            'openid.apple': 'orange',
            'openid.assoc_handle': assoc_handle,
            'openid.signed': 'apple,assoc_handle,foo,signed',
            'openid.sig': 'uXoT1qm62/BB09Xbj98TQ8mlBco=',
            })

        verified = self.signatory.verify(assoc_handle, signed)
        self.failIf(self.messages, self.messages)
        self.failUnless(verified)

    def test_verifyBadSig(self):
        """verify() rejects a message whose signature has been altered."""
        assoc_handle = '{vroom}{zoom}'
        assoc = association.Association.fromExpiresIn(
            60, assoc_handle, 'sekrit', 'HMAC-SHA1')

        self.store.storeAssociation(self._dumb_key, assoc)

        signed = Message.fromPostArgs({
            'openid.foo': 'bar',
            'openid.apple': 'orange',
            'openid.assoc_handle': assoc_handle,
            'openid.signed': 'apple,assoc_handle,foo,signed',
            # Same signature as in test_verify, scrambled with rot13.
            'openid.sig': 'uXoT1qm62/BB09Xbj98TQ8mlBco='.encode('rot13'),
            })

        verified = self.signatory.verify(assoc_handle, signed)
        self.failIf(self.messages, self.messages)
        self.failIf(verified)

    def test_verifyBadHandle(self):
        """verify() fails and logs when no association is stored for the handle."""
        assoc_handle = '{vroom}{zoom}'
        signed = Message.fromPostArgs({
            'foo': 'bar',
            'apple': 'orange',
            'openid.sig': "Ylu0KcIR7PvNegB/K41KpnRgJl0=",
            })

        verified = self.signatory.verify(assoc_handle, signed)
        self.failIf(verified)
        self.failUnless(self.messages)

    def test_verifyAssocMismatch(self):
        """Attempt to validate sign-all message with a signed-list assoc.

        The association exists under the dumb key, but the message carries
        no openid.signed list; verify() must fail and log a message.
        """
        assoc_handle = '{vroom}{zoom}'
        assoc = association.Association.fromExpiresIn(
            60, assoc_handle, 'sekrit', 'HMAC-SHA1')

        self.store.storeAssociation(self._dumb_key, assoc)

        signed = Message.fromPostArgs({
            'foo': 'bar',
            'apple': 'orange',
            'openid.sig': "d71xlHtqnq98DonoSgoK/nD+QRM=",
            })

        verified = self.signatory.verify(assoc_handle, signed)
        self.failIf(verified)
        self.failUnless(self.messages)

    def test_getAssoc(self):
        """getAssociation returns a stored, unexpired dumb-mode association."""
        assoc_handle = self.makeAssoc(dumb=True)
        assoc = self.signatory.getAssociation(assoc_handle, True)
        self.failUnless(assoc)
        self.failUnlessEqual(assoc.handle, assoc_handle)
        self.failIf(self.messages, self.messages)

    def test_getAssocExpired(self):
        """getAssociation returns nothing for an expired association and logs."""
        assoc_handle = self.makeAssoc(dumb=True, lifetime=-10)
        assoc = self.signatory.getAssociation(assoc_handle, True)
        self.failIf(assoc, assoc)
        self.failUnless(self.messages)

    def test_getAssocInvalid(self):
        """getAssociation returns None for an unknown handle without logging."""
        ah = 'no-such-handle'
        self.failUnlessEqual(
            self.signatory.getAssociation(ah, dumb=False), None)
        self.failIf(self.messages, self.messages)

    def test_getAssocDumbVsNormal(self):
        """getAssociation(dumb=False) cannot get a dumb assoc"""
        assoc_handle = self.makeAssoc(dumb=True)
        self.failUnlessEqual(
            self.signatory.getAssociation(assoc_handle, dumb=False), None)
        self.failIf(self.messages, self.messages)

    def test_getAssocNormalVsDumb(self):
        """getAssociation(dumb=True) cannot get a shared assoc

        From "Verifying Directly with the OpenID Provider"::

            An OP MUST NOT verify signatures for associations that have shared
            MAC keys.
        """
        assoc_handle = self.makeAssoc(dumb=False)
        self.failUnlessEqual(
            self.signatory.getAssociation(assoc_handle, dumb=True), None)
        self.failIf(self.messages, self.messages)

    def test_createAssociation(self):
        """An association made by createAssociation(dumb=False) can be fetched back."""
        assoc = self.signatory.createAssociation(dumb=False)
        self.failUnless(
            self.signatory.getAssociation(assoc.handle, dumb=False))
        self.failIf(self.messages, self.messages)

    def makeAssoc(self, dumb, lifetime=60):
        """Store an HMAC-SHA1 association with handle '{bling}'; return the handle.

        dumb -- if true, store under the dumb-mode key, else the normal key.
        lifetime -- value passed to Association.fromExpiresIn; tests pass a
            negative number to get an already-expired association.
        """
        assoc_handle = '{bling}'
        assoc = association.Association.fromExpiresIn(lifetime, assoc_handle,
                                                      'sekrit', 'HMAC-SHA1')

        # Explicit branch rather than "(dumb and a) or b", which would
        # silently pick the normal key if the dumb key were ever falsy.
        if dumb:
            store_key = self._dumb_key
        else:
            store_key = self._normal_key
        self.store.storeAssociation(store_key, assoc)
        return assoc_handle

    def test_invalidate(self):
        """invalidate() removes a dumb-mode association so it can't be fetched."""
        assoc_handle = '-squash-'
        assoc = association.Association.fromExpiresIn(60, assoc_handle,
                                                      'sekrit', 'HMAC-SHA1')

        self.store.storeAssociation(self._dumb_key, assoc)
        # Looked up twice before invalidating; presumably to show that a
        # lookup by itself does not remove the association.
        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
        self.failUnless(assoc)
        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
        self.failUnless(assoc)
        self.signatory.invalidate(assoc_handle, dumb=True)
        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
        self.failIf(assoc)
        self.failIf(self.messages, self.messages)
if __name__ == '__main__':
    # Executed as a script: let unittest discover and run this module's tests.
    unittest.main()