1 import urlparse
2 import cgi
3 import time
4 import warnings
5
6 from openid.message import Message, OPENID_NS, OPENID2_NS, IDENTIFIER_SELECT, \
7 OPENID1_NS, BARE_NS
8 from openid import cryptutil, dh, oidutil, kvform
9 from openid.store.nonce import mkNonce, split as splitNonce
10 from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_2_0_TYPE, \
11 OPENID_1_1_TYPE
12 from openid.consumer.consumer import \
13 AuthRequest, GenericConsumer, SUCCESS, FAILURE, CANCEL, SETUP_NEEDED, \
14 SuccessResponse, FailureResponse, SetupNeededResponse, CancelResponse, \
15 DiffieHellmanSHA1ConsumerSession, Consumer, PlainTextConsumerSession, \
16 SetupNeededError, DiffieHellmanSHA256ConsumerSession, ServerError, \
17 ProtocolError, _httpResponseToMessage
18 from openid import association
19 from openid.server.server import \
20 PlainTextServerSession, DiffieHellmanSHA1ServerSession
21 from openid.yadis.manager import Discovery
22 from openid.yadis.discover import DiscoveryFailure
23 from openid.dh import DiffieHellman
24
25 from openid.fetchers import HTTPResponse, HTTPFetchingError
26 from openid import fetchers
27 from openid.store import memstore
28
29 from support import CatchLogs
30
# Canned association data for the fake server, as (secret, handle) pairs;
# the test fetcher's constructor unpacks one of these pairs.
assocs = [
    ('another 20-byte key.', 'Snarky'),
    (20 * '\x00', 'Zeros'),
]
35
41
43 q = {}
44 for (k, v) in cgi.parse_qsl(qs):
45 assert not q.has_key(k)
46 q[k] = v
47 return q
48
def associate(qs, assoc_secret, assoc_handle):
    """Play the server's side of an ``associate`` request.

    ``qs`` is the url-encoded request body. It is parsed and asserted to
    be an HMAC-SHA1 ``associate`` request, then answered with a server
    session built around ``assoc_secret`` and ``assoc_handle``.

    Returns the reply serialized by ``kvform.dictToKV``.
    """
    query = parseQuery(qs)
    assert query['openid.mode'] == 'associate'
    assert query['openid.assoc_type'] == 'HMAC-SHA1'

    wants_dh = query.get('openid.session_type') == 'DH-SHA1'
    if wants_dh:
        # NOTE(review): presumably 6 arguments when the DH modulus and
        # generator are sent explicitly and 4 when they are defaulted.
        assert len(query) in (4, 6)
        server_session = DiffieHellmanSHA1ServerSession.fromMessage(
            Message.fromPostArgs(query))
    else:
        # Only mode and assoc_type may be present for a plain-text session.
        assert len(query) == 2
        server_session = PlainTextServerSession.fromQuery(query)

    reply = {
        'assoc_type': 'HMAC-SHA1',
        'assoc_handle': assoc_handle,
        'expires_in': '600',
    }
    if wants_dh:
        reply['session_type'] = 'DH-SHA1'

    reply.update(server_session.answer(assoc_secret))
    return kvform.dictToKV(reply)
72
73
# Placeholder signature string; test messages in this module send it as
# their 'openid.sig' value.
GOODSIG = "[A Good Signature]"
75
76
86
87
91
92
def __init__(self, user_url, user_page, assoc):
    """Set up the canned GET response and the association data.

    ``user_url`` / ``user_page`` -- the one URL answered on GET and the
    body returned for it (wrapped by ``self.response`` with status 200).

    ``assoc`` -- an ``(assoc_secret, assoc_handle)`` pair. It is accepted
    as a single positional argument and unpacked here, because tuple
    parameters in a ``def`` signature were removed from the language
    (PEP 3113); positional callers are unaffected.
    """
    assoc_secret, assoc_handle = assoc
    self.get_responses = {user_url: self.response(user_url, 200, user_page)}
    self.assoc_secret = assoc_secret
    self.assoc_handle = assoc_handle
    # Counts the associate requests served by fetch().
    self.num_assocs = 0
99
103
def fetch(self, url, body=None, headers=None):
    """Answer a fetch with canned data.

    A request without a body is looked up in ``self.get_responses``. A
    request whose body contains an ``associate`` call is answered through
    ``associate()`` and counted in ``self.num_assocs``. Anything else
    gets a 404 response. ``headers`` is accepted but not used.
    """
    if body is None:
        if url in self.get_responses:
            return self.get_responses[url]
    elif 'openid.mode=associate' in body:
        # Only DH-SHA1 associate requests are expected here.
        assert 'DH-SHA1' in body
        reply = associate(body, self.assoc_secret, self.assoc_handle)
        self.num_assocs += 1
        return self.response(url, 200, reply)

    return self.response(url, 404, 'Not found')
121
128
131
def _test_success(server_url, user_url, delegate_url, links, immediate=False):
    """Drive a full, successful checkid round trip against a fake server.

    An OpenID 1.1 endpoint is built from ``server_url``, ``user_url``
    (claimed id) and ``delegate_url`` (local id). The inner ``run()``
    begins a request, checks the redirect URL, fabricates a signed
    ``id_res`` reply and completes it. ``run()`` is executed four times
    to check when a new association is requested from the fetcher.

    ``immediate`` selects ``checkid_immediate`` instead of
    ``checkid_setup``. ``links`` is not used in this function.
    """
    store = memstore.MemoryStore()
    if immediate:
        mode = 'checkid_immediate'
    else:
        mode = 'checkid_setup'

    endpoint = OpenIDServiceEndpoint()
    endpoint.claimed_id = user_url
    endpoint.server_url = server_url
    endpoint.local_id = delegate_url
    endpoint.type_uris = [OPENID_1_1_TYPE]

    # The fetcher only needs to answer associate requests here, so no
    # user URL / page is registered.
    fetcher = TestFetcher(None, None, assocs[0])
    fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)

    def run():
        # One complete begin -> redirect -> id_res -> complete cycle, using
        # the store and fetcher of the enclosing function.
        trust_root = consumer_url

        consumer = GenericConsumer(store)
        setConsumerSession(consumer)

        request = consumer.begin(endpoint)
        return_to = consumer_url

        # The returned message is not inspected.
        m = request.getMessage(trust_root, return_to, immediate)

        redirect_url = request.redirectURL(trust_root, return_to, immediate)

        parsed = urlparse.urlparse(redirect_url)
        qs = parsed[4]  # index 4 of the urlparse result is the query string
        q = parseQuery(qs)
        # return_to is checked separately below, so take it out before
        # comparing the remaining arguments exactly.
        new_return_to = q['openid.return_to']
        del q['openid.return_to']
        assert q == {
            'openid.mode':mode,
            'openid.identity':delegate_url,
            'openid.trust_root':trust_root,
            'openid.assoc_handle':fetcher.assoc_handle,
            }, (q, user_url, delegate_url, mode)

        assert new_return_to.startswith(return_to)
        assert redirect_url.startswith(server_url)

        # Build the server's positive assertion on top of whatever query
        # arguments the consumer added to return_to.
        parsed = urlparse.urlparse(new_return_to)
        query = parseQuery(parsed[4])
        query.update({
            'openid.mode':'id_res',
            'openid.return_to':new_return_to,
            'openid.identity':delegate_url,
            'openid.assoc_handle':fetcher.assoc_handle,
            })

        assoc = store.getAssociation(server_url, fetcher.assoc_handle)

        message = Message.fromPostArgs(query)
        message = assoc.signMessage(message)
        info = consumer.complete(message, request.endpoint, new_return_to)
        assert info.status == SUCCESS, info.message
        assert info.identity_url == user_url

    # The first run has to create an association.
    assert fetcher.num_assocs == 0
    run()
    assert fetcher.num_assocs == 1

    # A second run must not request another association.
    run()
    assert fetcher.num_assocs == 1

    # After the stored association is removed, the next run requests a
    # new one.
    store.removeAssociation(server_url, fetcher.assoc_handle)
    run()
    assert fetcher.num_assocs == 2

    # ...and a further run again makes no new associate request.
    run()
    assert fetcher.num_assocs == 2
209
210 import unittest
211
# Fixed example URLs for the tests. consumer_url serves as both trust_root
# and return_to in _test_success; the two server URLs differ only in scheme.
http_server_url = 'http://server.example.com/'
consumer_url = 'http://consumer.example.com/'
https_server_url = 'https://server.example.com/'
215
248
249
252
253
256 self.store_sentinel = object()
257
259 oidc = GenericConsumer(self.store_sentinel)
260 self.failUnless(oidc.store is self.store_sentinel)
261
264
265
266 -class TestIdRes(unittest.TestCase, CatchLogs):
267 consumer_class = GenericConsumer
268
280
282 """Set the discovery verification to a no-op for test cases in
283 which we don't care."""
284 def dummyVerifyDiscover(_, endpoint):
285 return endpoint
286 self.consumer._verifyDiscoveryResults = dummyVerifyDiscover
287
289 def checkReturnTo(unused1, unused2):
290 return True
291 self.consumer._checkReturnTo = checkReturnTo
292 complete = self.consumer.complete
293 def callCompleteWithoutReturnTo(message, endpoint):
294 return complete(message, endpoint, None)
295 self.consumer.complete = callCompleteWithoutReturnTo
296
299 TestIdRes.setUp(self)
300 self.assoc = GoodAssociation()
301 self.assoc.handle = "{not_dumb}"
302 self.store.storeAssociation(self.endpoint.server_url, self.assoc)
303
304 self.message = Message.fromPostArgs({
305 'openid.mode': 'id_res',
306 'openid.identity': '=example',
307 'openid.sig': GOODSIG,
308 'openid.assoc_handle': self.assoc.handle,
309 'openid.signed': 'mode,identity,assoc_handle,signed',
310 'frobboz': 'banzit',
311 })
312
313
318
319
325
326
335
343
353
362
363
377
379 """Testing GenericConsumer.complete.
380
381 Other TestIdRes subclasses test more specific aspects.
382 """
383
385 message = Message.fromOpenIDArgs({'mode': 'id_res'})
386 setup_url_sentinel = object()
387
388 def raiseSetupNeeded(msg):
389 self.failUnless(msg is message)
390 raise SetupNeededError(setup_url_sentinel)
391
392 self.consumer._checkSetupNeeded = raiseSetupNeeded
393
394 response = self.consumer.complete(message, None, None)
395 self.failUnlessEqual(SETUP_NEEDED, response.status)
396 self.failUnless(setup_url_sentinel is response.setup_url)
397
404
410
421
436
438 msg = 'an error message'
439 contact = 'me'
440 reference = 'support ticket'
441 message = Message.fromPostArgs({'openid.mode': 'error',
442 'openid.error': msg, 'openid.reference': reference,
443 'openid.contact': contact, 'openid.ns': OPENID2_NS,
444 })
445 r = self.consumer.complete(message, self.endpoint, None)
446 self.failUnlessEqual(r.status, FAILURE)
447 self.failUnless(r.identity_url == self.endpoint.claimed_id)
448 self.failUnless(r.contact == contact)
449 self.failUnless(r.reference == reference)
450 self.failUnlessEqual(r.message, msg)
451
457
465
467 class VerifiedError(Exception): pass
468
469 def discoverAndVerify(claimed_id, _to_match_endpoints):
470 raise VerifiedError
471
472 self.consumer._discoverAndVerify = discoverAndVerify
473 self.disableReturnToChecking()
474
475 message = Message.fromPostArgs(
476 {'openid.mode': 'id_res',
477 'openid.return_to': 'return_to (just anything)',
478 'openid.identity': 'something wrong (not self.consumer_id)',
479 'openid.assoc_handle': 'does not matter',
480 'openid.sig': GOODSIG,
481 'openid.signed': 'identity,return_to',
482 })
483 self.consumer.store = GoodAssocStore()
484
485 self.failUnlessRaises(VerifiedError,
486 self.consumer.complete,
487 message, self.endpoint)
488
489 self.failUnlessLogMatches('Error attempting to use stored',
490 'Attempting discovery')
491
493
495 self.store = GoodAssocStore()
496 self.consumer = GenericConsumer(self.store)
497 self.server_url = "http://idp.unittest/"
498 CatchLogs.setUp(self)
499
500 claimed_id = 'bogus.claimed'
501
502 self.message = Message.fromOpenIDArgs(
503 {'mode': 'id_res',
504 'return_to': 'return_to (just anything)',
505 'identity': claimed_id,
506 'assoc_handle': 'does not matter',
507 'sig': GOODSIG,
508 'response_nonce': mkNonce(),
509 'signed': 'identity,return_to,response_nonce,assoc_handle,claimed_id,op_endpoint',
510 'claimed_id': claimed_id,
511 'op_endpoint': self.server_url,
512 'ns':OPENID2_NS,
513 })
514
515 self.endpoint = OpenIDServiceEndpoint()
516 self.endpoint.server_url = self.server_url
517 self.endpoint.claimed_id = claimed_id
518 self.consumer._checkReturnTo = lambda unused1, unused2 : True
519
522
523
525 def _vrfy(resp_msg, endpoint=None):
526 return endpoint
527
528 self.consumer._verifyDiscoveryResults = _vrfy
529 r = self.consumer.complete(self.message, self.endpoint, None)
530 self.failUnlessSuccess(r)
531
532
540
541
546
547
552
553
558
559
564
565
569
570
571
576
579
589
595
601
607
609 """Make sure that the handle is invalidated when is_valid is false
610
611 From "Verifying directly with the OpenID Provider"::
612
613 If the OP responds with "is_valid" set to "true", and
614 "invalidate_handle" is present, the Relying Party SHOULD
615 NOT send further authentication requests with that handle.
616 """
617 self._createAssoc()
618 response = Message.fromOpenIDArgs({
619 'is_valid':'false',
620 'invalidate_handle':'handle',
621 })
622 r = self.consumer._processCheckAuthResponse(response, self.server_url)
623 self.failIf(r)
624 self.failUnless(
625 self.consumer.store.getAssociation(self.server_url) is None)
626
638
651
653 """invalidate_handle with a handle that exists
654
655 From "Verifying directly with the OpenID Provider"::
656
657 If the OP responds with "is_valid" set to "true", and
658 "invalidate_handle" is present, the Relying Party SHOULD
659 NOT send further authentication requests with that handle.
660 """
661 self._createAssoc()
662 response = Message.fromOpenIDArgs({
663 'is_valid':'true',
664 'invalidate_handle':'handle',
665 })
666 r = self.consumer._processCheckAuthResponse(response, self.server_url)
667 self.failUnless(r)
668 self.failUnless(
669 self.consumer.store.getAssociation(self.server_url) is None)
670
673 try:
674 self.consumer._checkSetupNeeded(message)
675 except SetupNeededError, why:
676 self.failUnlessEqual(expected_setup_url, why.user_setup_url)
677 else:
678 self.fail("Expected to find an immediate-mode response")
679
689
691 """Extra stuff along with setup_url still triggers Setup Needed"""
692 setup_url = 'http://unittest/setup-here'
693 message = Message.fromPostArgs({
694 'openid.mode': 'id_res',
695 'openid.user_setup_url': setup_url,
696 'openid.identity': 'bogus',
697 })
698 self.failUnless(message.isOpenID1())
699 self.failUnlessSetupNeeded(setup_url, message)
700
709
719
731
742
753
754 test_openid1Success = mkSuccessTest(
755 {'return_to':'return',
756 'assoc_handle':'assoc handle',
757 'sig':'a signature',
758 'identity':'someone',
759 },
760 ['return_to', 'identity'])
761
762 test_openid2Success = mkSuccessTest(
763 {'ns':OPENID2_NS,
764 'return_to':'return',
765 'assoc_handle':'assoc handle',
766 'sig':'a signature',
767 'op_endpoint':'my favourite server',
768 'response_nonce':'use only once',
769 },
770 ['return_to', 'response_nonce', 'assoc_handle', 'op_endpoint'])
771
772 test_openid2Success_identifiers = mkSuccessTest(
773 {'ns':OPENID2_NS,
774 'return_to':'return',
775 'assoc_handle':'assoc handle',
776 'sig':'a signature',
777 'claimed_id':'i claim to be me',
778 'identity':'my server knows me as me',
779 'op_endpoint':'my favourite server',
780 'response_nonce':'use only once',
781 },
782 ['return_to', 'response_nonce', 'identity',
783 'claimed_id', 'assoc_handle', 'op_endpoint'])
784
786 def test(self):
787 message = Message.fromOpenIDArgs(openid_args)
788 try:
789 self.consumer._idResCheckForFields(message)
790 except ProtocolError, why:
791 self.failUnless(why[0].startswith('Missing required'))
792 else:
793 self.fail('Expected an error, but none occurred')
794 return test
795
805 return test
806
807 test_openid1Missing_returnToSig = mkMissingSignedTest(
808 {'return_to':'return',
809 'assoc_handle':'assoc handle',
810 'sig':'a signature',
811 'identity':'someone',
812 'signed':'identity',
813 })
814
815 test_openid1Missing_identitySig = mkMissingSignedTest(
816 {'return_to':'return',
817 'assoc_handle':'assoc handle',
818 'sig':'a signature',
819 'identity':'someone',
820 'signed':'return_to'
821 })
822
823 test_openid2Missing_opEndpointSig = mkMissingSignedTest(
824 {'ns':OPENID2_NS,
825 'return_to':'return',
826 'assoc_handle':'assoc handle',
827 'sig':'a signature',
828 'identity':'someone',
829 'op_endpoint':'the endpoint',
830 'signed':'return_to,identity,assoc_handle'
831 })
832
833 test_openid1MissingReturnTo = mkMissingFieldTest(
834 {'assoc_handle':'assoc handle',
835 'sig':'a signature',
836 'identity':'someone',
837 })
838
839 test_openid1MissingAssocHandle = mkMissingFieldTest(
840 {'return_to':'return',
841 'sig':'a signature',
842 'identity':'someone',
843 })
844
845
846
848
854
857
866
873
882
889
899
901 """remove the nonce from the store
902
903 From "Checking the Nonce"::
904
905 When the Relying Party checks the signature on an assertion, the
906
907 Relying Party SHOULD ensure that an assertion has not yet
908 been accepted with the same value for "openid.response_nonce"
909 from the same OP Endpoint URL.
910 """
911 nonce = mkNonce()
912 stamp, salt = splitNonce(nonce)
913 self.store.useNonce(self.server_url, stamp, salt)
914 self.response = Message.fromOpenIDArgs(
915 {'response_nonce': nonce,
916 'ns':OPENID2_NS,
917 })
918 self.failUnlessRaises(ProtocolError, self.consumer._idResCheckNonce,
919 self.response, self.endpoint)
920
930
938
945
949
951 """We're not testing nonce-checking, so just return success
952 when it asks."""
953 return True
954
956 consumer_class = CheckAuthDetectingConsumer
957
962
964 message = Message.fromPostArgs({
965 'openid.return_to':self.return_to,
966 'openid.identity':self.server_id,
967 'openid.assoc_handle':'not_found',
968 'openid.sig': GOODSIG,
969 'openid.signed': 'identity,return_to',
970 })
971 self.disableReturnToChecking()
972 try:
973 result = self.consumer._doIdRes(message, self.endpoint, None)
974 except CheckAuthHappened:
975 pass
976 else:
977 self.fail('_checkAuth did not happen. Result was: %r %s' %
978 (result, self.messages))
979
981
982
983 issued = time.time()
984 lifetime = 1000
985 assoc = association.Association(
986 'handle', 'secret', issued, lifetime, 'HMAC-SHA1')
987 self.store.storeAssociation(self.server_url, assoc)
988 self.disableReturnToChecking()
989 message = Message.fromPostArgs({
990 'openid.return_to':self.return_to,
991 'openid.identity':self.server_id,
992 'openid.assoc_handle':'not_found',
993 'openid.sig': GOODSIG,
994 'openid.signed': 'identity,return_to',
995 })
996 try:
997 result = self.consumer._doIdRes(message, self.endpoint, None)
998 except CheckAuthHappened:
999 pass
1000 else:
1001 self.fail('_checkAuth did not happen. Result was: %r' % (result,))
1002
1004
1005
1006 issued = time.time() - 10
1007 lifetime = 0
1008 handle = 'handle'
1009 assoc = association.Association(
1010 handle, 'secret', issued, lifetime, 'HMAC-SHA1')
1011 self.failUnless(assoc.expiresIn <= 0)
1012 self.store.storeAssociation(self.server_url, assoc)
1013
1014 message = Message.fromPostArgs({
1015 'openid.return_to':self.return_to,
1016 'openid.identity':self.server_id,
1017 'openid.assoc_handle':handle,
1018 'openid.sig': GOODSIG,
1019 'openid.signed': 'identity,return_to',
1020 })
1021 self.disableReturnToChecking()
1022 self.failUnlessRaises(ProtocolError, self.consumer._doIdRes,
1023 message, self.endpoint, None)
1024
1026 lifetime = 1000
1027
1028 good_issued = time.time() - 10
1029 good_handle = 'handle'
1030 good_assoc = association.Association(
1031 good_handle, 'secret', good_issued, lifetime, 'HMAC-SHA1')
1032 self.store.storeAssociation(self.server_url, good_assoc)
1033
1034 bad_issued = time.time() - 5
1035 bad_handle = 'handle2'
1036 bad_assoc = association.Association(
1037 bad_handle, 'secret', bad_issued, lifetime, 'HMAC-SHA1')
1038 self.store.storeAssociation(self.server_url, bad_assoc)
1039
1040 query = {
1041 'return_to':self.return_to,
1042 'identity':self.server_id,
1043 'assoc_handle':good_handle,
1044 }
1045
1046 message = Message.fromOpenIDArgs(query)
1047 message = good_assoc.signMessage(message)
1048 self.disableReturnToChecking()
1049 info = self.consumer._doIdRes(message, self.endpoint, None)
1050 self.failUnlessEqual(info.status, SUCCESS, info.message)
1051 self.failUnlessEqual(self.consumer_id, info.identity_url)
1052
1053
1054
1056 """Verifying the Return URL parameters.
1057 From the specification "Verifying the Return URL"::
1058
1059 To verify that the "openid.return_to" URL matches the URL that is
1060 processing this assertion:
1061
1062 - The URL scheme, authority, and path MUST be the same between the
1063 two URLs.
1064
1065 - Any query parameters that are present in the "openid.return_to"
1066 URL MUST also be present with the same values in the
1067 accepting URL.
1068
1069 XXX: So far we have only tested the second item on the list above.
1070 XXX: _verifyReturnToArgs is not invoked anywhere.
1071 """
1072
1076
1078 query = {
1079 'openid.mode': 'id_res',
1080 'openid.return_to': 'http://example.com/?foo=bar',
1081 'foo': 'bar',
1082 }
1083
1084 self.consumer._verifyReturnToArgs(query)
1085
1087 query = {
1088 'openid.mode': 'id_res',
1089 'openid.return_to': 'http://example.com/',
1090 'foo': 'bar',
1091 }
1092
1093 self.failUnlessRaises(ProtocolError,
1094 self.consumer._verifyReturnToArgs, query)
1095
1097 query = {
1098 'openid.mode': 'id_res',
1099 'openid.return_to': 'http://example.com/?foo=bar',
1100 }
1101
1102 self.failUnlessRaises(ValueError,
1103 self.consumer._verifyReturnToArgs, query)
1104
1105 query['foo'] = 'baz'
1106
1107 self.failUnlessRaises(ValueError,
1108 self.consumer._verifyReturnToArgs, query)
1109
1110
1112 query = {'openid.mode': 'id_res'}
1113 self.failUnlessRaises(ValueError,
1114 self.consumer._verifyReturnToArgs, query)
1115
1117 """Test GenericConsumer.complete()'s handling of bad return_to
1118 values.
1119 """
1120 return_to = "http://some.url/path?foo=bar"
1121
1122
1123
1124
1125 bad_return_tos = [
1126
1127 "https://some.url/path?foo=bar",
1128
1129 "http://some.url.invalid/path?foo=bar",
1130
1131 "http://some.url/path_extra?foo=bar",
1132
1133 "http://some.url/path?foo=bar2",
1134 "http://some.url/path?foo2=bar",
1135 ]
1136
1137 m = Message(OPENID1_NS)
1138 m.setArg(OPENID_NS, 'mode', 'cancel')
1139 m.setArg(BARE_NS, 'foo', 'bar')
1140 endpoint = None
1141
1142 for bad in bad_return_tos:
1143 m.setArg(OPENID_NS, 'return_to', bad)
1144 self.failIf(self.consumer._checkReturnTo(m, return_to))
1145
1147 """Test GenericConsumer.complete()'s handling of good
1148 return_to values.
1149 """
1150 return_to = "http://some.url/path"
1151
1152 good_return_tos = [
1153 (return_to, {}),
1154 (return_to + "?another=arg", {(BARE_NS, 'another'): 'arg'}),
1155 (return_to + "?another=arg#fragment", {(BARE_NS, 'another'): 'arg'}),
1156 ("HTTP"+return_to[4:], {}),
1157 (return_to.replace('url','URL'), {}),
1158 ("http://some.url:80/path", {}),
1159 ("http://some.url/p%61th", {}),
1160 ("http://some.url/./path", {}),
1161 ]
1162
1163 endpoint = None
1164
1165 for good, extra in good_return_tos:
1166 m = Message(OPENID1_NS)
1167 m.setArg(OPENID_NS, 'mode', 'cancel')
1168
1169 for ns, key in extra:
1170 m.setArg(ns, key, extra[(ns, key)])
1171
1172 m.setArg(OPENID_NS, 'return_to', good)
1173 result = self.consumer.complete(m, endpoint, return_to)
1174 self.failUnless(isinstance(result, CancelResponse), \
1175 "Expected CancelResponse, got %r for %s" % (result, good,))
1176
1181
1182 - def fetch(self, url, body=None, headers=None):
1185
1189
1190 - def fetch(self, url, body=None, headers=None):
1192
def _makeKVPost(self, args, _):
    """Assert that ``args`` is the expected check_authentication request.

    The second argument is ignored. Always returns None.
    """
    expected_args = {
        'openid.ns': OPENID1_NS,
        'openid.mode': 'check_authentication',
        'openid.signed': 'foo',
    }
    assert args == expected_args, args
    return None
1201
1203 consumer_class = GenericConsumer
1204
1214
1218
1228
1236
1237
1239 query = Message.fromOpenIDArgs({
1240 'mode': 'id_res',
1241 'sig': 'rabbits',
1242 'identity': '=example',
1243 'assoc_handle': 'munchkins',
1244 'ns.sreg': 'urn:sreg',
1245 'sreg.email': 'bogus@example.com',
1246 'signed': 'identity,mode,ns.sreg,sreg.email',
1247 'foo': 'bar',
1248 })
1249 args = self.consumer._createCheckAuthRequest(query)
1250 self.failUnless(args.isOpenID1())
1251 for signed_arg in query.getArg(OPENID_NS, 'signed').split(','):
1252 self.failUnless(args.getAliasedArg(signed_arg), signed_arg)
1253
1255 args = {'openid.assoc_handle': 'fa1f5ff0-cde4-11dc-a183-3714bfd55ca8',
1256 'openid.claimed_id': 'http://binkley.lan/user/test01',
1257 'openid.identity': 'http://test01.binkley.lan/',
1258 'openid.mode': 'id_res',
1259 'openid.ns': 'http://specs.openid.net/auth/2.0',
1260 'openid.ns.pape': 'http://specs.openid.net/extensions/pape/1.0',
1261 'openid.op_endpoint': 'http://binkley.lan/server',
1262 'openid.pape.auth_policies': 'none',
1263 'openid.pape.auth_time': '2008-01-28T20:42:36Z',
1264 'openid.pape.nist_auth_level': '0',
1265 'openid.response_nonce': '2008-01-28T21:07:04Z99Q=',
1266 'openid.return_to': 'http://binkley.lan:8001/process?janrain_nonce=2008-01-28T21%3A07%3A02Z0tMIKx',
1267 'openid.sig': 'YJlWH4U6SroB1HoPkmEKx9AyGGg=',
1268 'openid.signed': 'assoc_handle,identity,response_nonce,return_to,claimed_id,op_endpoint,pape.auth_time,ns.pape,pape.nist_auth_level,pape.auth_policies'
1269 }
1270 self.failUnlessEqual(OPENID2_NS, args['openid.ns'])
1271 incoming = Message.fromPostArgs(args)
1272 self.failUnless(incoming.isOpenID2())
1273 car = self.consumer._createCheckAuthRequest(incoming)
1274 expected_args = args.copy()
1275 expected_args['openid.mode'] = 'check_authentication'
1276 expected =Message.fromPostArgs(expected_args)
1277 self.failUnless(expected.isOpenID2())
1278 self.failUnlessEqual(expected, car)
1279 self.failUnlessEqual(expected_args, car.toPostArgs())
1280
1281
1282
1344
1345
1350
1352 resp = mkSuccess(self.endpoint, {
1353 'ns.sreg':'urn:sreg',
1354 'ns.unittest':'urn:unittest',
1355 'unittest.one':'1',
1356 'unittest.two':'2',
1357 'sreg.nickname':'j3h',
1358 'return_to':'return_to',
1359 })
1360 utargs = resp.extensionResponse('urn:unittest', False)
1361 self.failUnlessEqual(utargs, {'one':'1', 'two':'2'})
1362 sregargs = resp.extensionResponse('urn:sreg', False)
1363 self.failUnlessEqual(sregargs, {'nickname':'j3h'})
1364
1366 args = {
1367 'ns.sreg':'urn:sreg',
1368 'ns.unittest':'urn:unittest',
1369 'unittest.one':'1',
1370 'unittest.two':'2',
1371 'sreg.nickname':'j3h',
1372 'sreg.dob':'yesterday',
1373 'return_to':'return_to',
1374 'signed': 'sreg.nickname,unittest.one,sreg.dob',
1375 }
1376
1377 signed_list = ['openid.sreg.nickname',
1378 'openid.unittest.one',
1379 'openid.sreg.dob',]
1380
1381
1382
1383 msg = Message.fromOpenIDArgs(args)
1384 resp = SuccessResponse(self.endpoint, msg, signed_list)
1385
1386
1387 sregargs = resp.extensionResponse('urn:sreg', True)
1388 self.failUnlessEqual(sregargs, {'nickname':'j3h', 'dob': 'yesterday'})
1389
1390
1391
1392 utargs = resp.extensionResponse('urn:unittest', True)
1393 self.failUnlessEqual(utargs, None)
1394
1398
1402
1407
1413
1419
def begin(self, service):
    """Create an AuthRequest for ``service`` and remember the endpoint.

    The request is built from ``service`` and ``self.assoc``; ``service``
    is then stored on ``self.endpoint`` and the request is returned.
    """
    request = AuthRequest(service, self.assoc)
    self.endpoint = service
    return request
1424
1425 - def complete(self, message, endpoint, return_to):
1428
1430 """Tests for high-level consumer.Consumer functions.
1431
1432 Its GenericConsumer component is stubbed out with StubConsumer.
1433 """
1444
1454
1456 class DummyDisco(object):
1457 def __init__(self, *ignored):
1458 pass
1459
1460 getNextService = dummy_getNextService
1461
1462 import openid.consumer.consumer
1463 old_discovery = openid.consumer.consumer.Discovery
1464 try:
1465 openid.consumer.consumer.Discovery = DummyDisco
1466 callable()
1467 finally:
1468 openid.consumer.consumer.Discovery = old_discovery
1469
1471 """Make sure that the discovery HTTP failure case behaves properly
1472 """
1473 def getNextService(self, ignored):
1474 raise HTTPFetchingError("Unit test")
1475
1476 def test():
1477 try:
1478 self.consumer.begin('unused in this test')
1479 except DiscoveryFailure, why:
1480 self.failUnless(why[0].startswith('Error fetching'))
1481 self.failIf(why[0].find('Unit test') == -1)
1482 else:
1483 self.fail('Expected DiscoveryFailure')
1484
1485 self.withDummyDiscovery(test, getNextService)
1486
1488 def getNextService(self, ignored):
1489 return None
1490
1491 url = 'http://a.user.url/'
1492 def test():
1493 try:
1494 self.consumer.begin(url)
1495 except DiscoveryFailure, why:
1496 self.failUnless(why[0].startswith('No usable OpenID'))
1497 self.failIf(why[0].find(url) == -1)
1498 else:
1499 self.fail('Expected DiscoveryFailure')
1500
1501 self.withDummyDiscovery(test, getNextService)
1502
1503
1517
1519 text = "failed complete"
1520
1521 def checkEndpoint(message, endpoint, return_to):
1522 self.failUnless(endpoint is None)
1523 return FailureResponse(endpoint, text)
1524
1525 self.consumer.consumer.complete = checkEndpoint
1526
1527 response = self.consumer.complete({}, None)
1528 self.failUnlessEqual(response.status, FAILURE)
1529 self.failUnlessEqual(response.message, text)
1530 self.failUnless(response.identity_url is None)
1531
1532 - def _doResp(self, auth_req, exp_resp):
1554
1556 """Set up a transaction without discovery"""
1557 auth_req = self.consumer.beginWithoutDiscovery(self.endpoint)
1558 resp = self._doResp(auth_req, exp_resp)
1559
1560 self.failIf(self.session)
1561 return resp
1562
1565
1568
1573
1575 setup_url = 'http://setup.url/'
1576 resp = self._doRespNoDisco(
1577 SetupNeededResponse(self.endpoint, setup_url))
1578 self.failUnless(resp.setup_url is setup_url)
1579
1580
1581
1595
1596
1599
1602
1603
1608
1610 setup_url = 'http://setup.url/'
1611 resp = self._doRespDisco(
1612 False,
1613 SetupNeededResponse(self.endpoint, setup_url))
1614 self.failUnless(resp.setup_url is setup_url)
1615
1617 """
1618 Be sure that the session gets cleaned up when the response is
1619 successful and has a different URL than the one in the
1620 request.
1621 """
1622
1623 self.identity_url = 'http://idp.url/'
1624 self.endpoint.claimed_id = self.endpoint.local_id = IDENTIFIER_SELECT
1625
1626
1627
1628 resp_endpoint = OpenIDServiceEndpoint()
1629 resp_endpoint.claimed_id = "http://user.url/"
1630
1631 resp = self._doRespDisco(
1632 True,
1633 mkSuccess(resp_endpoint, {}))
1634 self.failUnless(self.discovery.getManager(force=True) is None)
1635
1644
1645
1646
1648
1654
1655
1659
1660
1662 identifier = '=directed_identifier'
1663 message = Message.fromPostArgs({
1664 'openid.identity': '=directed_identifier',
1665 'openid.return_to': 'x',
1666 'openid.assoc_handle': 'z',
1667 'openid.signed': 'identity,return_to',
1668 'openid.sig': GOODSIG,
1669 })
1670
1671 discovered_endpoint = OpenIDServiceEndpoint()
1672 discovered_endpoint.claimed_id = identifier
1673 discovered_endpoint.server_url = self.endpoint.server_url
1674 discovered_endpoint.local_id = identifier
1675 iverified = []
1676 def verifyDiscoveryResults(identifier, endpoint):
1677 self.failUnless(endpoint is self.endpoint)
1678 iverified.append(discovered_endpoint)
1679 return discovered_endpoint
1680 self.consumer._verifyDiscoveryResults = verifyDiscoveryResults
1681 self.consumer._idResCheckNonce = lambda *args: True
1682 self.consumer._checkReturnTo = lambda unused1, unused2 : True
1683 response = self.consumer._doIdRes(message, self.endpoint, None)
1684
1685 self.failUnlessSuccess(response)
1686 self.failUnlessEqual(response.identity_url, "=directed_identifier")
1687
1688
1689 self.failUnlessEqual(iverified, [discovered_endpoint])
1690
1691
1693
1694 message = Message.fromPostArgs({
1695 'openid.identity': '=directed_identifier',
1696 'openid.return_to': 'x',
1697 'openid.assoc_handle': 'z',
1698 'openid.signed': 'identity,return_to',
1699 'openid.sig': GOODSIG,
1700 })
1701 def verifyDiscoveryResults(identifier, endpoint):
1702 raise DiscoveryFailure("PHREAK!", None)
1703 self.consumer._verifyDiscoveryResults = verifyDiscoveryResults
1704 self.consumer._checkReturnTo = lambda unused1, unused2 : True
1705 self.failUnlessRaises(DiscoveryFailure, self.consumer._doIdRes,
1706 message, self.endpoint, None)
1707
1708
1712
1713
1714
1774
1775
1777 text = "verify failed"
1778
1779 def discoverAndVerify(claimed_id, to_match_endpoints):
1780 self.failUnlessEqual(claimed_id, self.identifier)
1781 for to_match in to_match_endpoints:
1782 self.failUnlessEqual(claimed_id, to_match.claimed_id)
1783 raise ProtocolError(text)
1784
1785 self.consumer._discoverAndVerify = discoverAndVerify
1786
1787
1788 endpoint = OpenIDServiceEndpoint()
1789 endpoint.type_uris = [OPENID_2_0_TYPE]
1790 endpoint.claimed_id = self.identifier
1791 endpoint.server_url = self.server_url
1792 endpoint.local_id = "http://unittest/juan-carlos"
1793
1794 try:
1795 r = self.consumer._verifyDiscoveryResults(self.message, endpoint)
1796 except ProtocolError, e:
1797 self.failUnlessEqual(str(e), text)
1798 else:
1799 self.fail("Exepected ProtocolError, %r returned" % (r,))
1800
1807
1808
1811
1812
1815 class DummyEndpoint(object):
1816 use_compatibility = False
1817
1818 def compatibilityMode(self):
1819 return self.use_compatibility
1820
1821 self.endpoint = DummyEndpoint()
1822 self.consumer = GenericConsumer(store=None)
1823 self.assoc_type = 'HMAC-SHA1'
1824
1826 session_type = 'no-encryption'
1827 session, args = self.consumer._createAssociateRequest(
1828 self.endpoint, self.assoc_type, session_type)
1829
1830 self.failUnless(isinstance(session, PlainTextConsumerSession))
1831 expected = Message.fromOpenIDArgs(
1832 {'ns':OPENID2_NS,
1833 'session_type':session_type,
1834 'mode':'associate',
1835 'assoc_type':self.assoc_type,
1836 })
1837
1838 self.failUnlessEqual(expected, args)
1839
1841 self.endpoint.use_compatibility = True
1842 session_type = 'no-encryption'
1843 session, args = self.consumer._createAssociateRequest(
1844 self.endpoint, self.assoc_type, session_type)
1845
1846 self.failUnless(isinstance(session, PlainTextConsumerSession))
1847 self.failUnlessEqual(Message.fromOpenIDArgs({'mode':'associate',
1848 'assoc_type':self.assoc_type,
1849 }), args)
1850
1852
1853
1854 setConsumerSession(self.consumer)
1855
1856 self.endpoint.use_compatibility = True
1857 session_type = 'DH-SHA1'
1858 session, args = self.consumer._createAssociateRequest(
1859 self.endpoint, self.assoc_type, session_type)
1860
1861 self.failUnless(isinstance(session, DiffieHellmanSHA1ConsumerSession))
1862
1863
1864
1865 self.failUnless(args.getArg(OPENID1_NS, 'dh_consumer_public'))
1866 args.delArg(OPENID1_NS, 'dh_consumer_public')
1867
1868
1869
1870 expected = Message.fromOpenIDArgs({'mode':'associate',
1871 'session_type':'DH-SHA1',
1872 'assoc_type':self.assoc_type,
1873 'dh_modulus': 'BfvStQ==',
1874 'dh_gen': 'Ag==',
1875 })
1876
1877 self.failUnlessEqual(expected, args)
1878
1879
1880
1882 session_cls = None
1883 message_namespace = None
1884
1903
1905 self.msg.setArg(OPENID_NS, 'dh_server_public', self.dh_server_public)
1906 self.msg.setArg(OPENID_NS, 'enc_mac_key', self.enc_mac_key)
1907
1908 extracted = self.consumer_session.extractSecret(self.msg)
1909 self.failUnlessEqual(extracted, self.secret)
1910
1912 self.msg.setArg(OPENID_NS, 'enc_mac_key', self.enc_mac_key)
1913
1914 self.failUnlessRaises(KeyError, self.consumer_session.extractSecret, self.msg)
1915
1917 self.msg.setArg(OPENID_NS, 'dh_server_public', self.dh_server_public)
1918
1919 self.failUnlessRaises(KeyError, self.consumer_session.extractSecret, self.msg)
1920
1922 self.msg.setArg(OPENID_NS, 'dh_server_public', 'n o t b a s e 6 4.')
1923 self.msg.setArg(OPENID_NS, 'enc_mac_key', self.enc_mac_key)
1924
1925 self.failUnlessRaises(ValueError, self.consumer_session.extractSecret, self.msg)
1926
1928 self.msg.setArg(OPENID_NS, 'dh_server_public', self.dh_server_public)
1929 self.msg.setArg(OPENID_NS, 'enc_mac_key', 'n o t base 64')
1930
1931 self.failUnlessRaises(ValueError, self.consumer_session.extractSecret, self.msg)
1932
1933 -class TestOpenID1SHA1(TestDiffieHellmanResponseParameters, unittest.TestCase):
1936
1937 -class TestOpenID2SHA1(TestDiffieHellmanResponseParameters, unittest.TestCase):
1940
1941 if cryptutil.SHA256_AVAILABLE:
1945 else:
1946 warnings.warn("Not running SHA256 association session tests.")
1947
1951
1953 """_getAssociation is never called when the store is None"""
def notCalled(unused):
    """Stub that fails the surrounding test as soon as it is invoked."""
    self.fail('This method was unexpectedly called')
1956
1957 endpoint = OpenIDServiceEndpoint()
1958 endpoint.claimed_id = 'identity_url'
1959
1960 self.consumer._getAssociation = notCalled
1961 auth_request = self.consumer.begin(endpoint)
1962
1963
1964
1965
1966
1968 endpoint = 'unused'
1969
1971 raise ValueError('Should trigger ProtocolError')
1972
1986
1987
1996
1998 self.failUnlessRaises(
1999 DiscoveryFailure,
2000 self.consumer._discoverAndVerify,
2001 'http://claimed-id.com/',
2002 [self.to_match])
2003
2005 """Discovery returning no results results in a
2006 DiscoveryFailure exception"""
2007 self.discovery_result = (None, [])
2008 self.failUnlessDiscoveryFailure()
2009
"""If no discovered endpoint matches the values from the
assertion, the ProtocolError raised while verifying it ends up
being reported as a DiscoveryFailure
"""
2014 self.discovery_result = (None, ['unused'])
def raiseProtocolError(unused1, unused2):
    """Ignore both arguments and raise ProtocolError('unit test')."""
    raise ProtocolError('unit test')
2017 self.consumer._verifyDiscoverySingle = raiseProtocolError
2018 self.failUnlessDiscoveryFailure()
2019
2021 """If an endpoint matches, we return it
2022 """
2023
2024 matching_endpoint = 'matching endpoint'
2025 self.discovery_result = (None, [matching_endpoint])
2026
2027
def returnTrue(unused1, unused2):
    """Ignore both arguments and always answer True."""
    return True
2030 self.consumer._verifyDiscoverySingle = returnTrue
2031
2032
2033
2034 result = self.consumer._discoverAndVerify(
2035 'http://claimed.id/', [self.to_match])
2036 self.failUnlessEqual(matching_endpoint, result)
2037
2038 from openid.extension import Extension
2045
2054
2055
2056
2057 -class TestKVPost(unittest.TestCase):
2059 self.server_url = 'http://unittest/%s' % (self.id(),)
2060
2061 - def test_200(self):
2062 from openid.fetchers import HTTPResponse
2063 response = HTTPResponse()
2064 response.status = 200
2065 response.body = "foo:bar\nbaz:quux\n"
2066 r = _httpResponseToMessage(response, self.server_url)
2067 expected_msg = Message.fromOpenIDArgs({'foo':'bar','baz':'quux'})
2068 self.failUnlessEqual(expected_msg, r)
2069
2070
2071 - def test_400(self):
2072 response = HTTPResponse()
2073 response.status = 400
2074 response.body = "error:bonk\nerror_code:7\n"
2075 try:
2076 r = _httpResponseToMessage(response, self.server_url)
2077 except ServerError, e:
2078 self.failUnlessEqual(e.error_text, 'bonk')
2079 self.failUnlessEqual(e.error_code, '7')
2080 else:
2081 self.fail("Expected ServerError, got return %r" % (r,))
2082
2083
2084 - def test_500(self):
2085
2086 response = HTTPResponse()
2087 response.status = 500
2088 response.body = "foo:bar\nbaz:quux\n"
2089 self.failUnlessRaises(fetchers.HTTPFetchingError,
2090 _httpResponseToMessage, response,
2091 self.server_url)
2092
2093
2094
2095
# Hand control to the unittest runner when this file is run as a script.
if __name__ == '__main__':
    unittest.main()
2098