1 """Tests for openid.server.
2 """
3 from openid.server import server
4 from openid import association, cryptutil, oidutil
5 from openid.message import Message, OPENID_NS, OPENID2_NS, OPENID1_NS, \
6 IDENTIFIER_SELECT, no_default, OPENID1_URL_LIMIT
7 from openid.store import memstore
8 import cgi
9
10 import unittest
11 import warnings
12
13 from urlparse import urlparse
14
15
16
17
18
19
20
# Alternate Diffie-Hellman modulus and generator.  The associate-request
# decoding tests below send these (base64-encoded) as openid.dh_modulus and
# openid.dh_gen and check that the decoded session carries the same values.
ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7
ALT_GEN = 5
23
35
38 return_to = "http://rp.unittest/consumer"
39
40 args = Message.fromPostArgs({
41 'openid.mode': 'monkeydance',
42 'openid.identity': 'http://wagu.unittest/',
43 'openid.return_to': return_to,
44 })
45 e = server.ProtocolError(args, "plucky")
46 self.failUnless(e.hasReturnTo())
47 expected_args = {
48 'openid.mode': ['error'],
49 'openid.error': ['plucky'],
50 }
51
52 rt_base, result_args = e.encodeToURL().split('?', 1)
53 result_args = cgi.parse_qs(result_args)
54 self.failUnlessEqual(result_args, expected_args)
55
57 return_to = "http://rp.unittest/consumer"
58
59 args = Message.fromPostArgs({
60 'openid.ns': OPENID2_NS,
61 'openid.mode': 'monkeydance',
62 'openid.identity': 'http://wagu.unittest/',
63 'openid.claimed_id': 'http://wagu.unittest/',
64 'openid.return_to': return_to,
65 })
66 e = server.ProtocolError(args, "plucky")
67 self.failUnless(e.hasReturnTo())
68 expected_args = {
69 'openid.ns': [OPENID2_NS],
70 'openid.mode': ['error'],
71 'openid.error': ['plucky'],
72 }
73
74 rt_base, result_args = e.encodeToURL().split('?', 1)
75 result_args = cgi.parse_qs(result_args)
76 self.failUnlessEqual(result_args, expected_args)
77
79 return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT)
80
81 args = Message.fromPostArgs({
82 'openid.ns': OPENID2_NS,
83 'openid.mode': 'monkeydance',
84 'openid.identity': 'http://wagu.unittest/',
85 'openid.claimed_id': 'http://wagu.unittest/',
86 'openid.return_to': return_to,
87 })
88 e = server.ProtocolError(args, "plucky")
89 self.failUnless(e.hasReturnTo())
90 expected_args = {
91 'openid.ns': [OPENID2_NS],
92 'openid.mode': ['error'],
93 'openid.error': ['plucky'],
94 }
95
96 self.failUnless(e.whichEncoding() == server.ENCODE_HTML_FORM)
97 self.failUnless(e.toFormMarkup() == e.toMessage().toFormMarkup(
98 args.getArg(OPENID_NS, 'return_to')))
99
101 return_to = "http://rp.unittest/consumer" + ('x' * OPENID1_URL_LIMIT)
102
103 args = Message.fromPostArgs({
104 'openid.mode': 'monkeydance',
105 'openid.identity': 'http://wagu.unittest/',
106 'openid.return_to': return_to,
107 })
108 e = server.ProtocolError(args, "plucky")
109 self.failUnless(e.hasReturnTo())
110 expected_args = {
111 'openid.mode': ['error'],
112 'openid.error': ['plucky'],
113 }
114
115 self.failUnless(e.whichEncoding() == server.ENCODE_URL)
116
117 rt_base, result_args = e.encodeToURL().split('?', 1)
118 result_args = cgi.parse_qs(result_args)
119 self.failUnlessEqual(result_args, expected_args)
120
133
134
139
140
153
155 args = {}
156 r = self.decode(args)
157 self.failUnlessEqual(r, None)
158
160 args = {
161 'pony': 'spotted',
162 'sreg.mutant_power': 'decaffinator',
163 }
164 self.failUnlessRaises(server.ProtocolError, self.decode, args)
165
167 args = {
168 'openid.mode': 'twos-compliment',
169 'openid.pants': 'zippered',
170 }
171 self.failUnlessRaises(server.ProtocolError, self.decode, args)
172
174 args = {
175 'openid.mode': ['checkid_setup'],
176 'openid.identity': self.id_url,
177 'openid.assoc_handle': self.assoc_handle,
178 'openid.return_to': self.rt_url,
179 'openid.trust_root': self.tr_url,
180 }
181 try:
182 result = self.decode(args)
183 except TypeError, err:
184 self.failUnless(str(err).find('values') != -1, err)
185 else:
186 self.fail("Expected TypeError, but got result %s" % (result,))
187
206
208 args = {
209 'openid.mode': 'checkid_setup',
210 'openid.identity': self.id_url,
211 'openid.assoc_handle': self.assoc_handle,
212 'openid.return_to': self.rt_url,
213 'openid.trust_root': self.tr_url,
214 }
215 r = self.decode(args)
216 self.failUnless(isinstance(r, server.CheckIDRequest))
217 self.failUnlessEqual(r.mode, "checkid_setup")
218 self.failUnlessEqual(r.immediate, False)
219 self.failUnlessEqual(r.identity, self.id_url)
220 self.failUnlessEqual(r.trust_root, self.tr_url)
221 self.failUnlessEqual(r.return_to, self.rt_url)
222
224 args = {
225 'openid.ns': OPENID2_NS,
226 'openid.mode': 'checkid_setup',
227 'openid.identity': self.id_url,
228 'openid.claimed_id': self.claimed_id,
229 'openid.assoc_handle': self.assoc_handle,
230 'openid.return_to': self.rt_url,
231 'openid.realm': self.tr_url,
232 }
233 r = self.decode(args)
234 self.failUnless(isinstance(r, server.CheckIDRequest))
235 self.failUnlessEqual(r.mode, "checkid_setup")
236 self.failUnlessEqual(r.immediate, False)
237 self.failUnlessEqual(r.identity, self.id_url)
238 self.failUnlessEqual(r.claimed_id, self.claimed_id)
239 self.failUnlessEqual(r.trust_root, self.tr_url)
240 self.failUnlessEqual(r.return_to, self.rt_url)
241
243 args = {
244 'openid.ns': OPENID2_NS,
245 'openid.mode': 'checkid_setup',
246 'openid.identity': self.id_url,
247 'openid.assoc_handle': self.assoc_handle,
248 'openid.return_to': self.rt_url,
249 'openid.realm': self.tr_url,
250 }
251 self.failUnlessRaises(server.ProtocolError, self.decode, args)
252
254 args = {
255 'openid.ns': OPENID2_NS,
256 'openid.mode': 'checkid_setup',
257 'openid.assoc_handle': self.assoc_handle,
258 'openid.return_to': self.rt_url,
259 'openid.realm': self.tr_url,
260 }
261 r = self.decode(args)
262 self.failUnless(isinstance(r, server.CheckIDRequest))
263 self.failUnlessEqual(r.mode, "checkid_setup")
264 self.failUnlessEqual(r.immediate, False)
265 self.failUnlessEqual(r.identity, None)
266 self.failUnlessEqual(r.trust_root, self.tr_url)
267 self.failUnlessEqual(r.return_to, self.rt_url)
268
270 """Make sure an OpenID 1 request cannot be decoded if it lacks
271 a return_to.
272 """
273 args = {
274 'openid.mode': 'checkid_setup',
275 'openid.identity': self.id_url,
276 'openid.assoc_handle': self.assoc_handle,
277 'openid.trust_root': self.tr_url,
278 }
279 self.failUnlessRaises(server.ProtocolError, self.decode, args)
280
282 """Make sure an OpenID 2 request with no return_to can be
283 decoded, and make sure a response to such a request raises
284 NoReturnToError.
285 """
286 args = {
287 'openid.ns': OPENID2_NS,
288 'openid.mode': 'checkid_setup',
289 'openid.identity': self.id_url,
290 'openid.claimed_id': self.id_url,
291 'openid.assoc_handle': self.assoc_handle,
292 'openid.realm': self.tr_url,
293 }
294 self.failUnless(isinstance(self.decode(args), server.CheckIDRequest))
295
296 req = self.decode(args)
297 self.assertRaises(server.NoReturnToError, req.answer, False)
298 self.assertRaises(server.NoReturnToError, req.encodeToURL, 'bogus')
299 self.assertRaises(server.NoReturnToError, req.getCancelURL)
300
302 """Make sure that an OpenID 2 request which lacks return_to
303 cannot be decoded if it lacks a realm. Spec: This value
304 (openid.realm) MUST be sent if openid.return_to is omitted.
305 """
306 args = {
307 'openid.ns': OPENID2_NS,
308 'openid.mode': 'checkid_setup',
309 'openid.identity': self.id_url,
310 'openid.assoc_handle': self.assoc_handle,
311 }
312 self.failUnlessRaises(server.ProtocolError, self.decode, args)
313
315 args = {
316 'openid.mode': 'checkid_setup',
317 'openid.identity': self.id_url,
318 'openid.assoc_handle': self.assoc_handle,
319 'openid.return_to': 'not a url',
320 }
321 try:
322 result = self.decode(args)
323 except server.ProtocolError, err:
324 self.failUnless(err.openid_message)
325 else:
326 self.fail("Expected ProtocolError, instead returned with %s" %
327 (result,))
328
330 args = {
331 'openid.mode': 'checkid_setup',
332 'openid.identity': self.id_url,
333 'openid.assoc_handle': self.assoc_handle,
334 'openid.return_to': self.rt_url,
335 'openid.trust_root': 'http://not-the-return-place.unittest/',
336 }
337 try:
338 result = self.decode(args)
339 except server.UntrustedReturnURL, err:
340 self.failUnless(err.openid_message)
341 else:
342 self.fail("Expected UntrustedReturnURL, instead returned with %s" %
343 (result,))
344
346 args = {
347 'openid.mode': 'check_authentication',
348 'openid.assoc_handle': '{dumb}{handle}',
349 'openid.sig': 'sigblob',
350 'openid.signed': 'identity,return_to,response_nonce,mode',
351 'openid.identity': 'signedval1',
352 'openid.return_to': 'signedval2',
353 'openid.response_nonce': 'signedval3',
354 'openid.baz': 'unsigned',
355 }
356 r = self.decode(args)
357 self.failUnless(isinstance(r, server.CheckAuthRequest))
358 self.failUnlessEqual(r.mode, 'check_authentication')
359 self.failUnlessEqual(r.sig, 'sigblob')
360
361
363 args = {
364 'openid.mode': 'check_authentication',
365 'openid.assoc_handle': '{dumb}{handle}',
366 'openid.signed': 'foo,bar,mode',
367 'openid.foo': 'signedval1',
368 'openid.bar': 'signedval2',
369 'openid.baz': 'unsigned',
370 }
371 self.failUnlessRaises(server.ProtocolError, self.decode, args)
372
373
375 args = {
376 'openid.mode': 'check_authentication',
377 'openid.assoc_handle': '{dumb}{handle}',
378 'openid.invalidate_handle': '[[SMART_handle]]',
379 'openid.sig': 'sigblob',
380 'openid.signed': 'identity,return_to,response_nonce,mode',
381 'openid.identity': 'signedval1',
382 'openid.return_to': 'signedval2',
383 'openid.response_nonce': 'signedval3',
384 'openid.baz': 'unsigned',
385 }
386 r = self.decode(args)
387 self.failUnless(isinstance(r, server.CheckAuthRequest))
388 self.failUnlessEqual(r.invalidate_handle, '[[SMART_handle]]')
389
390
392 args = {
393 'openid.mode': 'associate',
394 'openid.session_type': 'DH-SHA1',
395 'openid.dh_consumer_public': "Rzup9265tw==",
396 }
397 r = self.decode(args)
398 self.failUnless(isinstance(r, server.AssociateRequest))
399 self.failUnlessEqual(r.mode, "associate")
400 self.failUnlessEqual(r.session.session_type, "DH-SHA1")
401 self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
402 self.failUnless(r.session.consumer_pubkey)
403
405 """Trying DH assoc w/o public key"""
406 args = {
407 'openid.mode': 'associate',
408 'openid.session_type': 'DH-SHA1',
409 }
410
411 self.failUnlessRaises(server.ProtocolError, self.decode, args)
412
413
415 args = {
416 'openid.mode': 'associate',
417 'openid.session_type': 'DH-SHA1',
418 'openid.dh_consumer_public': "donkeydonkeydonkey",
419 }
420 self.failUnlessRaises(server.ProtocolError, self.decode, args)
421
422
424
425 args = {
426 'openid.mode': 'associate',
427 'openid.session_type': 'DH-SHA1',
428 'openid.dh_consumer_public': "Rzup9265tw==",
429 'openid.dh_modulus': cryptutil.longToBase64(ALT_MODULUS),
430 'openid.dh_gen': cryptutil.longToBase64(ALT_GEN) ,
431 }
432 r = self.decode(args)
433 self.failUnless(isinstance(r, server.AssociateRequest))
434 self.failUnlessEqual(r.mode, "associate")
435 self.failUnlessEqual(r.session.session_type, "DH-SHA1")
436 self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
437 self.failUnlessEqual(r.session.dh.modulus, ALT_MODULUS)
438 self.failUnlessEqual(r.session.dh.generator, ALT_GEN)
439 self.failUnless(r.session.consumer_pubkey)
440
441
443
444 args = {
445 'openid.mode': 'associate',
446 'openid.session_type': 'DH-SHA1',
447 'openid.dh_consumer_public': "Rzup9265tw==",
448 'openid.dh_modulus': 'pizza',
449 'openid.dh_gen': 'gnocchi',
450 }
451 self.failUnlessRaises(server.ProtocolError, self.decode, args)
452
453
455
456 args = {
457 'openid.mode': 'associate',
458 'openid.session_type': 'DH-SHA1',
459 'openid.dh_consumer_public': "Rzup9265tw==",
460 'openid.dh_modulus': 'pizza',
461 }
462 self.failUnlessRaises(server.ProtocolError, self.decode, args)
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
480 args = {
481 'openid.mode': 'associate',
482 'openid.session_type': 'FLCL6',
483 'openid.dh_consumer_public': "YQ==\n",
484 }
485 self.failUnlessRaises(server.ProtocolError, self.decode, args)
486
487
489 args = {
490 'openid.mode': 'associate',
491 }
492 r = self.decode(args)
493 self.failUnless(isinstance(r, server.AssociateRequest))
494 self.failUnlessEqual(r.mode, "associate")
495 self.failUnlessEqual(r.session.session_type, "no-encryption")
496 self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
497
499 args = {
500 'openid.session_type': 'DH-SHA1',
501 'openid.dh_consumer_public': "my public keeey",
502 }
503 self.failUnlessRaises(server.ProtocolError, self.decode, args)
504
506 args = {'openid.ns': 'Tuesday',
507 'openid.mode': 'associate'}
508
509 try:
510 r = self.decode(args)
511 except server.ProtocolError, err:
512
513
514 self.failUnless(err.openid_message)
515
516 self.failUnless('Tuesday' in str(err), str(err))
517 else:
518 self.fail("Expected ProtocolError but returned with %r" % (r,))
519
520
528
530 """
531 Check that when an OpenID 2 response does not exceed the
532 OpenID 1 message size, a GET response (i.e., redirect) is
533 issued.
534 """
535 request = server.CheckIDRequest(
536 identity = 'http://bombom.unittest/',
537 trust_root = 'http://burr.unittest/',
538 return_to = 'http://burr.unittest/999',
539 immediate = False,
540 op_endpoint = self.server.op_endpoint,
541 )
542 request.message = Message(OPENID2_NS)
543 response = server.OpenIDResponse(request)
544 response.fields = Message.fromOpenIDArgs({
545 'ns': OPENID2_NS,
546 'mode': 'id_res',
547 'identity': request.identity,
548 'claimed_id': request.identity,
549 'return_to': request.return_to,
550 })
551
552 self.failIf(response.renderAsForm())
553 self.failUnless(response.whichEncoding() == server.ENCODE_URL)
554 webresponse = self.encode(response)
555 self.failUnless(webresponse.headers.has_key('location'))
556
558 """
559 Check that when an OpenID 2 response exceeds the OpenID 1
560 message size, a POST response (i.e., an HTML form) is
561 returned.
562 """
563 request = server.CheckIDRequest(
564 identity = 'http://bombom.unittest/',
565 trust_root = 'http://burr.unittest/',
566 return_to = 'http://burr.unittest/999',
567 immediate = False,
568 op_endpoint = self.server.op_endpoint,
569 )
570 request.message = Message(OPENID2_NS)
571 response = server.OpenIDResponse(request)
572 response.fields = Message.fromOpenIDArgs({
573 'ns': OPENID2_NS,
574 'mode': 'id_res',
575 'identity': request.identity,
576 'claimed_id': request.identity,
577 'return_to': 'x' * OPENID1_URL_LIMIT,
578 })
579
580 self.failUnless(response.renderAsForm())
581 self.failUnless(len(response.encodeToURL()) > OPENID1_URL_LIMIT)
582 self.failUnless(response.whichEncoding() == server.ENCODE_HTML_FORM)
583 webresponse = self.encode(response)
584 self.failUnlessEqual(webresponse.body, response.toFormMarkup())
585
606
608 request = server.CheckIDRequest(
609 identity = 'http://bombom.unittest/',
610 trust_root = 'http://burr.unittest/',
611 return_to = 'http://burr.unittest/999',
612 immediate = False,
613 op_endpoint = self.server.op_endpoint,
614 )
615 request.message = Message(OPENID2_NS)
616 response = server.OpenIDResponse(request)
617 response.fields = Message.fromOpenIDArgs({
618 'ns': OPENID2_NS,
619 'mode': 'id_res',
620 'identity': request.identity,
621 'claimed_id': request.identity,
622 'return_to': 'x' * OPENID1_URL_LIMIT,
623 })
624 html = response.toHTML()
625 self.failUnless('<html>' in html)
626 self.failUnless('</html>' in html)
627 self.failUnless('<body onload=' in html)
628 self.failUnless('<form' in html)
629 self.failUnless('http://bombom.unittest/' in html)
630
632 """
633 Check that when an OpenID 1 response exceeds the OpenID 1
634 message size, a GET response is issued. Technically, this
635 shouldn't be permitted by the library, but this test is in
636 place to preserve the status quo for OpenID 1.
637 """
638 request = server.CheckIDRequest(
639 identity = 'http://bombom.unittest/',
640 trust_root = 'http://burr.unittest/',
641 return_to = 'http://burr.unittest/999',
642 immediate = False,
643 op_endpoint = self.server.op_endpoint,
644 )
645 request.message = Message(OPENID2_NS)
646 response = server.OpenIDResponse(request)
647 response.fields = Message.fromOpenIDArgs({
648 'mode': 'id_res',
649 'identity': request.identity,
650 'return_to': 'x' * OPENID1_URL_LIMIT,
651 })
652
653 self.failIf(response.renderAsForm())
654 self.failUnless(len(response.encodeToURL()) > OPENID1_URL_LIMIT)
655 self.failUnless(response.whichEncoding() == server.ENCODE_URL)
656 webresponse = self.encode(response)
657 self.failUnlessEqual(webresponse.headers['location'], response.encodeToURL())
658
660 request = server.CheckIDRequest(
661 identity = 'http://bombom.unittest/',
662 trust_root = 'http://burr.unittest/',
663 return_to = 'http://burr.unittest/999',
664 immediate = False,
665 op_endpoint = self.server.op_endpoint,
666 )
667 request.message = Message(OPENID2_NS)
668 response = server.OpenIDResponse(request)
669 response.fields = Message.fromOpenIDArgs({
670 'mode': 'id_res',
671 'identity': request.identity,
672 'return_to': request.return_to,
673 })
674 webresponse = self.encode(response)
675 self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
676 self.failUnless(webresponse.headers.has_key('location'))
677
678 location = webresponse.headers['location']
679 self.failUnless(location.startswith(request.return_to),
680 "%s does not start with %s" % (location,
681 request.return_to))
682
683 q2 = dict(cgi.parse_qsl(urlparse(location)[4]))
684 expected = response.fields.toPostArgs()
685 self.failUnlessEqual(q2, expected)
686
703
719
733
750
757
768
769
770
794
811
813 webresponse = self.encode(self.response)
814 self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
815 self.failUnless(webresponse.headers.has_key('location'))
816
817 location = webresponse.headers['location']
818 query = cgi.parse_qs(urlparse(location)[4])
819 self.failUnless('openid.sig' in query)
820 self.failUnless('openid.assoc_handle' in query)
821 self.failUnless('openid.signed' in query)
822
824 self.encoder.signatory = None
825 self.failUnlessRaises(ValueError, self.encode, self.response)
826
844
857
861
875
877 self.request.trust_root = "http://foo.unittest/17"
878 self.request.return_to = "http://foo.unittest/39"
879 self.failIf(self.request.trustRootValid())
880
882 self.request.trust_root = "http://foo.unittest/"
883 self.request.return_to = "http://foo.unittest/39"
884 self.failUnless(self.request.trustRootValid())
885
898
900 request = server.CheckIDRequest(
901 identity = 'http://bambam.unittest/',
902 trust_root = 'http://bar.unittest/',
903 return_to = None,
904 immediate = False,
905 op_endpoint = self.server.op_endpoint,
906 )
907
908 self.failUnless(request.trustRootValid())
909
911 """Make sure that verifyReturnTo is calling the trustroot
912 function verifyReturnTo
913 """
def withVerifyReturnTo(new_verify, callable):
    """Call ``callable`` with ``server.verifyReturnTo`` temporarily replaced.

    ``new_verify`` is installed as ``server.verifyReturnTo`` for the
    duration of the call.  The previous function is always put back,
    whether ``callable`` returns or raises.  Returns whatever
    ``callable`` returns.
    """
    saved_verify = server.verifyReturnTo
    try:
        server.verifyReturnTo = new_verify
        outcome = callable()
    finally:
        # Restore the real implementation so later tests are unaffected.
        server.verifyReturnTo = saved_verify
    return outcome
921
922
923 sentinel = Exception()
924 def vrfyExc(trust_root, return_to):
925 self.failUnlessEqual(self.request.trust_root, trust_root)
926 self.failUnlessEqual(self.request.return_to, return_to)
927 raise sentinel
928
929 try:
930 withVerifyReturnTo(vrfyExc, self.request.returnToVerified)
931 except Exception, e:
932 self.failUnless(e is sentinel, e)
933
934
935 def constVerify(val):
936 def verify(trust_root, return_to):
937 self.failUnlessEqual(self.request.trust_root, trust_root)
938 self.failUnlessEqual(self.request.return_to, return_to)
939 return val
940 return verify
941
942 for val in [True, False]:
943 self.failUnlessEqual(
944 val,
945 withVerifyReturnTo(constVerify(val),
946 self.request.returnToVerified))
947
def _expectAnswer(self, answer, identity=None, claimed_id=None):
    """Assert that ``answer`` carries the expected positive-assertion fields.

    ``mode``, ``return_to`` and ``op_endpoint`` are always checked.  When
    ``identity`` is given, ``identity`` and ``claimed_id`` are checked as
    well; ``claimed_id`` is expected to equal ``identity`` unless an
    explicit ``claimed_id`` is passed.  The response must also contain a
    ``response_nonce``, use the OpenID 2 namespace, and contain no fields
    beyond the expected ones.

    answer     -- response object whose ``fields`` message is inspected.
    identity   -- expected ``openid.identity`` value, or None to expect
                  neither identity nor claimed_id.
    claimed_id -- expected ``openid.claimed_id`` value; only consulted
                  when ``identity`` is given.
    """
    expected_list = [
        ('mode', 'id_res'),
        ('return_to', self.request.return_to),
        ('op_endpoint', self.op_endpoint),
        ]
    if identity:
        expected_list.append(('identity', identity))
        # The claimed_id expectation lives inside the identity branch:
        # an identity-less answer must not be expected to carry one,
        # otherwise the field count below would be off.
        expected_list.append(('claimed_id', claimed_id or identity))

    for k, expected in expected_list:
        actual = answer.fields.getArg(OPENID_NS, k)
        self.failUnlessEqual(
            actual, expected,
            "%s: expected %s, got %s" % (k, expected, actual))

    self.failUnless(answer.fields.hasKey(OPENID_NS, 'response_nonce'))
    # failUnlessEqual (rather than failUnless(a == b)) reports both
    # namespaces when the check fails.
    self.failUnlessEqual(answer.fields.getOpenIDNamespace(), OPENID2_NS)

    # Two fields are allowed beyond expected_list -- presumably the
    # response_nonce and the namespace declaration (TODO confirm).
    post_args = answer.fields.toPostArgs()
    self.failUnlessEqual(len(post_args), len(expected_list) + 2, post_args)
972
974 """Check the fields specified by "Positive Assertions"
975
976 including mode=id_res, identity, claimed_id, op_endpoint, return_to
977 """
978 answer = self.request.answer(True)
979 self.failUnlessEqual(answer.request, self.request)
980 self._expectAnswer(answer, self.request.identity)
981
983 self.request.claimed_id = 'http://delegating.unittest/'
984 answer = self.request.answer(True)
985 self._expectAnswer(answer, self.request.identity,
986 self.request.claimed_id)
987
989
990
991 self.request.claimed_id = 'http://delegating.unittest/'
992 answer = self.request.answer(True, identity='http://bambam.unittest/')
993 self._expectAnswer(answer, self.request.identity,
994 self.request.claimed_id)
995
997 self.request.identity = None
998 answer = self.request.answer(True)
999 self.failUnlessEqual(answer.request, self.request)
1000 self._expectAnswer(answer)
1001
1003 self.request.identity = None
1004
1005 self.failUnlessRaises(
1006 ValueError, self.request.answer, True, identity="=V")
1007
1009 self.request.identity = IDENTIFIER_SELECT
1010 selected_id = 'http://anon.unittest/9861'
1011 answer = self.request.answer(True, identity=selected_id)
1012 self._expectAnswer(answer, selected_id)
1013
1015 """Answer an IDENTIFIER_SELECT case with a delegated identifier.
1016 """
1017
1018 self.request.identity = IDENTIFIER_SELECT
1019 selected_id = 'http://anon.unittest/9861'
1020 claimed_id = 'http://monkeyhat.unittest/'
1021 answer = self.request.answer(True, identity=selected_id,
1022 claimed_id=claimed_id)
1023 self._expectAnswer(answer, selected_id, claimed_id)
1024
1026 """claimed_id parameter doesn't exist in OpenID 1.
1027 """
1028 self.request.message = Message(OPENID1_NS)
1029
1030 self.request.identity = IDENTIFIER_SELECT
1031 selected_id = 'http://anon.unittest/9861'
1032 claimed_id = 'http://monkeyhat.unittest/'
1033 self.failUnlessRaises(server.VersionError,
1034 self.request.answer, True,
1035 identity=selected_id,
1036 claimed_id=claimed_id)
1037
1039
1040 self.failUnlessRaises(ValueError, self.request.answer, True,
1041 identity="http://pebbles.unittest/")
1042
1044
1045
1046
1047 non_normalized = 'http://bambam.unittest'
1048 normalized = non_normalized + '/'
1049
1050 self.request.identity = non_normalized
1051 self.request.claimed_id = non_normalized
1052
1053 answer = self.request.answer(True, identity=normalized)
1054
1055
1056
1057 self._expectAnswer(answer, identity=non_normalized,
1058 claimed_id=non_normalized)
1059
1061 self.request.message = Message(OPENID1_NS)
1062 self.request.identity = None
1063 self.failUnlessRaises(ValueError, self.request.answer, True,
1064 identity=None)
1065
1067 self.request.op_endpoint = None
1068 self.failUnlessRaises(RuntimeError, self.request.answer, True)
1069
1071 msg = Message(OPENID1_NS)
1072 msg.setArg(OPENID_NS, 'return_to', 'bogus')
1073 msg.setArg(OPENID_NS, 'trust_root', 'bogus')
1074 msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
1075 msg.setArg(OPENID_NS, 'assoc_handle', 'bogus')
1076
1077 self.failUnlessRaises(server.ProtocolError,
1078 server.CheckIDRequest.fromMessage,
1079 msg, self.server)
1080
1092
1104
1106 """Ignore openid.realm in OpenID 1"""
1107 msg = Message(OPENID1_NS)
1108 msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
1109 msg.setArg(OPENID_NS, 'trust_root', 'http://real_trust_root/')
1110 msg.setArg(OPENID_NS, 'realm', 'http://fake_trust_root/')
1111 msg.setArg(OPENID_NS, 'return_to', 'http://real_trust_root/foo')
1112 msg.setArg(OPENID_NS, 'assoc_handle', 'bogus')
1113 msg.setArg(OPENID_NS, 'identity', 'george')
1114
1115 result = server.CheckIDRequest.fromMessage(msg, self.server.op_endpoint)
1116
1117 self.failUnless(result.trust_root == 'http://real_trust_root/')
1118
1120 """Ignore openid.trust_root in OpenID 2"""
1121 msg = Message(OPENID2_NS)
1122 msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
1123 msg.setArg(OPENID_NS, 'realm', 'http://real_trust_root/')
1124 msg.setArg(OPENID_NS, 'trust_root', 'http://fake_trust_root/')
1125 msg.setArg(OPENID_NS, 'return_to', 'http://real_trust_root/foo')
1126 msg.setArg(OPENID_NS, 'assoc_handle', 'bogus')
1127 msg.setArg(OPENID_NS, 'identity', 'george')
1128 msg.setArg(OPENID_NS, 'claimed_id', 'george')
1129
1130 result = server.CheckIDRequest.fromMessage(msg, self.server.op_endpoint)
1131
1132 self.failUnless(result.trust_root == 'http://real_trust_root/')
1133
1135 self.request.trust_root = None
1136 answer = self.request.answer(True)
1137 self.failUnlessEqual(answer.request, self.request)
1138 self._expectAnswer(answer, self.request.identity)
1139
1141 msg = Message(OPENID2_NS)
1142 msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
1143 msg.setArg(OPENID_NS, 'return_to', 'http://real_trust_root/foo')
1144 msg.setArg(OPENID_NS, 'assoc_handle', 'bogus')
1145 msg.setArg(OPENID_NS, 'identity', 'george')
1146 msg.setArg(OPENID_NS, 'claimed_id', 'george')
1147
1148 result = server.CheckIDRequest.fromMessage(msg, self.server.op_endpoint)
1149
1150 self.failUnlessEqual(result.trust_root, 'http://real_trust_root/foo')
1151
1153 return_to = u'http://someplace.invalid/?go=thing'
1154 msg = Message.fromPostArgs({
1155 u'openid.assoc_handle': u'{blah}{blah}{OZivdQ==}',
1156 u'openid.claimed_id': u'http://delegated.invalid/',
1157 u'openid.identity': u'http://op-local.example.com/',
1158 u'openid.mode': u'checkid_setup',
1159 u'openid.ns': u'http://openid.net/signon/1.0',
1160 u'openid.return_to': return_to,
1161 u'openid.trust_root': u''})
1162
1163 result = server.CheckIDRequest.fromMessage(msg, self.server.op_endpoint)
1164
1165 self.failUnlessEqual(result.trust_root, return_to)
1166
1168 msg = Message(OPENID2_NS)
1169 msg.setArg(OPENID_NS, 'mode', 'checkid_setup')
1170 msg.setArg(OPENID_NS, 'assoc_handle', 'bogus')
1171 msg.setArg(OPENID_NS, 'identity', 'george')
1172 msg.setArg(OPENID_NS, 'claimed_id', 'george')
1173
1174 self.failUnlessRaises(server.ProtocolError,
1175 server.CheckIDRequest.fromMessage,
1176 msg, self.server.op_endpoint)
1177
1179 """Test .allow() with an OpenID 1.x Message on a CheckIDRequest
1180 built without an op_endpoint parameter.
1181 """
1182 identity = 'http://bambam.unittest/'
1183 reqmessage = Message.fromOpenIDArgs({
1184 'identity': identity,
1185 'trust_root': 'http://bar.unittest/',
1186 'return_to': 'http://bar.unittest/999',
1187 })
1188 self.request = server.CheckIDRequest.fromMessage(reqmessage, None)
1189 answer = self.request.answer(True)
1190
1191 expected_list = [
1192 ('mode', 'id_res'),
1193 ('return_to', self.request.return_to),
1194 ('identity', identity),
1195 ]
1196
1197 for k, expected in expected_list:
1198 actual = answer.fields.getArg(OPENID_NS, k)
1199 self.failUnlessEqual(
1200 expected, actual,
1201 "%s: expected %s, got %s" % (k, expected, actual))
1202
1203 self.failUnless(answer.fields.hasKey(OPENID_NS, 'response_nonce'))
1204 self.failUnlessEqual(answer.fields.getOpenIDNamespace(), OPENID1_NS)
1205 self.failUnless(answer.fields.namespaces.isImplicit(OPENID1_NS))
1206
1207
1208 self.failUnlessEqual(len(answer.fields.toPostArgs()),
1209 len(expected_list) + 1,
1210 answer.fields.toPostArgs())
1211
1234
1251
1257
1271
1273 url = self.request.getCancelURL()
1274 rt, query_string = url.split('?')
1275 self.failUnlessEqual(self.request.return_to, rt)
1276 query = dict(cgi.parse_qsl(query_string))
1277 self.failUnlessEqual(query, {'openid.mode':'cancel',
1278 'openid.ns':OPENID2_NS})
1279
1281 self.request.mode = 'checkid_immediate'
1282 self.request.immediate = True
1283 self.failUnlessRaises(ValueError, self.request.getCancelURL)
1284
1285
1286
1288
1290 self.op_endpoint = 'http://endpoint.unittest/ext'
1291 self.store = memstore.MemoryStore()
1292 self.server = server.Server(self.store, self.op_endpoint)
1293 self.request = server.CheckIDRequest(
1294 identity = 'http://bambam.unittest/',
1295 trust_root = 'http://bar.unittest/',
1296 return_to = 'http://bar.unittest/999',
1297 immediate = False,
1298 op_endpoint = self.server.op_endpoint,
1299 )
1300 self.request.message = Message(OPENID2_NS)
1301 self.response = server.OpenIDResponse(self.request)
1302 self.response.fields.setArg(OPENID_NS, 'mode', 'id_res')
1303 self.response.fields.setArg(OPENID_NS, 'blue', 'star')
1304
1305
1316
1317
1328
1329
1330
1332 isValid = True
1333
1336
1337 - def verify(self, assoc_handle, message):
1343
1345 if (dumb, assoc_handle) in self.assocs:
1346
1347
1348 return True
1349 else:
1350 return None
1351
1353 if (dumb, assoc_handle) in self.assocs:
1354 self.assocs.remove((dumb, assoc_handle))
1355
1356
1369
1371 r = self.request.answer(self.signatory)
1372 self.failUnlessEqual(r.fields.getArgs(OPENID_NS), {'is_valid': 'true'})
1373 self.failUnlessEqual(r.request, self.request)
1374
1376 self.signatory.isValid = False
1377 r = self.request.answer(self.signatory)
1378 self.failUnlessEqual(r.fields.getArgs(OPENID_NS),
1379 {'is_valid': 'false'})
1380
1382 """Don't validate the same response twice.
1383
1384 From "Checking the Nonce"::
1385
1386 When using "check_authentication", the OP MUST ensure that an
1387 assertion has not yet been accepted with the same value for
1388 "openid.response_nonce".
1389
1390 In this implementation, the assoc_handle is only valid once. And
1391 nonces are a signed component of the message, so they can't be used
1392 with another handle without breaking the sig.
1393 """
1394 r = self.request.answer(self.signatory)
1395 r = self.request.answer(self.signatory)
1396 self.failUnlessEqual(r.fields.getArgs(OPENID_NS),
1397 {'is_valid': 'false'})
1398
1400 self.request.invalidate_handle = "bogusHandle"
1401 r = self.request.answer(self.signatory)
1402 self.failUnlessEqual(r.fields.getArgs(OPENID_NS),
1403 {'is_valid': 'true',
1404 'invalidate_handle': "bogusHandle"})
1405 self.failUnlessEqual(r.request, self.request)
1406
1408 assoc_handle = 'goodhandle'
1409 self.signatory.assocs.append((False, 'goodhandle'))
1410 self.request.invalidate_handle = assoc_handle
1411 r = self.request.answer(self.signatory)
1412 self.failUnlessEqual(r.fields.getArgs(OPENID_NS), {'is_valid': 'true'})
1413
1414
1416
1417
1418
1424
1426 self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA1')
1427 from openid.dh import DiffieHellman
1428 from openid.server.server import DiffieHellmanSHA1ServerSession
1429 consumer_dh = DiffieHellman.fromDefaults()
1430 cpub = consumer_dh.public
1431 server_dh = DiffieHellman.fromDefaults()
1432 session = DiffieHellmanSHA1ServerSession(server_dh, cpub)
1433 self.request = server.AssociateRequest(session, 'HMAC-SHA1')
1434 response = self.request.answer(self.assoc)
1435 rfg = lambda f: response.fields.getArg(OPENID_NS, f)
1436 self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
1437 self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
1438 self.failIf(rfg("mac_key"))
1439 self.failUnlessEqual(rfg("session_type"), "DH-SHA1")
1440 self.failUnless(rfg("enc_mac_key"))
1441 self.failUnless(rfg("dh_server_public"))
1442
1443 enc_key = rfg("enc_mac_key").decode('base64')
1444 spub = cryptutil.base64ToLong(rfg("dh_server_public"))
1445 secret = consumer_dh.xorSecret(spub, enc_key, cryptutil.sha1)
1446 self.failUnlessEqual(secret, self.assoc.secret)
1447
1448
1449 if not cryptutil.SHA256_AVAILABLE:
1450 warnings.warn("Not running SHA256 tests.")
1451 else:
1453 self.assoc = self.signatory.createAssociation(
1454 dumb=False, assoc_type='HMAC-SHA256')
1455 from openid.dh import DiffieHellman
1456 from openid.server.server import DiffieHellmanSHA256ServerSession
1457 consumer_dh = DiffieHellman.fromDefaults()
1458 cpub = consumer_dh.public
1459 server_dh = DiffieHellman.fromDefaults()
1460 session = DiffieHellmanSHA256ServerSession(server_dh, cpub)
1461 self.request = server.AssociateRequest(session, 'HMAC-SHA256')
1462 response = self.request.answer(self.assoc)
1463 rfg = lambda f: response.fields.getArg(OPENID_NS, f)
1464 self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA256")
1465 self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
1466 self.failIf(rfg("mac_key"))
1467 self.failUnlessEqual(rfg("session_type"), "DH-SHA256")
1468 self.failUnless(rfg("enc_mac_key"))
1469 self.failUnless(rfg("dh_server_public"))
1470
1471 enc_key = rfg("enc_mac_key").decode('base64')
1472 spub = cryptutil.base64ToLong(rfg("dh_server_public"))
1473 secret = consumer_dh.xorSecret(spub, enc_key, cryptutil.sha256)
1474 self.failUnlessEqual(secret, self.assoc.secret)
1475
1500
1525
1527
1528 contact = 'user@example.invalid'
1529 reference = 'Trac ticket number MAX_INT'
1530 error = 'poltergeist'
1531
1532 openid1_args = {
1533 'openid.identitiy': 'invalid',
1534 'openid.mode': 'checkid_setup',
1535 }
1536
1537 openid2_args = dict(openid1_args)
1538 openid2_args.update({'openid.ns': OPENID2_NS})
1539
1540
1541
1542 openid1_msg = Message.fromPostArgs(openid1_args)
1543 p = server.ProtocolError(openid1_msg, error,
1544 contact=contact, reference=reference)
1545 reply = p.toMessage()
1546
1547 self.failUnlessEqual(reply.getArg(OPENID_NS, 'reference'), reference)
1548 self.failUnlessEqual(reply.getArg(OPENID_NS, 'contact'), contact)
1549
1550 openid2_msg = Message.fromPostArgs(openid2_args)
1551 p = server.ProtocolError(openid2_msg, error,
1552 contact=contact, reference=reference)
1553 reply = p.toMessage()
1554
1555 self.failUnlessEqual(reply.getArg(OPENID_NS, 'reference'), reference)
1556 self.failUnlessEqual(reply.getArg(OPENID_NS, 'contact'), contact)
1557
1559 expires_in_str = msg.getArg(OPENID_NS, 'expires_in', no_default)
1560 expires_in = int(expires_in_str)
1561
1562
1563
1564 slop = 1
1565 difference = expected_expires_in - expires_in
1566
1567 error_message = ('"expires_in" value not within %s of expected: '
1568 'expected=%s, actual=%s' %
1569 (slop, expected_expires_in, expires_in))
1570 self.failUnless(0 <= difference <= slop, error_message)
1571
def test_plaintext(self):
    """Check the reply to an associate request answered with an HMAC-SHA1 association.

    A non-dumb HMAC-SHA1 association is created through ``self.signatory``
    and handed to ``self.request.answer``.  The reply must report the
    association type and handle, an ``expires_in`` that matches the
    signatory's ``SECRET_LIFETIME`` and the base64-encoded secret as
    ``mac_key``.  The ``session_type``, ``enc_mac_key`` and
    ``dh_server_public`` fields must all be missing or empty.
    """
    self.assoc = self.signatory.createAssociation(
        dumb=False, assoc_type='HMAC-SHA1')
    reply_fields = self.request.answer(self.assoc).fields

    def field(name):
        # Look the field up in the OpenID namespace of the reply.
        return reply_fields.getArg(OPENID_NS, name)

    self.failUnlessEqual(field("assoc_type"), "HMAC-SHA1")
    self.failUnlessEqual(field("assoc_handle"), self.assoc.handle)

    self.failUnlessExpiresInMatches(
        reply_fields, self.signatory.SECRET_LIFETIME)

    # The secret travels in the clear as mac_key (base64 encoded).
    sent_key = field("mac_key")
    self.failUnlessEqual(sent_key, oidutil.toBase64(self.assoc.secret))

    # None of these fields may carry a value in this reply.
    for absent in ("session_type", "enc_mac_key", "dh_server_public"):
        self.failIf(field(absent))
1588
1590
1591
1592 args = {
1593 'openid.ns': OPENID2_NS,
1594 'openid.mode': 'associate',
1595 'openid.assoc_type': 'HMAC-SHA1',
1596 'openid.session_type': 'no-encryption',
1597 }
1598 self.request = server.AssociateRequest.fromMessage(
1599 Message.fromPostArgs(args))
1600
1601 self.failIf(self.request.message.isOpenID1())
1602
1603 self.assoc = self.signatory.createAssociation(
1604 dumb=False, assoc_type='HMAC-SHA1')
1605 response = self.request.answer(self.assoc)
1606 rfg = lambda f: response.fields.getArg(OPENID_NS, f)
1607
1608 self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
1609 self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
1610
1611 self.failUnlessExpiresInMatches(
1612 response.fields, self.signatory.SECRET_LIFETIME)
1613
1614 self.failUnlessEqual(
1615 rfg("mac_key"), oidutil.toBase64(self.assoc.secret))
1616
1617 self.failUnlessEqual(rfg("session_type"), "no-encryption")
1618 self.failIf(rfg("enc_mac_key"))
1619 self.failIf(rfg("dh_server_public"))
1620
1622 self.assoc = self.signatory.createAssociation(dumb=False, assoc_type='HMAC-SHA256')
1623 response = self.request.answer(self.assoc)
1624 rfg = lambda f: response.fields.getArg(OPENID_NS, f)
1625
1626 self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
1627 self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
1628
1629 self.failUnlessExpiresInMatches(
1630 response.fields, self.signatory.SECRET_LIFETIME)
1631
1632 self.failUnlessEqual(
1633 rfg("mac_key"), oidutil.toBase64(self.assoc.secret))
1634 self.failIf(rfg("session_type"))
1635 self.failIf(rfg("enc_mac_key"))
1636 self.failIf(rfg("dh_server_public"))
1637
1639 allowed_assoc = 'COLD-PET-RAT'
1640 allowed_sess = 'FROG-BONES'
1641 message = 'This is a unit test'
1642
1643
1644
1645 self.request.message = Message(OPENID2_NS)
1646
1647 response = self.request.answerUnsupported(
1648 message=message,
1649 preferred_session_type=allowed_sess,
1650 preferred_association_type=allowed_assoc,
1651 )
1652 rfg = lambda f: response.fields.getArg(OPENID_NS, f)
1653 self.failUnlessEqual(rfg('error_code'), 'unsupported-type')
1654 self.failUnlessEqual(rfg('assoc_type'), allowed_assoc)
1655 self.failUnlessEqual(rfg('error'), message)
1656 self.failUnlessEqual(rfg('session_type'), allowed_sess)
1657
1671
1678
1697
1703
1726
1728 """Request an assoc type that is not supported when there are
1729 supported types.
1730
1731 Should give back an error message with a fallback type.
1732 """
1733 self.server.negotiator.setAllowedTypes([('HMAC-SHA256', 'DH-SHA256')])
1734
1735 msg = Message.fromPostArgs({
1736 'openid.ns': OPENID2_NS,
1737 'openid.session_type': 'no-encryption',
1738 })
1739
1740 request = server.AssociateRequest.fromMessage(msg)
1741 response = self.server.openid_associate(request)
1742
1743 self.failUnless(response.fields.hasKey(OPENID_NS, "error"))
1744 self.failUnless(response.fields.hasKey(OPENID_NS, "error_code"))
1745 self.failIf(response.fields.hasKey(OPENID_NS, "assoc_handle"))
1746 self.failUnlessEqual(response.fields.getArg(OPENID_NS, "assoc_type"),
1747 'HMAC-SHA256')
1748 self.failUnlessEqual(response.fields.getArg(OPENID_NS, "session_type"),
1749 'DH-SHA256')
1750
1751 if not cryptutil.SHA256_AVAILABLE:
1752 warnings.warn("Not running SHA256 tests.")
1753 else:
1770
1779
1784
1792
1794 request = server.OpenIDRequest()
1795 assoc_handle = '{assoc}{lookatme}'
1796 self.store.storeAssociation(
1797 self._normal_key,
1798 association.Association.fromExpiresIn(60, assoc_handle,
1799 'sekrit', 'HMAC-SHA1'))
1800 request.assoc_handle = assoc_handle
1801 request.namespace = OPENID1_NS
1802 response = server.OpenIDResponse(request)
1803 response.fields = Message.fromOpenIDArgs({
1804 'foo': 'amsigned',
1805 'bar': 'notsigned',
1806 'azu': 'alsosigned',
1807 })
1808 sresponse = self.signatory.sign(response)
1809 self.failUnlessEqual(
1810 sresponse.fields.getArg(OPENID_NS, 'assoc_handle'),
1811 assoc_handle)
1812 self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'),
1813 'assoc_handle,azu,bar,foo,signed')
1814 self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
1815 self.failIf(self.messages, self.messages)
1816
1818 request = server.OpenIDRequest()
1819 request.assoc_handle = None
1820 request.namespace = OPENID2_NS
1821 response = server.OpenIDResponse(request)
1822 response.fields = Message.fromOpenIDArgs({
1823 'foo': 'amsigned',
1824 'bar': 'notsigned',
1825 'azu': 'alsosigned',
1826 'ns':OPENID2_NS,
1827 })
1828 sresponse = self.signatory.sign(response)
1829 assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
1830 self.failUnless(assoc_handle)
1831 assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
1832 self.failUnless(assoc)
1833 self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'),
1834 'assoc_handle,azu,bar,foo,ns,signed')
1835 self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
1836 self.failIf(self.messages, self.messages)
1837
1839 """Sign a response to a message with an expired handle (using invalidate_handle).
1840
1841 From "Verifying with an Association"::
1842
1843 If an authentication request included an association handle for an
1844 association between the OP and the Relying party, and the OP no
1845 longer wishes to use that handle (because it has expired or the
1846 secret has been compromised, for instance), the OP will send a
1847 response that must be verified directly with the OP, as specified
1848 in Section 11.3.2. In that instance, the OP will include the field
1849 "openid.invalidate_handle" set to the association handle that the
1850 Relying Party included with the original request.
1851 """
1852 request = server.OpenIDRequest()
1853 request.namespace = OPENID2_NS
1854 assoc_handle = '{assoc}{lookatme}'
1855 self.store.storeAssociation(
1856 self._normal_key,
1857 association.Association.fromExpiresIn(-10, assoc_handle,
1858 'sekrit', 'HMAC-SHA1'))
1859 self.failUnless(self.store.getAssociation(self._normal_key, assoc_handle))
1860
1861 request.assoc_handle = assoc_handle
1862 response = server.OpenIDResponse(request)
1863 response.fields = Message.fromOpenIDArgs({
1864 'foo': 'amsigned',
1865 'bar': 'notsigned',
1866 'azu': 'alsosigned',
1867 })
1868 sresponse = self.signatory.sign(response)
1869
1870 new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
1871 self.failUnless(new_assoc_handle)
1872 self.failIfEqual(new_assoc_handle, assoc_handle)
1873
1874 self.failUnlessEqual(
1875 sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'),
1876 assoc_handle)
1877
1878 self.failUnlessEqual(sresponse.fields.getArg(OPENID_NS, 'signed'),
1879 'assoc_handle,azu,bar,foo,invalidate_handle,signed')
1880 self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
1881
1882
1883 self.failIf(self.store.getAssociation(self._normal_key, assoc_handle),
1884 "expired association is still retrievable.")
1885
1886
1887 self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
1888 self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
1889 self.failUnless(self.messages)
1890
1891
1893 request = server.OpenIDRequest()
1894 request.namespace = OPENID2_NS
1895 assoc_handle = '{bogus-assoc}{notvalid}'
1896
1897 request.assoc_handle = assoc_handle
1898 response = server.OpenIDResponse(request)
1899 response.fields = Message.fromOpenIDArgs({
1900 'foo': 'amsigned',
1901 'bar': 'notsigned',
1902 'azu': 'alsosigned',
1903 })
1904 sresponse = self.signatory.sign(response)
1905
1906 new_assoc_handle = sresponse.fields.getArg(OPENID_NS, 'assoc_handle')
1907 self.failUnless(new_assoc_handle)
1908 self.failIfEqual(new_assoc_handle, assoc_handle)
1909
1910 self.failUnlessEqual(
1911 sresponse.fields.getArg(OPENID_NS, 'invalidate_handle'),
1912 assoc_handle)
1913
1914 self.failUnlessEqual(
1915 sresponse.fields.getArg(OPENID_NS, 'signed'), 'assoc_handle,azu,bar,foo,invalidate_handle,signed')
1916 self.failUnless(sresponse.fields.getArg(OPENID_NS, 'sig'))
1917
1918
1919 self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
1920 self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
1921 self.failIf(self.messages, self.messages)
1922
1923
1925 assoc_handle = '{vroom}{zoom}'
1926 assoc = association.Association.fromExpiresIn(
1927 60, assoc_handle, 'sekrit', 'HMAC-SHA1')
1928
1929 self.store.storeAssociation(self._dumb_key, assoc)
1930
1931 signed = Message.fromPostArgs({
1932 'openid.foo': 'bar',
1933 'openid.apple': 'orange',
1934 'openid.assoc_handle': assoc_handle,
1935 'openid.signed': 'apple,assoc_handle,foo,signed',
1936 'openid.sig': 'uXoT1qm62/BB09Xbj98TQ8mlBco=',
1937 })
1938
1939 verified = self.signatory.verify(assoc_handle, signed)
1940 self.failIf(self.messages, self.messages)
1941 self.failUnless(verified)
1942
1943
1945 assoc_handle = '{vroom}{zoom}'
1946 assoc = association.Association.fromExpiresIn(
1947 60, assoc_handle, 'sekrit', 'HMAC-SHA1')
1948
1949 self.store.storeAssociation(self._dumb_key, assoc)
1950
1951 signed = Message.fromPostArgs({
1952 'openid.foo': 'bar',
1953 'openid.apple': 'orange',
1954 'openid.assoc_handle': assoc_handle,
1955 'openid.signed': 'apple,assoc_handle,foo,signed',
1956 'openid.sig': 'uXoT1qm62/BB09Xbj98TQ8mlBco='.encode('rot13'),
1957 })
1958
1959 verified = self.signatory.verify(assoc_handle, signed)
1960 self.failIf(self.messages, self.messages)
1961 self.failIf(verified)
1962
1964 assoc_handle = '{vroom}{zoom}'
1965 signed = Message.fromPostArgs({
1966 'foo': 'bar',
1967 'apple': 'orange',
1968 'openid.sig': "Ylu0KcIR7PvNegB/K41KpnRgJl0=",
1969 })
1970
1971 verified = self.signatory.verify(assoc_handle, signed)
1972 self.failIf(verified)
1973 self.failUnless(self.messages)
1974
1975
1977 """Attempt to validate sign-all message with a signed-list assoc."""
1978 assoc_handle = '{vroom}{zoom}'
1979 assoc = association.Association.fromExpiresIn(
1980 60, assoc_handle, 'sekrit', 'HMAC-SHA1')
1981
1982 self.store.storeAssociation(self._dumb_key, assoc)
1983
1984 signed = Message.fromPostArgs({
1985 'foo': 'bar',
1986 'apple': 'orange',
1987 'openid.sig': "d71xlHtqnq98DonoSgoK/nD+QRM=",
1988 })
1989
1990 verified = self.signatory.verify(assoc_handle, signed)
1991 self.failIf(verified)
1992 self.failUnless(self.messages)
1993
1995 assoc_handle = self.makeAssoc(dumb=True)
1996 assoc = self.signatory.getAssociation(assoc_handle, True)
1997 self.failUnless(assoc)
1998 self.failUnlessEqual(assoc.handle, assoc_handle)
1999 self.failIf(self.messages, self.messages)
2000
2002 assoc_handle = self.makeAssoc(dumb=True, lifetime=-10)
2003 assoc = self.signatory.getAssociation(assoc_handle, True)
2004 self.failIf(assoc, assoc)
2005 self.failUnless(self.messages)
2006
2008 ah = 'no-such-handle'
2009 self.failUnlessEqual(
2010 self.signatory.getAssociation(ah, dumb=False), None)
2011 self.failIf(self.messages, self.messages)
2012
2014 """getAssociation(dumb=False) cannot get a dumb assoc"""
2015 assoc_handle = self.makeAssoc(dumb=True)
2016 self.failUnlessEqual(
2017 self.signatory.getAssociation(assoc_handle, dumb=False), None)
2018 self.failIf(self.messages, self.messages)
2019
2021 """getAssociation(dumb=True) cannot get a shared assoc
2022
2023 From "Verifying Directly with the OpenID Provider"::
2024
2025 An OP MUST NOT verify signatures for associations that have shared
2026 MAC keys.
2027 """
2028 assoc_handle = self.makeAssoc(dumb=False)
2029 self.failUnlessEqual(
2030 self.signatory.getAssociation(assoc_handle, dumb=True), None)
2031 self.failIf(self.messages, self.messages)
2032
2037
2045
2060
2061
2062
# When this file is executed directly (rather than imported), hand control
# to unittest's command-line runner.
if __name__ == '__main__':
    unittest.main()
2065