1  """Tests for openid.server. 
   2  """ 
   3  from openid.server import server 
   4  from openid import association, cryptutil, oidutil 
   5  from openid.message import Message, OPENID_NS, OPENID2_NS, OPENID1_NS, \ 
   6       IDENTIFIER_SELECT, no_default, OPENID1_URL_LIMIT 
   7  from openid.store import memstore 
   8  import cgi 
   9   
  10  import unittest 
  11  import warnings 
  12   
  13  from urlparse import urlparse 
  14   
  15   
  16   
  17   
  18   
  19   
  20   
  21  ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7 
  22  ALT_GEN = 5 
  23   
  35   
  38          return_to = "http://rp.unittest/consumer" 
  39           
  40          args = Message.fromPostArgs({ 
  41              'openid.mode': 'monkeydance', 
  42              'openid.identity': 'http://wagu.unittest/', 
  43              'openid.return_to': return_to, 
  44              }) 
  45          e = server.ProtocolError(args, "plucky") 
  46          self.failUnless(e.hasReturnTo()) 
  47          expected_args = { 
  48              'openid.mode': ['error'], 
  49              'openid.error': ['plucky'], 
  50              } 
  51   
  52          rt_base, result_args = e.encodeToURL().split('?', 1) 
  53          result_args = cgi.parse_qs(result_args) 
  54          self.failUnlessEqual(result_args, expected_args) 
   55   
  57          return_to = "http://rp.unittest/consumer" 
  58           
  59          args = Message.fromPostArgs({ 
  60              'openid.ns': OPENID2_NS, 
  61              'openid.mode': 'monkeydance', 
  62              'openid.identity': 'http://wagu.unittest/', 
  63              'openid.claimed_id': 'http://wagu.unittest/', 
  64              'openid.return_to': return_to, 
  65              }) 
  66          e = server.ProtocolError(args, "plucky") 
  67          self.failUnless(e.hasReturnTo()) 
  68          expected_args = { 
  69              'openid.ns': [OPENID2_NS], 
  70              'openid.mode': ['error'], 
  71              'openid.error': ['plucky'], 
  72              } 
  73   
  74          rt_base, result_args = e.encodeToURL().split('?', 1) 
  75          result_args = cgi.parse_qs(result_args) 
  76          self.failUnlessEqual(result_args, expected_args) 
   77   
  79          return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT) 
  80           
  81          args = Message.fromPostArgs({ 
  82              'openid.ns': OPENID2_NS, 
  83              'openid.mode': 'monkeydance', 
  84              'openid.identity': 'http://wagu.unittest/', 
  85              'openid.claimed_id': 'http://wagu.unittest/', 
  86              'openid.return_to': return_to, 
  87              }) 
  88          e = server.ProtocolError(args, "plucky") 
  89          self.failUnless(e.hasReturnTo()) 
  90          expected_args = { 
  91              'openid.ns': [OPENID2_NS], 
  92              'openid.mode': ['error'], 
  93              'openid.error': ['plucky'], 
  94              } 
  95   
  96          self.failUnless(e.whichEncoding() == server.ENCODE_HTML_FORM) 
  97          self.failUnless(e.toFormMarkup() == e.toMessage().toFormMarkup( 
  98              args.getArg(OPENID_NS, 'return_to'))) 
   99   
 101          return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT) 
 102           
 103          args = Message.fromPostArgs({ 
 104              'openid.mode': 'monkeydance', 
 105              'openid.identity': 'http://wagu.unittest/', 
 106              'openid.return_to': return_to, 
 107              }) 
 108          e = server.ProtocolError(args, "plucky") 
 109          self.failUnless(e.hasReturnTo()) 
 110          expected_args = { 
 111              'openid.mode': ['error'], 
 112              'openid.error': ['plucky'], 
 113              } 
 114   
 115          self.failUnless(e.whichEncoding() == server.ENCODE_URL) 
 116   
 117          rt_base, result_args = e.encodeToURL().split('?', 1) 
 118          result_args = cgi.parse_qs(result_args) 
 119          self.failUnlessEqual(result_args, expected_args) 
  120   
 133   
 134   
  139   
 140   
 153   
 155          args = {} 
 156          r = self.decode(args) 
 157          self.failUnlessEqual(r, None) 
  158   
 160          args = { 
 161              'pony': 'spotted', 
 162              'sreg.mutant_power': 'decaffinator', 
 163              } 
 164          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  165   
 167          args = { 
 168              'openid.mode': 'twos-compliment', 
 169              'openid.pants': 'zippered', 
 170              } 
 171          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  172   
 174          args = { 
 175              'openid.mode': ['checkid_setup'], 
 176              'openid.identity': self.id_url, 
 177              'openid.assoc_handle': self.assoc_handle, 
 178              'openid.return_to': self.rt_url, 
 179              'openid.trust_root': self.tr_url, 
 180              } 
 181          try: 
 182              result = self.decode(args) 
 183          except TypeError, err: 
 184              self.failUnless(str(err).find('values') != -1, err) 
 185          else: 
 186              self.fail("Expected TypeError, but got result %s" % (result,)) 
  187   
 206   
 208          args = { 
 209              'openid.mode': 'checkid_setup', 
 210              'openid.identity': self.id_url, 
 211              'openid.assoc_handle': self.assoc_handle, 
 212              'openid.return_to': self.rt_url, 
 213              'openid.trust_root': self.tr_url, 
 214              } 
 215          r = self.decode(args) 
 216          self.failUnless(isinstance(r, server.CheckIDRequest)) 
 217          self.failUnlessEqual(r.mode, "checkid_setup") 
 218          self.failUnlessEqual(r.immediate, False) 
 219          self.failUnlessEqual(r.identity, self.id_url) 
 220          self.failUnlessEqual(r.trust_root, self.tr_url) 
 221          self.failUnlessEqual(r.return_to, self.rt_url) 
  222   
 224          args = { 
 225              'openid.ns': OPENID2_NS, 
 226              'openid.mode': 'checkid_setup', 
 227              'openid.identity': self.id_url, 
 228              'openid.claimed_id': self.claimed_id, 
 229              'openid.assoc_handle': self.assoc_handle, 
 230              'openid.return_to': self.rt_url, 
 231              'openid.realm': self.tr_url, 
 232              } 
 233          r = self.decode(args) 
 234          self.failUnless(isinstance(r, server.CheckIDRequest)) 
 235          self.failUnlessEqual(r.mode, "checkid_setup") 
 236          self.failUnlessEqual(r.immediate, False) 
 237          self.failUnlessEqual(r.identity, self.id_url) 
 238          self.failUnlessEqual(r.claimed_id, self.claimed_id) 
 239          self.failUnlessEqual(r.trust_root, self.tr_url) 
 240          self.failUnlessEqual(r.return_to, self.rt_url) 
  241   
 243          args = { 
 244              'openid.ns': OPENID2_NS, 
 245              'openid.mode': 'checkid_setup', 
 246              'openid.identity': self.id_url, 
 247              'openid.assoc_handle': self.assoc_handle, 
 248              'openid.return_to': self.rt_url, 
 249              'openid.realm': self.tr_url, 
 250              } 
 251          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  252   
 254          args = { 
 255              'openid.ns': OPENID2_NS, 
 256              'openid.mode': 'checkid_setup', 
 257              'openid.assoc_handle': self.assoc_handle, 
 258              'openid.return_to': self.rt_url, 
 259              'openid.realm': self.tr_url, 
 260              } 
 261          r = self.decode(args) 
 262          self.failUnless(isinstance(r, server.CheckIDRequest)) 
 263          self.failUnlessEqual(r.mode, "checkid_setup") 
 264          self.failUnlessEqual(r.immediate, False) 
 265          self.failUnlessEqual(r.identity, None) 
 266          self.failUnlessEqual(r.trust_root, self.tr_url) 
 267          self.failUnlessEqual(r.return_to, self.rt_url) 
  268   
 270          """Make sure an OpenID 1 request cannot be decoded if it lacks 
 271          a return_to. 
 272          """ 
 273          args = { 
 274              'openid.mode': 'checkid_setup', 
 275              'openid.identity': self.id_url, 
 276              'openid.assoc_handle': self.assoc_handle, 
 277              'openid.trust_root': self.tr_url, 
 278              } 
 279          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  280   
 282          """Make sure an OpenID 2 request with no return_to can be 
 283          decoded, and make sure a response to such a request raises 
 284          NoReturnToError. 
 285          """ 
 286          args = { 
 287              'openid.ns': OPENID2_NS, 
 288              'openid.mode': 'checkid_setup', 
 289              'openid.identity': self.id_url, 
 290              'openid.claimed_id': self.id_url, 
 291              'openid.assoc_handle': self.assoc_handle, 
 292              'openid.realm': self.tr_url, 
 293              } 
 294          self.failUnless(isinstance(self.decode(args), server.CheckIDRequest)) 
 295   
 296          req = self.decode(args) 
 297          self.assertRaises(server.NoReturnToError, req.answer, False) 
 298          self.assertRaises(server.NoReturnToError, req.encodeToURL, 'bogus') 
 299          self.assertRaises(server.NoReturnToError, req.getCancelURL) 
  300   
 302          """Make sure that an OpenID 2 request which lacks return_to 
 303          cannot be decoded if it lacks a realm.  Spec: This value 
 304          (openid.realm) MUST be sent if openid.return_to is omitted. 
 305          """ 
 306          args = { 
 307              'openid.ns': OPENID2_NS, 
 308              'openid.mode': 'checkid_setup', 
 309              'openid.identity': self.id_url, 
 310              'openid.assoc_handle': self.assoc_handle, 
 311              } 
 312          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  313   
 315          args = { 
 316              'openid.mode': 'checkid_setup', 
 317              'openid.identity': self.id_url, 
 318              'openid.assoc_handle': self.assoc_handle, 
 319              'openid.return_to': 'not a url', 
 320              } 
 321          try: 
 322              result = self.decode(args) 
 323          except server.ProtocolError, err: 
 324              self.failUnless(err.openid_message) 
 325          else: 
 326              self.fail("Expected ProtocolError, instead returned with %s" % 
 327                        (result,)) 
  328   
 330          args = { 
 331              'openid.mode': 'checkid_setup', 
 332              'openid.identity': self.id_url, 
 333              'openid.assoc_handle': self.assoc_handle, 
 334              'openid.return_to': self.rt_url, 
 335              'openid.trust_root': 'http://not-the-return-place.unittest/', 
 336              } 
 337          try: 
 338              result = self.decode(args) 
 339          except server.UntrustedReturnURL, err: 
 340              self.failUnless(err.openid_message) 
 341          else: 
 342              self.fail("Expected UntrustedReturnURL, instead returned with %s" % 
 343                        (result,)) 
  344   
 346          args = { 
 347              'openid.mode': 'check_authentication', 
 348              'openid.assoc_handle': '{dumb}{handle}', 
 349              'openid.sig': 'sigblob', 
 350              'openid.signed': 'identity,return_to,response_nonce,mode', 
 351              'openid.identity': 'signedval1', 
 352              'openid.return_to': 'signedval2', 
 353              'openid.response_nonce': 'signedval3', 
 354              'openid.baz': 'unsigned', 
 355              } 
 356          r = self.decode(args) 
 357          self.failUnless(isinstance(r, server.CheckAuthRequest)) 
 358          self.failUnlessEqual(r.mode, 'check_authentication') 
 359          self.failUnlessEqual(r.sig, 'sigblob') 
  360   
 361   
 363          args = { 
 364              'openid.mode': 'check_authentication', 
 365              'openid.assoc_handle': '{dumb}{handle}', 
 366              'openid.signed': 'foo,bar,mode', 
 367              'openid.foo': 'signedval1', 
 368              'openid.bar': 'signedval2', 
 369              'openid.baz': 'unsigned', 
 370              } 
 371          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  372   
 373   
 375          args = { 
 376              'openid.mode': 'check_authentication', 
 377              'openid.assoc_handle': '{dumb}{handle}', 
 378              'openid.invalidate_handle': '[[SMART_handle]]', 
 379              'openid.sig': 'sigblob', 
 380              'openid.signed': 'identity,return_to,response_nonce,mode', 
 381              'openid.identity': 'signedval1', 
 382              'openid.return_to': 'signedval2', 
 383              'openid.response_nonce': 'signedval3', 
 384              'openid.baz': 'unsigned', 
 385              } 
 386          r = self.decode(args) 
 387          self.failUnless(isinstance(r, server.CheckAuthRequest)) 
 388          self.failUnlessEqual(r.invalidate_handle, '[[SMART_handle]]') 
  389   
 390   
 392          args = { 
 393              'openid.mode': 'associate', 
 394              'openid.session_type': 'DH-SHA1', 
 395              'openid.dh_consumer_public': "Rzup9265tw==", 
 396              } 
 397          r = self.decode(args) 
 398          self.failUnless(isinstance(r, server.AssociateRequest)) 
 399          self.failUnlessEqual(r.mode, "associate") 
 400          self.failUnlessEqual(r.session.session_type, "DH-SHA1") 
 401          self.failUnlessEqual(r.assoc_type, "HMAC-SHA1") 
 402          self.failUnless(r.session.consumer_pubkey) 
  403   
 405          """Trying DH assoc w/o public key""" 
 406          args = { 
 407              'openid.mode': 'associate', 
 408              'openid.session_type': 'DH-SHA1', 
 409              } 
 410           
 411          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  412   
 413   
 415          args = { 
 416              'openid.mode': 'associate', 
 417              'openid.session_type': 'DH-SHA1', 
 418              'openid.dh_consumer_public': "donkeydonkeydonkey", 
 419              } 
 420          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  421   
 422   
 424           
 425          args = { 
 426              'openid.mode': 'associate', 
 427              'openid.session_type': 'DH-SHA1', 
 428              'openid.dh_consumer_public': "Rzup9265tw==", 
 429              'openid.dh_modulus': cryptutil.longToBase64(ALT_MODULUS), 
 430              'openid.dh_gen': cryptutil.longToBase64(ALT_GEN) , 
 431              } 
 432          r = self.decode(args) 
 433          self.failUnless(isinstance(r, server.AssociateRequest)) 
 434          self.failUnlessEqual(r.mode, "associate") 
 435          self.failUnlessEqual(r.session.session_type, "DH-SHA1") 
 436          self.failUnlessEqual(r.assoc_type, "HMAC-SHA1") 
 437          self.failUnlessEqual(r.session.dh.modulus, ALT_MODULUS) 
 438          self.failUnlessEqual(r.session.dh.generator, ALT_GEN) 
 439          self.failUnless(r.session.consumer_pubkey) 
  440   
 441   
 443           
 444          args = { 
 445              'openid.mode': 'associate', 
 446              'openid.session_type': 'DH-SHA1', 
 447              'openid.dh_consumer_public': "Rzup9265tw==", 
 448              'openid.dh_modulus': 'pizza', 
 449              'openid.dh_gen': 'gnocchi', 
 450              } 
 451          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  452   
 453   
 455           
 456          args = { 
 457              'openid.mode': 'associate', 
 458              'openid.session_type': 'DH-SHA1', 
 459              'openid.dh_consumer_public': "Rzup9265tw==", 
 460              'openid.dh_modulus': 'pizza', 
 461              } 
 462          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  463   
 464   
 465   
 466   
 467   
 468   
 469   
 470   
 471   
 472   
 473   
 474   
 475   
 476   
 477   
 478   
 480          args = { 
 481              'openid.mode': 'associate', 
 482              'openid.session_type': 'FLCL6', 
 483              'openid.dh_consumer_public': "YQ==\n", 
 484              } 
 485          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  486   
 487   
 489          args = { 
 490              'openid.mode': 'associate', 
 491              } 
 492          r = self.decode(args) 
 493          self.failUnless(isinstance(r, server.AssociateRequest)) 
 494          self.failUnlessEqual(r.mode, "associate") 
 495          self.failUnlessEqual(r.session.session_type, "no-encryption") 
 496          self.failUnlessEqual(r.assoc_type, "HMAC-SHA1") 
  497   
 499          args = { 
 500              'openid.session_type': 'DH-SHA1', 
 501              'openid.dh_consumer_public': "my public keeey", 
 502              } 
 503          self.failUnlessRaises(server.ProtocolError, self.decode, args) 
  504   
 506          args = {'openid.ns': 'Tuesday', 
 507                  'openid.mode': 'associate'} 
 508   
 509          try: 
 510              r = self.decode(args) 
 511          except server.ProtocolError, err: 
 512               
 513               
 514              self.failUnless(err.openid_message) 
 515               
 516              self.failUnless('Tuesday' in str(err), str(err)) 
 517          else: 
 518              self.fail("Expected ProtocolError but returned with %r" % (r,)) 
   519   
 520   
 528   
 530          """ 
 531          Check that when an OpenID 2 response does not exceed the 
 532          OpenID 1 message size, a GET response (i.e., redirect) is 
 533          issued. 
 534          """ 
 535          request = server.CheckIDRequest( 
 536              identity = 'http://bombom.unittest/', 
 537              trust_root = 'http://burr.unittest/', 
 538              return_to = 'http://burr.unittest/999', 
 539              immediate = False, 
 540              op_endpoint = self.server.op_endpoint, 
 541              ) 
 542          request.message = Message(OPENID2_NS) 
 543          response = server.OpenIDResponse(request) 
 544          response.fields = Message.fromOpenIDArgs({ 
 545              'ns': OPENID2_NS, 
 546              'mode': 'id_res', 
 547              'identity': request.identity, 
 548              'claimed_id': request.identity, 
 549              'return_to': request.return_to, 
 550              }) 
 551   
 552          self.failIf(response.renderAsForm()) 
 553          self.failUnless(response.whichEncoding() == server.ENCODE_URL) 
 554          webresponse = self.encode(response) 
 555          self.failUnless(webresponse.headers.has_key('location')) 
  556   
 558          """ 
 559          Check that when an OpenID 2 response exceeds the OpenID 1 
 560          message size, a POST response (i.e., an HTML form) is 
 561          returned. 
 562          """ 
 563          request = server.CheckIDRequest( 
 564              identity = 'http://bombom.unittest/', 
 565              trust_root = 'http://burr.unittest/', 
 566              return_to = 'http://burr.unittest/999', 
 567              immediate = False, 
 568              op_endpoint = self.server.op_endpoint, 
 569              ) 
 570          request.message = Message(OPENID2_NS) 
 571          response = server.OpenIDResponse(request) 
 572          response.fields = Message.fromOpenIDArgs({ 
 573              'ns': OPENID2_NS, 
 574              'mode': 'id_res', 
 575              'identity': request.identity, 
 576              'claimed_id': request.identity, 
 577              'return_to': 'x' * OPENID1_URL_LIMIT, 
 578              }) 
 579   
 580          self.failUnless(response.renderAsForm()) 
 581          self.failUnless(len(response.encodeToURL()) > OPENID1_URL_LIMIT) 
 582          self.failUnless(response.whichEncoding() == server.ENCODE_HTML_FORM) 
 583          webresponse = self.encode(response) 
 584          self.failUnlessEqual(webresponse.body, response.toFormMarkup()) 
  585   
 606   
 608          request = server.CheckIDRequest( 
 609              identity = 'http://bombom.unittest/', 
 610              trust_root = 'http://burr.unittest/', 
 611              return_to = 'http://burr.unittest/999', 
 612              immediate = False, 
 613              op_endpoint = self.server.op_endpoint, 
 614              ) 
 615          request.message = Message(OPENID2_NS) 
 616          response = server.OpenIDResponse(request) 
 617          response.fields = Message.fromOpenIDArgs({ 
 618              'ns': OPENID2_NS, 
 619              'mode': 'id_res', 
 620              'identity': request.identity, 
 621              'claimed_id': request.identity, 
 622              'return_to': 'x' * OPENID1_URL_LIMIT, 
 623              }) 
 624          html = response.toHTML() 
 625          self.failUnless('<html>' in html) 
 626          self.failUnless('</html>' in html) 
 627          self.failUnless('<body onload=' in html) 
 628          self.failUnless('<form' in html) 
 629          self.failUnless('http://bombom.unittest/' in html) 
  630   
 632          """ 
 633          Check that when an OpenID 1 response exceeds the OpenID 1 
 634          message size, a GET response is issued.  Technically, this 
 635          shouldn't be permitted by the library, but this test is in 
 636          place to preserve the status quo for OpenID 1. 
 637          """ 
 638          request = server.CheckIDRequest( 
 639              identity = 'http://bombom.unittest/', 
 640              trust_root = 'http://burr.unittest/', 
 641              return_to = 'http://burr.unittest/999', 
 642              immediate = False, 
 643              op_endpoint = self.server.op_endpoint, 
 644              ) 
 645          request.message = Message(OPENID2_NS) 
 646          response = server.OpenIDResponse(request) 
 647          response.fields = Message.fromOpenIDArgs({ 
 648              'mode': 'id_res', 
 649              'identity': request.identity, 
 650              'return_to': 'x' * OPENID1_URL_LIMIT, 
 651              }) 
 652   
 653          self.failIf(response.renderAsForm()) 
 654          self.failUnless(len(response.encodeToURL()) > OPENID1_URL_LIMIT) 
 655          self.failUnless(response.whichEncoding() == server.ENCODE_URL) 
 656          webresponse = self.encode(response) 
 657          self.failUnlessEqual(webresponse.headers['location'], response.encodeToURL()) 
  658   
 660          request = server.CheckIDRequest( 
 661              identity = 'http://bombom.unittest/', 
 662              trust_root = 'http://burr.unittest/', 
 663              return_to = 'http://burr.unittest/999', 
 664              immediate = False, 
 665              op_endpoint = self.server.op_endpoint, 
 666              ) 
 667          request.message = Message(OPENID2_NS) 
 668          response = server.OpenIDResponse(request) 
 669          response.fields = Message.fromOpenIDArgs({ 
 670              'mode': 'id_res', 
 671              'identity': request.identity, 
 672              'return_to': request.return_to, 
 673              }) 
 674          webresponse = self.encode(response) 
 675          self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT) 
 676          self.failUnless(webresponse.headers.has_key('location')) 
 677   
 678          location = webresponse.headers['location'] 
 679          self.failUnless(location.startswith(request.return_to), 
 680                          "%s does not start with %s" % (location, 
 681                                                         request.return_to)) 
 682           
 683          q2 = dict(cgi.parse_qsl(urlparse(location)[4])) 
 684          expected = response.fields.toPostArgs() 
 685          self.failUnlessEqual(q2, expected) 
  686   
 703   
 719   
 733   
 750   
 757   
  768   
 769   
 770   
 794   
 811   
 813          webresponse = self.encode(self.response) 
 814          self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT) 
 815          self.failUnless(webresponse.headers.has_key('location')) 
 816   
 817          location = webresponse.headers['location'] 
 818          query = cgi.parse_qs(urlparse(location)[4]) 
 819          self.failUnless('openid.sig' in query) 
 820          self.failUnless('openid.assoc_handle' in query) 
 821          self.failUnless('openid.signed' in query) 
  822   
 824          self.encoder.signatory = None 
 825          self.failUnlessRaises(ValueError, self.encode, self.response) 
  826   
 844   
 857   
  861   
 875   
 877          self.request.trust_root = "http://foo.unittest/17" 
 878          self.request.return_to = "http://foo.unittest/39" 
 879          self.failIf(self.request.trustRootValid()) 
  880   
 882          self.request.trust_root = "http://foo.unittest/" 
 883          self.request.return_to = "http://foo.unittest/39" 
 884          self.failUnless(self.request.trustRootValid()) 
  885   
 898   
 900          request = server.CheckIDRequest( 
 901              identity = 'http://bambam.unittest/', 
 902              trust_root = 'http://bar.unittest/', 
 903              return_to = None, 
 904              immediate = False, 
 905              op_endpoint = self.server.op_endpoint, 
 906              ) 
 907   
 908          self.failUnless(request.trustRootValid()) 
  909   
 911          """Make sure that verifyReturnTo is calling the trustroot 
 912          function verifyReturnTo 
 913          """ 
 914          def withVerifyReturnTo(new_verify, callable): 
 915              old_verify = server.verifyReturnTo 
 916              try: 
 917                  server.verifyReturnTo = new_verify 
 918                  return callable() 
 919              finally: 
 920                  server.verifyReturnTo = old_verify 
  921   
 922           
 923          sentinel = Exception() 
 924          def vrfyExc(trust_root, return_to): 
 925              self.failUnlessEqual(self.request.trust_root, trust_root) 
 926              self.failUnlessEqual(self.request.return_to, return_to) 
 927              raise sentinel 
  928   
 929          try: 
 930              withVerifyReturnTo(vrfyExc, self.request.returnToVerified) 
 931          except Exception, e: 
 932              self.failUnless(e is sentinel, e) 
 933   
 934           
 935          def constVerify(val): 
 936              def verify(trust_root, return_to): 
 937                  self.failUnlessEqual(self.request.trust_root, trust_root) 
 938                  self.failUnlessEqual(self.request.return_to, return_to) 
 939                  return val 
 940              return verify 
 941   
 942          for val in [True, False]: 
 943              self.failUnlessEqual( 
 944                  val, 
 945                  withVerifyReturnTo(constVerify(val), 
 946                                     self.request.returnToVerified)) 
 947   
 948 -    def _expectAnswer(self, answer, identity=None, claimed_id=None): 
  949          expected_list = [ 
 950              ('mode', 'id_res'), 
 951              ('return_to', self.request.return_to), 
 952              ('op_endpoint', self.op_endpoint), 
 953              ] 
 954          if identity: 
 955              expected_list.append(('identity', identity)) 
 956              if claimed_id: 
 957                  expected_list.append(('claimed_id', claimed_id)) 
 958              else: 
 959                  expected_list.append(('claimed_id', identity)) 
 960   
 961          for k, expected in expected_list: 
 962              actual = answer.fields.getArg(OPENID_NS, k) 
 963              self.failUnlessEqual(actual, expected, "%s: expected %s, got %s" % (k, expected, actual)) 
 964   
 965          self.failUnless(answer.fields.hasKey(OPENID_NS, 'response_nonce')) 
 966          self.failUnless(answer.fields.getOpenIDNamespace() == OPENID2_NS) 
 967   
 968           
 969          self.failUnlessEqual(len(answer.fields.toPostArgs()), 
 970                               len(expected_list) + 2, 
 971                               answer.fields.toPostArgs()) 
  972   
 974          """Check the fields specified by "Positive Assertions" 
 975   
 976          including mode=id_res, identity, claimed_id, op_endpoint, return_to 
 977          """ 
 978          answer = self.request.answer(True) 
 979          self.failUnlessEqual(answer.request, self.request) 
 980          self._expectAnswer(answer, self.request.identity) 
  981   
 983          self.request.claimed_id = 'http://delegating.unittest/' 
 984          answer = self.request.answer(True) 
 985          self._expectAnswer(answer, self.request.identity, 
 986                             self.request.claimed_id) 
  987   
 989           
 990           
 991          self.request.claimed_id = 'http://delegating.unittest/' 
 992          answer = self.request.answer(True, identity='http://bambam.unittest/') 
 993          self._expectAnswer(answer, self.request.identity, 
 994                             self.request.claimed_id) 
  995   
 997          self.request.identity = None 
 998          answer = self.request.answer(True) 
 999          self.failUnlessEqual(answer.request, self.request) 
1000          self._expectAnswer(answer) 
 1001   
1003          self.request.identity = None 
1004           
1005          self.failUnlessRaises( 
1006              ValueError, self.request.answer, True, identity="=V") 
 1007   
1009          self.request.identity = IDENTIFIER_SELECT 
1010          selected_id = 'http://anon.unittest/9861' 
1011          answer = self.request.answer(True, identity=selected_id) 
1012          self._expectAnswer(answer, selected_id) 
 1013   
1015          """Answer an IDENTIFIER_SELECT case with a delegated identifier. 
1016          """ 
1017           
1018          self.request.identity = IDENTIFIER_SELECT 
1019          selected_id = 'http://anon.unittest/9861' 
1020          claimed_id = 'http://monkeyhat.unittest/' 
1021          answer = self.request.answer(True, identity=selected_id, 
1022                                       claimed_id=claimed_id) 
1023          self._expectAnswer(answer, selected_id, claimed_id) 
 1024   
1026          """claimed_id parameter doesn't exist in OpenID 1. 
1027          """ 
1028          self.request.message = Message(OPENID1_NS) 
1029           
1030          self.request.identity = IDENTIFIER_SELECT 
1031          selected_id = 'http://anon.unittest/9861' 
1032          claimed_id = 'http://monkeyhat.unittest/' 
1033          self.failUnlessRaises(server.VersionError, 
1034                                self.request.answer, True, 
1035                                identity=selected_id, 
1036                                claimed_id=claimed_id) 
 1037   
1039           
1040          self.failUnlessRaises(ValueError, self.request.answer, True, 
1041                                identity="http://pebbles.unittest/") 
 1042   
1044           
1045           
1046           
1047          non_normalized = 'http://bambam.unittest' 
1048          normalized = non_normalized + '/' 
1049   
1050          self.request.identity = non_normalized 
1051          self.request.claimed_id = non_normalized 
1052   
1053          answer = self.request.answer(True, identity=normalized) 
1054   
1055           
1056           
1057          self._expectAnswer(answer, identity=non_normalized, 
1058                             claimed_id=non_normalized) 
 1059   
1061          self.request.message = Message(OPENID1_NS) 
1062          self.request.identity = None 
1063          self.failUnlessRaises(ValueError, self.request.answer, True, 
1064                                identity=None) 
 1065   
1067          self.request.op_endpoint = None 
1068          self.failUnlessRaises(RuntimeError, self.request.answer, True) 
 1069   
1071          msg = Message(OPENID1_NS) 
1072          msg.setArg(OPENID_NS, 'return_to', 'bogus') 
1073          msg.setArg(OPENID_NS, 'trust_root', 'bogus') 
1074          msg.setArg(OPENID_NS, 'mode', 'checkid_setup') 
1075          msg.setArg(OPENID_NS, 'assoc_handle', 'bogus') 
1076   
1077          self.failUnlessRaises(server.ProtocolError, 
1078                                server.CheckIDRequest.fromMessage, 
1079                                msg, self.server) 
 1080   
1092   
1104   
1106          """Ignore openid.realm in OpenID 1""" 
1107          msg = Message(OPENID1_NS) 
1108          msg.setArg(OPENID_NS, 'mode', 'checkid_setup') 
1109          msg.setArg(OPENID_NS, 'trust_root', 'http://real_trust_root/') 
1110          msg.setArg(OPENID_NS, 'realm', 'http://fake_trust_root/') 
1111          msg.setArg(OPENID_NS, 'return_to', 'http://real_trust_root/foo') 
1112          msg.setArg(OPENID_NS, 'assoc_handle', 'bogus') 
1113          msg.setArg(OPENID_NS, 'identity', 'george') 
1114   
1115          result = server.CheckIDRequest.fromMessage(msg, self.server.op_endpoint) 
1116   
1117          self.failUnless(result.trust_root == 'http://real_trust_root/') 
 1118   
1120          """Ignore openid.trust_root in OpenID 2""" 
1121          msg = Message(OPENID2_NS) 
1122          msg.setArg(OPENID_NS, 'mode', 'checkid_setup') 
1123          msg.setArg(OPENID_NS, 'realm', 'http://real_trust_root/') 
1124          msg.setArg(OPENID_NS, 'trust_root', 'http://fake_trust_root/') 
1125          msg.setArg(OPENID_NS, 'return_to', 'http://real_trust_root/foo') 
1126          msg.setArg(OPENID_NS, 'assoc_handle', 'bogus') 
1127          msg.setArg(OPENID_NS, 'identity', 'george') 
1128          msg.setArg(OPENID_NS, 'claimed_id', 'george') 
1129   
1130          result = server.CheckIDRequest.fromMessage(msg, self.server.op_endpoint) 
1131   
1132          self.failUnless(result.trust_root == 'http://real_trust_root/') 
 1133   
1135          self.request.trust_root = None 
1136          answer = self.request.answer(True) 
1137          self.failUnlessEqual(answer.request, self.request) 
1138          self._expectAnswer(answer, self.request.identity) 
 1139   
1141          msg = Message(OPENID2_NS) 
1142          msg.setArg(OPENID_NS, 'mode', 'checkid_setup') 
1143          msg.setArg(OPENID_NS, 'return_to', 'http://real_trust_root/foo') 
1144          msg.setArg(OPENID_NS, 'assoc_handle', 'bogus') 
1145          msg.setArg(OPENID_NS, 'identity', 'george') 
1146          msg.setArg(OPENID_NS, 'claimed_id', 'george') 
1147   
1148          result = server.CheckIDRequest.fromMessage(msg, self.server.op_endpoint) 
1149   
1150          self.failUnlessEqual(result.trust_root, 'http://real_trust_root/foo') 
 1151   
1153          return_to = u'http://someplace.invalid/?go=thing' 
1154          msg = Message.fromPostArgs({ 
1155                  u'openid.assoc_handle': u'{blah}{blah}{OZivdQ==}', 
1156                  u'openid.claimed_id': u'http://delegated.invalid/', 
1157                  u'openid.identity': u'http://op-local.example.com/', 
1158                  u'openid.mode': u'checkid_setup', 
1159                  u'openid.ns': u'http://openid.net/signon/1.0', 
1160                  u'openid.return_to': return_to, 
1161                  u'openid.trust_root': u''}) 
1162   
1163          result = server.CheckIDRequest.fromMessage(msg, self.server.op_endpoint) 
1164   
1165          self.failUnlessEqual(result.trust_root, return_to) 
 1166   
1168          msg = Message(OPENID2_NS) 
1169          msg.setArg(OPENID_NS, 'mode', 'checkid_setup') 
1170          msg.setArg(OPENID_NS, 'assoc_handle', 'bogus') 
1171          msg.setArg(OPENID_NS, 'identity', 'george') 
1172          msg.setArg(OPENID_NS, 'claimed_id', 'george') 
1173   
1174          self.failUnlessRaises(server.ProtocolError, 
1175                                server.CheckIDRequest.fromMessage, 
1176                                msg, self.server.op_endpoint) 
 1177   
1179          """Test .allow() with an OpenID 1.x Message on a CheckIDRequest 
1180          built without an op_endpoint parameter. 
1181          """ 
1182          identity = 'http://bambam.unittest/' 
1183          reqmessage = Message.fromOpenIDArgs({ 
1184              'identity': identity, 
1185              'trust_root': 'http://bar.unittest/', 
1186              'return_to': 'http://bar.unittest/999', 
1187              }) 
1188          self.request = server.CheckIDRequest.fromMessage(reqmessage, None) 
1189          answer = self.request.answer(True) 
1190   
1191          expected_list = [ 
1192              ('mode', 'id_res'), 
1193              ('return_to', self.request.return_to), 
1194              ('identity', identity), 
1195              ] 
1196   
1197          for k, expected in expected_list: 
1198              actual = answer.fields.getArg(OPENID_NS, k) 
1199              self.failUnlessEqual( 
1200                  expected, actual, 
1201                  "%s: expected %s, got %s" % (k, expected, actual)) 
1202   
1203          self.failUnless(answer.fields.hasKey(OPENID_NS, 'response_nonce')) 
1204          self.failUnlessEqual(answer.fields.getOpenIDNamespace(), OPENID1_NS) 
1205          self.failUnless(answer.fields.namespaces.isImplicit(OPENID1_NS)) 
1206   
1207           
1208          self.failUnlessEqual(len(answer.fields.toPostArgs()), 
1209                               len(expected_list) + 1, 
1210                               answer.fields.toPostArgs()) 
 1211   
1234   
1251   
1257   
1271   
1273          url = self.request.getCancelURL() 
1274          rt, query_string = url.split('?') 
1275          self.failUnlessEqual(self.request.return_to, rt) 
1276          query = dict(cgi.parse_qsl(query_string)) 
1277          self.failUnlessEqual(query, {'openid.mode':'cancel', 
1278                                       'openid.ns':OPENID2_NS}) 
 1279   
1281          self.request.mode = 'checkid_immediate' 
1282          self.request.immediate = True 
1283          self.failUnlessRaises(ValueError, self.request.getCancelURL) 
 1284   
1285   
1286   
1288   
1290          self.op_endpoint = 'http://endpoint.unittest/ext' 
1291          self.store = memstore.MemoryStore() 
1292          self.server = server.Server(self.store, self.op_endpoint) 
1293          self.request = server.CheckIDRequest( 
1294              identity = 'http://bambam.unittest/', 
1295              trust_root = 'http://bar.unittest/', 
1296              return_to = 'http://bar.unittest/999', 
1297              immediate = False, 
1298              op_endpoint = self.server.op_endpoint, 
1299              ) 
1300          self.request.message = Message(OPENID2_NS) 
1301          self.response = server.OpenIDResponse(self.request) 
1302          self.response.fields.setArg(OPENID_NS, 'mode', 'id_res') 
1303          self.response.fields.setArg(OPENID_NS, 'blue', 'star') 
 1304   
1305   
1316   
1317   
 1328   
1329   
1330   
1332      isValid = True 
1333   
1336   
1337 -    def verify(self, assoc_handle, message): 
 1343   
1345          if (dumb, assoc_handle) in self.assocs: 
1346               
1347               
1348              return True 
1349          else: 
1350              return None 
 1351   
1353          if (dumb, assoc_handle) in self.assocs: 
1354              self.assocs.remove((dumb, assoc_handle)) 
  1355   
1356   
1369   
1371          r = self.request.answer(self.signatory) 
1372          self.failUnlessEqual(r.fields.getArgs(OPENID_NS), {'is_valid': 'true'}) 
1373          self.failUnlessEqual(r.request, self.request) 
 1374   
1376          self.signatory.isValid = False 
1377          r = self.request.answer(self.signatory) 
1378          self.failUnlessEqual(r.fields.getArgs(OPENID_NS), 
1379                               {'is_valid': 'false'}) 
 1380   
1382          """Don't validate the same response twice. 
1383   
1384          From "Checking the Nonce":: 
1385   
1386              When using "check_authentication", the OP MUST ensure that an 
1387              assertion has not yet been accepted with the same value for 
1388              "openid.response_nonce". 
1389   
1390          In this implementation, the assoc_handle is only valid once.  And 
1391          nonces are a signed component of the message, so they can't be used 
1392          with another handle without breaking the sig. 
1393          """ 
1394          r = self.request.answer(self.signatory) 
1395          r = self.request.answer(self.signatory) 
1396          self.failUnlessEqual(r.fields.getArgs(OPENID_NS), 
1397                               {'is_valid': 'false'}) 
 1398   
1400          self.request.invalidate_handle = "bogusHandle" 
1401          r = self.request.answer(self.signatory) 
1402          self.failUnlessEqual(r.fields.getArgs(OPENID_NS), 
1403                               {'is_valid': 'true', 
1404                                'invalidate_handle': "bogusHandle"}) 
1405          self.failUnlessEqual(r.request, self.request) 
 1406   
1408          assoc_handle = 'goodhandle' 
1409          self.signatory.assocs.append((False, 'goodhandle')) 
1410          self.request.invalidate_handle = assoc_handle 
1411          r = self.request.answer(self.signatory) 
1412          self.failUnlessEqual(r.fields.getArgs(OPENID_NS), {'is_valid': 'true'}) 
  1413   
1414   
1416       
1417       
1418   
1424   
1426          self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA1') 
1427          from openid.dh import DiffieHellman 
1428          from openid.server.server import DiffieHellmanSHA1ServerSession 
1429          consumer_dh = DiffieHellman.fromDefaults() 
1430          cpub = consumer_dh.public 
1431          server_dh = DiffieHellman.fromDefaults() 
1432          session = DiffieHellmanSHA1ServerSession(server_dh, cpub) 
1433          self.request = server.AssociateRequest(session, 'HMAC-SHA1') 
1434          response = self.request.answer(self.assoc) 
1435          rfg = lambda f: response.fields.getArg(OPENID_NS, f) 
1436          self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1") 
1437          self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle) 
1438          self.failIf(rfg("mac_key")) 
1439          self.failUnlessEqual(rfg("session_type"), "DH-SHA1") 
1440          self.failUnless(rfg("enc_mac_key")) 
1441          self.failUnless(rfg("dh_server_public")) 
1442   
1443          enc_key = rfg("enc_mac_key").decode('base64') 
1444          spub = cryptutil.base64ToLong(rfg("dh_server_public")) 
1445          secret = consumer_dh.xorSecret(spub, enc_key, cryptutil.sha1) 
1446          self.failUnlessEqual(secret, self.assoc.secret) 
 1447   
1448   
1449      if not cryptutil.SHA256_AVAILABLE: 
1450          warnings.warn("Not running SHA256 tests.") 
1451      else: 
1453              self.assoc = self.signatory.createAssociation( 
1454                  dumb=False, assoc_type='HMAC-SHA256') 
1455              from openid.dh import DiffieHellman 
1456              from openid.server.server import DiffieHellmanSHA256ServerSession 
1457              consumer_dh = DiffieHellman.fromDefaults() 
1458              cpub = consumer_dh.public 
1459              server_dh = DiffieHellman.fromDefaults() 
1460              session = DiffieHellmanSHA256ServerSession(server_dh, cpub) 
1461              self.request = server.AssociateRequest(session, 'HMAC-SHA256') 
1462              response = self.request.answer(self.assoc) 
1463              rfg = lambda f: response.fields.getArg(OPENID_NS, f) 
1464              self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA256") 
1465              self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle) 
1466              self.failIf(rfg("mac_key")) 
1467              self.failUnlessEqual(rfg("session_type"), "DH-SHA256") 
1468              self.failUnless(rfg("enc_mac_key")) 
1469              self.failUnless(rfg("dh_server_public")) 
1470   
1471              enc_key = rfg("enc_mac_key").decode('base64') 
1472              spub = cryptutil.base64ToLong(rfg("dh_server_public")) 
1473              secret = consumer_dh.xorSecret(spub, enc_key, cryptutil.sha256) 
1474              self.failUnlessEqual(secret, self.assoc.secret) 
 1475   
1500   
1525   
1527   
1528          contact = 'user@example.invalid' 
1529          reference = 'Trac ticket number MAX_INT' 
1530          error = 'poltergeist' 
1531   
1532          openid1_args = { 
1533              'openid.identitiy': 'invalid', 
1534              'openid.mode': 'checkid_setup', 
1535              } 
1536   
1537          openid2_args = dict(openid1_args) 
1538          openid2_args.update({'openid.ns': OPENID2_NS}) 
1539   
1540           
1541   
1542          openid1_msg = Message.fromPostArgs(openid1_args) 
1543          p = server.ProtocolError(openid1_msg, error, 
1544                                   contact=contact, reference=reference) 
1545          reply = p.toMessage() 
1546   
1547          self.failUnlessEqual(reply.getArg(OPENID_NS, 'reference'), reference) 
1548          self.failUnlessEqual(reply.getArg(OPENID_NS, 'contact'), contact) 
1549   
1550          openid2_msg = Message.fromPostArgs(openid2_args) 
1551          p = server.ProtocolError(openid2_msg, error, 
1552                                   contact=contact, reference=reference) 
1553          reply = p.toMessage() 
1554   
1555          self.failUnlessEqual(reply.getArg(OPENID_NS, 'reference'), reference) 
1556          self.failUnlessEqual(reply.getArg(OPENID_NS, 'contact'), contact) 
 1557   
1559          expires_in_str = msg.getArg(OPENID_NS, 'expires_in', no_default) 
1560          expires_in = int(expires_in_str) 
1561   
1562           
1563           
1564          slop = 1  
1565          difference = expected_expires_in - expires_in 
1566   
1567          error_message = ('"expires_in" value not within %s of expected: ' 
1568                           'expected=%s, actual=%s' % 
1569                           (slop, expected_expires_in, expires_in)) 
1570          self.failUnless(0 <= difference <= slop, error_message) 
 1571   
1572 -    def test_plaintext(self): 
 1573          self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA1') 
1574          response = self.request.answer(self.assoc) 
1575          rfg = lambda f: response.fields.getArg(OPENID_NS, f) 
1576   
1577          self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1") 
1578          self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle) 
1579   
1580          self.failUnlessExpiresInMatches( 
1581              response.fields, self.signatory.SECRET_LIFETIME) 
1582   
1583          self.failUnlessEqual( 
1584              rfg("mac_key"), oidutil.toBase64(self.assoc.secret)) 
1585          self.failIf(rfg("session_type")) 
1586          self.failIf(rfg("enc_mac_key")) 
1587          self.failIf(rfg("dh_server_public")) 
 1588   
1590           
1591           
1592          args = { 
1593              'openid.ns': OPENID2_NS, 
1594              'openid.mode': 'associate', 
1595              'openid.assoc_type': 'HMAC-SHA1', 
1596              'openid.session_type': 'no-encryption', 
1597              } 
1598          self.request = server.AssociateRequest.fromMessage( 
1599              Message.fromPostArgs(args)) 
1600   
1601          self.failIf(self.request.message.isOpenID1()) 
1602   
1603          self.assoc = self.signatory.createAssociation( 
1604              dumb=False, assoc_type='HMAC-SHA1') 
1605          response = self.request.answer(self.assoc) 
1606          rfg = lambda f: response.fields.getArg(OPENID_NS, f) 
1607   
1608          self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1") 
1609          self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle) 
1610   
1611          self.failUnlessExpiresInMatches( 
1612              response.fields, self.signatory.SECRET_LIFETIME) 
1613   
1614          self.failUnlessEqual( 
1615              rfg("mac_key"), oidutil.toBase64(self.assoc.secret)) 
1616   
1617          self.failUnlessEqual(rfg("session_type"), "no-encryption") 
1618          self.failIf(rfg("enc_mac_key")) 
1619          self.failIf(rfg("dh_server_public")) 
 1620   
1622          self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA256') 
1623          response = self.request.answer(self.assoc) 
1624          rfg = lambda f: response.fields.getArg(OPENID_NS, f) 
1625   
1626          self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1") 
1627          self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle) 
1628   
1629          self.failUnlessExpiresInMatches( 
1630              response.fields, self.signatory.SECRET_LIFETIME) 
1631   
1632          self.failUnlessEqual( 
1633              rfg("mac_key"), oidutil.toBase64(self.assoc.secret)) 
1634          self.failIf(rfg("session_type")) 
1635          self.failIf(rfg("enc_mac_key")) 
1636          self.failIf(rfg("dh_server_public")) 
 1637   
1639          allowed_assoc = 'COLD-PET-RAT' 
1640          allowed_sess = 'FROG-BONES' 
1641          message = 'This is a unit test' 
1642   
1643           
1644           
1645          self.request.message = Message(OPENID2_NS) 
1646   
1647          response = self.request.answerUnsupported( 
1648              message=message, 
1649              preferred_session_type=allowed_sess, 
1650              preferred_association_type=allowed_assoc, 
1651              ) 
1652          rfg = lambda f: response.fields.getArg(OPENID_NS, f) 
1653          self.failUnlessEqual(rfg('error_code'), 'unsupported-type') 
1654          self.failUnlessEqual(rfg('assoc_type'), allowed_assoc) 
1655          self.failUnlessEqual(rfg('error'), message) 
1656          self.failUnlessEqual(rfg('session_type'), allowed_sess) 
 1657   
 1671   
1678   
1697   
1703   
1726   
1728          """Request an assoc type that is not supported when there are 
1729          supported types. 
1730   
1731          Should give back an error message with a fallback type. 
1732          """ 
1733          self.server.negotiator.setAllowedTypes([('HMAC-SHA256', 'DH-SHA256')]) 
1734   
1735          msg = Message.fromPostArgs({ 
1736              'openid.ns': OPENID2_NS, 
1737              'openid.session_type': 'no-encryption', 
1738              }) 
1739   
1740          request = server.AssociateRequest.fromMessage(msg) 
1741          response = self.server.openid_associate(request) 
1742   
1743          self.failUnless(response.fields.hasKey(OPENID_NS, "error")) 
1744          self.failUnless(response.fields.hasKey(OPENID_NS, "error_code")) 
1745          self.failIf(response.fields.hasKey(OPENID_NS, "assoc_handle")) 
1746          self.failUnlessEqual(response.fields.getArg(OPENID_NS, "assoc_type"), 
1747                               'HMAC-SHA256') 
1748          self.failUnlessEqual(response.fields.getArg(OPENID_NS, "session_type"), 
1749                               'DH-SHA256') 
 1750   
1751      if not cryptutil.SHA256_AVAILABLE: 
1752          warnings.warn("Not running SHA256 tests.") 
1753      else: 
1770   
1779   
1784   
1792   
1794          request = server.OpenIDRequest() 
1795          assoc_handle = '{assoc}{lookatme}' 
1796          self.store.storeAssociation( 
1797              self._normal_key, 
1798              association.Association.fromExpiresIn(60, assoc_handle, 
1799                                                    'sekrit', 'HMAC-SHA1')) 
1800          request.assoc_handle = assoc_handle 
1801          request.namespace = OPENID1_NS 
1802          response = server.OpenIDResponse(request) 
1803          response.fields = Message.fromOpenIDArgs({ 
1804              'foo': 'amsigned', 
1805              'bar': 'notsigned', 
1806              'azu': 'alsosigned', 
1807              }) 
1808          sresponse = self.signatory.sign(response) 
1809          self.failUnlessEqual( 
1810              sresponse.fields.getArg(OPENID_NS, 'assoc_handle'), 
1811              assoc_handle) 
1812          self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'), 
1813                               'assoc_handle,azu,bar,foo,signed') 
1814          self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig')) 
1815          self.failIf(self.messages, self.messages) 
 1816   
1818          request = server.OpenIDRequest() 
1819          request.assoc_handle = None 
1820          request.namespace = OPENID2_NS 
1821          response = server.OpenIDResponse(request) 
1822          response.fields = Message.fromOpenIDArgs({ 
1823              'foo': 'amsigned', 
1824              'bar': 'notsigned', 
1825              'azu': 'alsosigned', 
1826              'ns':OPENID2_NS, 
1827              }) 
1828          sresponse = self.signatory.sign(response) 
1829          assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle') 
1830          self.failUnless(assoc_handle) 
1831          assoc = self.signatory.getAssociation(assoc_handle, dumb=True) 
1832          self.failUnless(assoc) 
1833          self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'), 
1834                               'assoc_handle,azu,bar,foo,ns,signed') 
1835          self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig')) 
1836          self.failIf(self.messages, self.messages) 
 1837   
1839          """Sign a response to a message with an expired handle (using invalidate_handle). 
1840   
1841          From "Verifying with an Association":: 
1842   
1843              If an authentication request included an association handle for an 
1844              association between the OP and the Relying party, and the OP no 
1845              longer wishes to use that handle (because it has expired or the 
1846              secret has been compromised, for instance), the OP will send a 
1847              response that must be verified directly with the OP, as specified 
1848              in Section 11.3.2. In that instance, the OP will include the field 
1849              "openid.invalidate_handle" set to the association handle that the 
1850              Relying Party included with the original request. 
1851          """ 
1852          request = server.OpenIDRequest() 
1853          request.namespace = OPENID2_NS 
1854          assoc_handle = '{assoc}{lookatme}' 
1855          self.store.storeAssociation( 
1856              self._normal_key, 
1857              association.Association.fromExpiresIn(-10, assoc_handle, 
1858                                                    'sekrit', 'HMAC-SHA1')) 
1859          self.failUnless(self.store.getAssociation(self._normal_key, assoc_handle)) 
1860   
1861          request.assoc_handle = assoc_handle 
1862          response = server.OpenIDResponse(request) 
1863          response.fields = Message.fromOpenIDArgs({ 
1864              'foo': 'amsigned', 
1865              'bar': 'notsigned', 
1866              'azu': 'alsosigned', 
1867              }) 
1868          sresponse = self.signatory.sign(response) 
1869   
1870          new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle') 
1871          self.failUnless(new_assoc_handle) 
1872          self.failIfEqual(new_assoc_handle, assoc_handle) 
1873   
1874          self.failUnlessEqual( 
1875              sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'), 
1876              assoc_handle) 
1877   
1878          self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'), 
1879                               'assoc_handle,azu,bar,foo,invalidate_handle,signed') 
1880          self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig')) 
1881   
1882           
1883          self.failIf(self.store.getAssociation(self._normal_key, assoc_handle), 
1884                      "expired association is still retrievable.") 
1885   
1886           
1887          self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle)) 
1888          self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle)) 
1889          self.failUnless(self.messages) 
 1890   
1891   
1893          request = server.OpenIDRequest() 
1894          request.namespace = OPENID2_NS 
1895          assoc_handle = '{bogus-assoc}{notvalid}' 
1896   
1897          request.assoc_handle = assoc_handle 
1898          response = server.OpenIDResponse(request) 
1899          response.fields = Message.fromOpenIDArgs({ 
1900              'foo': 'amsigned', 
1901              'bar': 'notsigned', 
1902              'azu': 'alsosigned', 
1903              }) 
1904          sresponse = self.signatory.sign(response) 
1905   
1906          new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle') 
1907          self.failUnless(new_assoc_handle) 
1908          self.failIfEqual(new_assoc_handle, assoc_handle) 
1909   
1910          self.failUnlessEqual( 
1911              sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'), 
1912              assoc_handle) 
1913   
1914          self.failUnlessEqual( 
1915              sresponse.fields.getArg(OPENID_NS, 'signed'), 'assoc_handle,azu,bar,foo,invalidate_handle,signed') 
1916          self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig')) 
1917   
1918           
1919          self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle)) 
1920          self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle)) 
1921          self.failIf(self.messages, self.messages) 
 1922   
1923   
1925          assoc_handle = '{vroom}{zoom}' 
1926          assoc = association.Association.fromExpiresIn( 
1927              60, assoc_handle, 'sekrit', 'HMAC-SHA1') 
1928   
1929          self.store.storeAssociation(self._dumb_key, assoc) 
1930   
1931          signed = Message.fromPostArgs({ 
1932              'openid.foo': 'bar', 
1933              'openid.apple': 'orange', 
1934              'openid.assoc_handle': assoc_handle, 
1935              'openid.signed': 'apple,assoc_handle,foo,signed', 
1936              'openid.sig': 'uXoT1qm62/BB09Xbj98TQ8mlBco=', 
1937              }) 
1938   
1939          verified = self.signatory.verify(assoc_handle, signed) 
1940          self.failIf(self.messages, self.messages) 
1941          self.failUnless(verified) 
 1942   
1943   
1945          assoc_handle = '{vroom}{zoom}' 
1946          assoc = association.Association.fromExpiresIn( 
1947              60, assoc_handle, 'sekrit', 'HMAC-SHA1') 
1948   
1949          self.store.storeAssociation(self._dumb_key, assoc) 
1950   
1951          signed = Message.fromPostArgs({ 
1952              'openid.foo': 'bar', 
1953              'openid.apple': 'orange', 
1954              'openid.assoc_handle': assoc_handle, 
1955              'openid.signed': 'apple,assoc_handle,foo,signed', 
1956              'openid.sig': 'uXoT1qm62/BB09Xbj98TQ8mlBco='.encode('rot13'), 
1957              }) 
1958   
1959          verified = self.signatory.verify(assoc_handle, signed) 
1960          self.failIf(self.messages, self.messages) 
1961          self.failIf(verified) 
 1962   
1964          assoc_handle = '{vroom}{zoom}' 
1965          signed = Message.fromPostArgs({ 
1966              'foo': 'bar', 
1967              'apple': 'orange', 
1968              'openid.sig': "Ylu0KcIR7PvNegB/K41KpnRgJl0=", 
1969              }) 
1970   
1971          verified = self.signatory.verify(assoc_handle, signed) 
1972          self.failIf(verified) 
1973          self.failUnless(self.messages) 
 1974   
1975   
1977          """Attempt to validate sign-all message with a signed-list assoc.""" 
1978          assoc_handle = '{vroom}{zoom}' 
1979          assoc = association.Association.fromExpiresIn( 
1980              60, assoc_handle, 'sekrit', 'HMAC-SHA1') 
1981   
1982          self.store.storeAssociation(self._dumb_key, assoc) 
1983   
1984          signed = Message.fromPostArgs({ 
1985              'foo': 'bar', 
1986              'apple': 'orange', 
1987              'openid.sig': "d71xlHtqnq98DonoSgoK/nD+QRM=", 
1988              }) 
1989   
1990          verified = self.signatory.verify(assoc_handle, signed) 
1991          self.failIf(verified) 
1992          self.failUnless(self.messages) 
 1993   
1995          assoc_handle = self.makeAssoc(dumb=True) 
1996          assoc = self.signatory.getAssociation(assoc_handle, True) 
1997          self.failUnless(assoc) 
1998          self.failUnlessEqual(assoc.handle, assoc_handle) 
1999          self.failIf(self.messages, self.messages) 
 2000   
2002          assoc_handle = self.makeAssoc(dumb=True, lifetime=-10) 
2003          assoc = self.signatory.getAssociation(assoc_handle, True) 
2004          self.failIf(assoc, assoc) 
2005          self.failUnless(self.messages) 
 2006   
2008          ah = 'no-such-handle' 
2009          self.failUnlessEqual( 
2010              self.signatory.getAssociation(ah, dumb=False), None) 
2011          self.failIf(self.messages, self.messages) 
 2012   
2014          """getAssociation(dumb=False) cannot get a dumb assoc""" 
2015          assoc_handle = self.makeAssoc(dumb=True) 
2016          self.failUnlessEqual( 
2017              self.signatory.getAssociation(assoc_handle, dumb=False), None) 
2018          self.failIf(self.messages, self.messages) 
 2019   
2021          """getAssociation(dumb=True) cannot get a shared assoc 
2022   
2023          From "Verifying Directly with the OpenID Provider":: 
2024   
2025              An OP MUST NOT verify signatures for associations that have shared 
2026              MAC keys. 
2027          """ 
2028          assoc_handle = self.makeAssoc(dumb=False) 
2029          self.failUnlessEqual( 
2030              self.signatory.getAssociation(assoc_handle, dumb=True), None) 
2031          self.failIf(self.messages, self.messages) 
 2032   
2037   
2045   
 2060   
2061   
2062   
2063  if __name__ == '__main__': 
2064      unittest.main() 
2065