Package openid :: Package test :: Module test_negotiation
[frames | no frames]

Source Code for Module openid.test.test_negotiation

  1   
  2  import unittest 
  3  from support import CatchLogs 
  4   
  5  from openid.message import Message, OPENID2_NS, OPENID1_NS, OPENID_NS 
  6  from openid import association 
  7  from openid.consumer.consumer import GenericConsumer, ServerError 
  8  from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_2_0_TYPE 
  9   
class ErrorRaisingConsumer(GenericConsumer):
    """
    Consumer stub whose _requestAssociation hands back predefined
    results rather than performing real association requests.
    """

    # Queue of results for successive calls to _requestAssociation.
    # Every call removes the first element of this list.  A Message
    # element is wrapped in a ServerError (ServerError.fromMessage) and
    # raised; any other element is returned unchanged to the caller
    # (_negotiateAssociation).
    return_messages = []

    def _requestAssociation(self, endpoint, assoc_type, session_type):
        """
        Consume the next queued result.

        The endpoint, assoc_type and session_type arguments are ignored.
        Raises ServerError when the queued item is a Message; otherwise
        returns the queued item itself.
        """
        queued = self.return_messages.pop(0)
        if not isinstance(queued, Message):
            return queued
        raise ServerError.fromMessage(queued)
29
30 -class TestOpenID2SessionNegotiation(unittest.TestCase, CatchLogs):
31 """ 32 Test the session type negotiation behavior of an OpenID 2 33 consumer. 34 """
35 - def setUp(self):
42
def testBadResponse(self):
    """
    Check the case where the reply to an associate request is a
    server error or cannot otherwise be interpreted.
    """
    namespace = self.endpoint.preferredNamespace()
    # A bare Message is turned into a ServerError by the stub consumer.
    self.consumer.return_messages = [Message(namespace)]

    negotiated = self.consumer._negotiateAssociation(self.endpoint)

    self.assertEqual(negotiated, None)
    self.failUnlessLogMatches('Server error when requesting an association')
51
def testEmptyAssocType(self):
    """
    Check the case where an unsupported-type response carries no
    association type (assoc_type).
    """
    reply = Message(self.endpoint.preferredNamespace())
    # assoc_type is deliberately left out of the reply.
    for key, value in (
            ('error', 'Unsupported type'),
            ('error_code', 'unsupported-type'),
            ('session_type', 'new-session-type')):
        reply.setArg(OPENID_NS, key, value)
    self.consumer.return_messages = [reply]

    negotiated = self.consumer._negotiateAssociation(self.endpoint)
    self.assertEqual(negotiated, None)

    no_fallback = ('Server responded with unsupported association '
                   'session but did not supply a fallback.')
    self.failUnlessLogMatches('Unsupported association type', no_fallback)
69
def testEmptySessionType(self):
    """
    Check the case where an unsupported-type response carries no
    session type (session_type).
    """
    reply = Message(self.endpoint.preferredNamespace())
    # session_type is deliberately left out of the reply.
    for key, value in (
            ('error', 'Unsupported type'),
            ('error_code', 'unsupported-type'),
            ('assoc_type', 'new-assoc-type')):
        reply.setArg(OPENID_NS, key, value)
    self.consumer.return_messages = [reply]

    negotiated = self.consumer._negotiateAssociation(self.endpoint)
    self.assertEqual(negotiated, None)

    no_fallback = ('Server responded with unsupported association '
                   'session but did not supply a fallback.')
    self.failUnlessLogMatches('Unsupported association type', no_fallback)
87
def testNotAllowed(self):
    """
    Check the case where an unsupported-type response names a
    preferred (assoc_type, session_type) combination that the
    consumer's SessionNegotiator does not allow.
    """
    # Negotiator constructed with an empty list of allowed types.
    self.consumer.negotiator = association.SessionNegotiator([])

    reply = Message(self.endpoint.preferredNamespace())
    for key, value in (
            ('error', 'Unsupported type'),
            ('error_code', 'unsupported-type'),
            ('assoc_type', 'not-allowed'),
            ('session_type', 'not-allowed')):
        reply.setArg(OPENID_NS, key, value)
    self.consumer.return_messages = [reply]

    negotiated = self.consumer._negotiateAssociation(self.endpoint)
    self.assertEqual(negotiated, None)

    self.failUnlessLogMatches(
        'Unsupported association type',
        'Server sent unsupported session/association type:')
110
def testUnsupportedWithRetry(self):
    """
    Test the case where an unsupported-type response triggers a
    retry to get an association with the new preferred type.
    """
    msg = Message(self.endpoint.preferredNamespace())
    msg.setArg(OPENID_NS, 'error', 'Unsupported type')
    msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
    msg.setArg(OPENID_NS, 'assoc_type', 'HMAC-SHA1')
    msg.setArg(OPENID_NS, 'session_type', 'DH-SHA1')

    assoc = association.Association(
        'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

    # The first request receives the unsupported-type error; the
    # second request (the retry) receives the association.
    self.consumer.return_messages = [msg, assoc]

    # assertTrue replaces the deprecated failUnless alias, which no
    # longer exists in unittest as of Python 3.12.
    self.assertTrue(self.consumer._negotiateAssociation(self.endpoint) is assoc)

    self.failUnlessLogMatches('Unsupported association type')
129
131 """ 132 Test the case where an unsupported-typ response triggers a 133 retry, but the retry fails and None is returned instead. 134 """ 135 msg = Message(self.endpoint.preferredNamespace()) 136 msg.setArg(OPENID_NS, 'error', 'Unsupported type') 137 msg.setArg(OPENID_NS, 'error_code', 'unsupported-type') 138 msg.setArg(OPENID_NS, 'assoc_type', 'HMAC-SHA1') 139 msg.setArg(OPENID_NS, 'session_type', 'DH-SHA1') 140 141 self.consumer.return_messages = [msg, 142 Message(self.endpoint.preferredNamespace())] 143 144 self.failUnlessEqual(self.consumer._negotiateAssociation(self.endpoint), None) 145 146 self.failUnlessLogMatches('Unsupported association type', 147 'Server %s refused' % (self.endpoint.server_url))
148
def testValid(self):
    """
    Test the valid case, wherein an association is returned on the
    first attempt to get one.
    """
    assoc = association.Association(
        'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

    self.consumer.return_messages = [assoc]

    # assertTrue replaces the deprecated failUnless alias, which no
    # longer exists in unittest as of Python 3.12.
    self.assertTrue(self.consumer._negotiateAssociation(self.endpoint) is assoc)
    self.failUnlessLogEmpty()
160
class TestOpenID1SessionNegotiation(unittest.TestCase, CatchLogs):
    """
    Tests for the OpenID 1 consumer association session behavior.  See
    the docs for TestOpenID2SessionNegotiation.  Notice that this
    class is not a subclass of the OpenID 2 tests.  Instead, it uses
    many of the same inputs but inspects the log messages logged with
    oidutil.log.  See the calls to self.failUnlessLogMatches.  Some of
    these tests pass openid2-style messages to the openid 1
    association processing logic to be sure it ignores the extra data.
    """

    def setUp(self):
        """Run the CatchLogs setup and build a stub consumer and an
        endpoint whose only type URI is OPENID1_NS."""
        CatchLogs.setUp(self)
        self.consumer = ErrorRaisingConsumer(store=None)

        self.endpoint = OpenIDServiceEndpoint()
        self.endpoint.type_uris = [OPENID1_NS]
        self.endpoint.server_url = 'bogus'

    def _unsupportedTypeMessage(self, assoc_type=None, session_type=None):
        """
        Build an 'unsupported-type' error Message in the endpoint's
        preferred namespace.

        assoc_type and session_type are set on the message only when a
        value is supplied, so callers can model a response that omits
        either field.
        """
        msg = Message(self.endpoint.preferredNamespace())
        msg.setArg(OPENID_NS, 'error', 'Unsupported type')
        msg.setArg(OPENID_NS, 'error_code', 'unsupported-type')
        if assoc_type is not None:
            msg.setArg(OPENID_NS, 'assoc_type', assoc_type)
        if session_type is not None:
            msg.setArg(OPENID_NS, 'session_type', session_type)
        return msg

    def testBadResponse(self):
        """A bare error message yields no association."""
        self.consumer.return_messages = [Message(self.endpoint.preferredNamespace())]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)
        self.failUnlessLogMatches('Server error when requesting an association')

    def testEmptyAssocType(self):
        """An unsupported-type response without assoc_type yields no
        association."""
        msg = self._unsupportedTypeMessage(session_type='new-session-type')

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Server error when requesting an association')

    def testEmptySessionType(self):
        """An unsupported-type response without session_type yields no
        association."""
        msg = self._unsupportedTypeMessage(assoc_type='new-assoc-type')

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Server error when requesting an association')

    def testNotAllowed(self):
        """An unsupported-type response yields no association when the
        negotiator was built with an empty list of allowed types."""
        allowed_types = []

        negotiator = association.SessionNegotiator(allowed_types)
        self.consumer.negotiator = negotiator

        msg = self._unsupportedTypeMessage(assoc_type='not-allowed',
                                           session_type='not-allowed')

        self.consumer.return_messages = [msg]
        self.assertEqual(self.consumer._negotiateAssociation(self.endpoint), None)

        self.failUnlessLogMatches('Server error when requesting an association')

    def testUnsupportedWithRetry(self):
        """An unsupported-type response yields None even though an
        association is queued as the next result."""
        msg = self._unsupportedTypeMessage(assoc_type='HMAC-SHA1',
                                           session_type='DH-SHA1')

        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [msg, assoc]
        # assertTrue replaces the deprecated failUnless alias, which no
        # longer exists in unittest as of Python 3.12.
        self.assertTrue(self.consumer._negotiateAssociation(self.endpoint) is None)

        self.failUnlessLogMatches('Server error when requesting an association')

    def testValid(self):
        """An association returned on the first attempt is passed
        through unchanged and nothing is logged."""
        assoc = association.Association(
            'handle', 'secret', 'issued', 10000, 'HMAC-SHA1')

        self.consumer.return_messages = [assoc]
        self.assertTrue(self.consumer._negotiateAssociation(self.endpoint) is assoc)
        self.failUnlessLogEmpty()
247
248 -class TestNegotiatorBehaviors(unittest.TestCase, CatchLogs):
def setUp(self):
    """
    Build the list of allowed (assoc_type, session_type) pairs and a
    SessionNegotiator constructed from that same list.
    """
    # Both association types are paired with the 'no-encryption'
    # session type.
    self.allowed_types = [
        (assoc_type, 'no-encryption')
        for assoc_type in ('HMAC-SHA1', 'HMAC-SHA256')
    ]
    self.n = association.SessionNegotiator(self.allowed_types)
256
258 self.assertRaises(ValueError, self.n.addAllowedType, 'invalid')
259
261 self.assertRaises(ValueError, self.n.addAllowedType, 'assoc1', 'invalid')
262
264 assoc_type = 'HMAC-SHA1' 265 self.failUnless(self.n.addAllowedType(assoc_type) is None) 266 267 for typ in association.getSessionTypes(assoc_type): 268 self.failUnless((assoc_type, typ) in self.n.allowed_types)
# Run the test cases in this module when it is executed directly.
if __name__ == '__main__':
    unittest.main()