1 """An implementation of the OpenID Provider Authentication Policy
2 Extension 1.0, Draft 5
3
4 @see: http://openid.net/developers/specs/
5
6 @since: 2.1.0
7 """
8
9 __all__ = [
10 'Request',
11 'Response',
12 'ns_uri',
13 'AUTH_PHISHING_RESISTANT',
14 'AUTH_MULTI_FACTOR',
15 'AUTH_MULTI_FACTOR_PHYSICAL',
16 'LEVELS_NIST',
17 'LEVELS_JISA',
18 ]
19
20 from openid.extension import Extension
21 import warnings
22 import re
23
24 ns_uri = "http://specs.openid.net/extensions/pape/1.0"
25
26 AUTH_MULTI_FACTOR_PHYSICAL = \
27 'http://schemas.openid.net/pape/policies/2007/06/multi-factor-physical'
28 AUTH_MULTI_FACTOR = \
29 'http://schemas.openid.net/pape/policies/2007/06/multi-factor'
30 AUTH_PHISHING_RESISTANT = \
31 'http://schemas.openid.net/pape/policies/2007/06/phishing-resistant'
32 AUTH_NONE = \
33 'http://schemas.openid.net/pape/policies/2007/06/none'
34
35 TIME_VALIDATOR = re.compile('^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$')
36
37 LEVELS_NIST = 'http://csrc.nist.gov/publications/nistpubs/800-63/SP800-63V1_0_2.pdf'
38 LEVELS_JISA = 'http://www.jisa.or.jp/spec/auth_level.html'
39
41 _default_auth_level_aliases = {
42 'nist': LEVELS_NIST,
43 'jisa': LEVELS_JISA,
44 }
45
47 self.auth_level_aliases = self._default_auth_level_aliases.copy()
48
50 """Add an auth level URI alias to this request.
51
52 @param auth_level_uri: The auth level URI to send in the
53 request.
54
55 @param alias: The namespace alias to use for this auth level
56 in this message. May be None if the alias is not
57 important.
58 """
59 if alias is None:
60 try:
61 alias = self._getAlias(auth_level_uri)
62 except KeyError:
63 alias = self._generateAlias()
64 else:
65 existing_uri = self.auth_level_aliases.get(alias)
66 if existing_uri is not None and existing_uri != auth_level_uri:
67 raise KeyError('Attempting to redefine alias %r from %r to %r',
68 alias, existing_uri, auth_level_uri)
69
70 self.auth_level_aliases[alias] = auth_level_uri
71
73 """Return an unused auth level alias"""
74 for i in xrange(1000):
75 alias = 'cust%d' % (i,)
76 if alias not in self.auth_level_aliases:
77 return alias
78
79 raise RuntimeError('Could not find an unused alias (tried 1000!)')
80
82 """Return the alias for the specified auth level URI.
83
84 @raises KeyError: if no alias is defined
85 """
86 for (alias, existing_uri) in self.auth_level_aliases.iteritems():
87 if auth_level_uri == existing_uri:
88 return alias
89
90 raise KeyError(auth_level_uri)
91
93 """A Provider Authentication Policy request, sent from a relying
94 party to a provider
95
96 @ivar preferred_auth_policies: The authentication policies that
97 the relying party prefers
98 @type preferred_auth_policies: [str]
99
100 @ivar max_auth_age: The maximum time, in seconds, that the relying
101 party wants to allow to have elapsed before the user must
102 re-authenticate
103 @type max_auth_age: int or NoneType
104
105 @ivar preferred_auth_level_types: Ordered list of authentication
106 level namespace URIs
107
108 @type preferred_auth_level_types: [str]
109 """
110
111 ns_alias = 'pape'
112
113 - def __init__(self, preferred_auth_policies=None, max_auth_age=None,
114 preferred_auth_level_types=None):
115 super(Request, self).__init__()
116 if preferred_auth_policies is None:
117 preferred_auth_policies = []
118
119 self.preferred_auth_policies = preferred_auth_policies
120 self.max_auth_age = max_auth_age
121 self.preferred_auth_level_types = []
122
123 if preferred_auth_level_types is not None:
124 for auth_level in preferred_auth_level_types:
125 self.addAuthLevel(auth_level)
126
128 return bool(self.preferred_auth_policies or
129 self.max_auth_age is not None or
130 self.preferred_auth_level_types)
131
133 """Add an acceptable authentication policy URI to this request
134
135 This method is intended to be used by the relying party to add
136 acceptable authentication types to the request.
137
138 @param policy_uri: The identifier for the preferred type of
139 authentication.
140 @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-05.html#auth_policies
141 """
142 if policy_uri not in self.preferred_auth_policies:
143 self.preferred_auth_policies.append(policy_uri)
144
146 self._addAuthLevelAlias(auth_level_uri, alias)
147 if auth_level_uri not in self.preferred_auth_level_types:
148 self.preferred_auth_level_types.append(auth_level_uri)
149
151 """@see: C{L{Extension.getExtensionArgs}}
152 """
153 ns_args = {
154 'preferred_auth_policies':' '.join(self.preferred_auth_policies),
155 }
156
157 if self.max_auth_age is not None:
158 ns_args['max_auth_age'] = str(self.max_auth_age)
159
160 if self.preferred_auth_level_types:
161 preferred_types = []
162
163 for auth_level_uri in self.preferred_auth_level_types:
164 alias = self._getAlias(auth_level_uri)
165 ns_args['auth_level.ns.%s' % (alias,)] = auth_level_uri
166 preferred_types.append(alias)
167
168 ns_args['preferred_auth_level_types'] = ' '.join(preferred_types)
169
170 return ns_args
171
173 """Instantiate a Request object from the arguments in a
174 C{checkid_*} OpenID message
175 """
176 self = cls()
177 args = request.message.getArgs(self.ns_uri)
178 is_openid1 = request.message.isOpenID1()
179
180 if args == {}:
181 return None
182
183 self.parseExtensionArgs(args, is_openid1)
184 return self
185
186 fromOpenIDRequest = classmethod(fromOpenIDRequest)
187
189 """Set the state of this request to be that expressed in these
190 PAPE arguments
191
192 @param args: The PAPE arguments without a namespace
193
194 @param strict: Whether to raise an exception if the input is
195 out of spec or otherwise malformed. If strict is false,
196 malformed input will be ignored.
197
198 @param is_openid1: Whether the input should be treated as part
199 of an OpenID1 request
200
201 @rtype: None
202
203 @raises ValueError: When the max_auth_age is not parseable as
204 an integer
205 """
206
207
208 self.preferred_auth_policies = []
209
210 policies_str = args.get('preferred_auth_policies')
211 if policies_str:
212 for uri in policies_str.split(' '):
213 if uri not in self.preferred_auth_policies:
214 self.preferred_auth_policies.append(uri)
215
216
217 max_auth_age_str = args.get('max_auth_age')
218 self.max_auth_age = None
219
220 if max_auth_age_str:
221 try:
222 self.max_auth_age = int(max_auth_age_str)
223 except ValueError:
224 if strict:
225 raise
226
227
228 preferred_auth_level_types = args.get('preferred_auth_level_types')
229 if preferred_auth_level_types:
230 aliases = preferred_auth_level_types.strip().split()
231
232 for alias in aliases:
233 key = 'auth_level.ns.%s' % (alias,)
234 try:
235 uri = args[key]
236 except KeyError:
237 if is_openid1:
238 uri = self._default_auth_level_aliases.get(alias)
239 else:
240 uri = None
241
242 if uri is None:
243 if strict:
244 raise ValueError('preferred auth level %r is not '
245 'defined in this message' % (alias,))
246 else:
247 self.addAuthLevel(uri, alias)
248
250 """Given a list of authentication policy URIs that a provider
251 supports, this method returns the subsequence of those types
252 that are preferred by the relying party.
253
254 @param supported_types: A sequence of authentication policy
255 type URIs that are supported by a provider
256
257 @returns: The sub-sequence of the supported types that are
258 preferred by the relying party. This list will be ordered
259 in the order that the types appear in the supported_types
260 sequence, and may be empty if the provider does not prefer
261 any of the supported authentication types.
262
263 @returntype: [str]
264 """
265 return filter(self.preferred_auth_policies.__contains__,
266 supported_types)
267
268 Request.ns_uri = ns_uri
269
270
272 """A Provider Authentication Policy response, sent from a provider
273 to a relying party
274
275 @ivar auth_policies: List of authentication policies conformed to
276 by this OpenID assertion, represented as policy URIs
277 """
278
279 ns_alias = 'pape'
280
281 - def __init__(self, auth_policies=None, auth_time=None,
282 auth_levels=None):
283 super(Response, self).__init__()
284 if auth_policies:
285 self.auth_policies = auth_policies
286 else:
287 self.auth_policies = []
288
289 self.auth_time = auth_time
290 self.auth_levels = {}
291
292 if auth_levels is None:
293 auth_levels = {}
294
295 for uri, level in auth_levels.iteritems():
296 self.setAuthLevel(uri, level)
297
299 """Set the value for the given auth level type.
300
301 @param level: string representation of an authentication level
302 valid for level_uri
303
304 @param alias: An optional namespace alias for the given auth
305 level URI. May be omitted if the alias is not
306 significant. The library will use a reasonable default for
307 widely-used auth level types.
308 """
309 self._addAuthLevelAlias(level_uri, alias)
310 self.auth_levels[level_uri] = level
311
313 """Return the auth level for the specified auth level
314 identifier
315
316 @returns: A string that should map to the auth levels defined
317 for the auth level type
318
319 @raises KeyError: If the auth level type is not present in
320 this message
321 """
322 return self.auth_levels[level_uri]
323
329
330 nist_auth_level = property(
331 _getNISTAuthLevel,
332 doc="Backward-compatibility accessor for the NIST auth level")
333
335 """Add a authentication policy to this response
336
337 This method is intended to be used by the provider to add a
338 policy that the provider conformed to when authenticating the user.
339
340 @param policy_uri: The identifier for the preferred type of
341 authentication.
342 @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-01.html#auth_policies
343 """
344 if policy_uri == AUTH_NONE:
345 raise RuntimeError(
346 'To send no policies, do not set any on the response.')
347
348 if policy_uri not in self.auth_policies:
349 self.auth_policies.append(policy_uri)
350
352 """Create a C{L{Response}} object from a successful OpenID
353 library response
354 (C{L{openid.consumer.consumer.SuccessResponse}}) response
355 message
356
357 @param success_response: A SuccessResponse from consumer.complete()
358 @type success_response: C{L{openid.consumer.consumer.SuccessResponse}}
359
360 @rtype: Response or None
361 @returns: A provider authentication policy response from the
362 data that was supplied with the C{id_res} response or None
363 if the provider sent no signed PAPE response arguments.
364 """
365 self = cls()
366
367
368 args = success_response.getSignedNS(self.ns_uri)
369 is_openid1 = success_response.isOpenID1()
370
371
372
373 if args is not None:
374 self.parseExtensionArgs(args, is_openid1)
375 return self
376 else:
377 return None
378
380 """Parse the provider authentication policy arguments into the
381 internal state of this object
382
383 @param args: unqualified provider authentication policy
384 arguments
385
386 @param strict: Whether to raise an exception when bad data is
387 encountered
388
389 @returns: None. The data is parsed into the internal fields of
390 this object.
391 """
392 policies_str = args.get('auth_policies')
393 if policies_str:
394 auth_policies = policies_str.split(' ')
395 elif strict:
396 raise ValueError('Missing auth_policies')
397 else:
398 auth_policies = []
399
400 if (len(auth_policies) > 1 and strict and AUTH_NONE in auth_policies):
401 raise ValueError('Got some auth policies, as well as the special '
402 '"none" URI: %r' % (auth_policies,))
403
404 if 'none' in auth_policies:
405 msg = '"none" used as a policy URI (see PAPE draft < 5)'
406 if strict:
407 raise ValueError(msg)
408 else:
409 warnings.warn(msg, stacklevel=2)
410
411 auth_policies = [u for u in auth_policies
412 if u not in ['none', AUTH_NONE]]
413
414 self.auth_policies = auth_policies
415
416 for (key, val) in args.iteritems():
417 if key.startswith('auth_level.'):
418 alias = key[11:]
419
420
421 if alias.startswith('ns.'):
422 continue
423
424 try:
425 uri = args['auth_level.ns.%s' % (alias,)]
426 except KeyError:
427 if is_openid1:
428 uri = self._default_auth_level_aliases.get(alias)
429 else:
430 uri = None
431
432 if uri is None:
433 if strict:
434 raise ValueError(
435 'Undefined auth level alias: %r' % (alias,))
436 else:
437 self.setAuthLevel(uri, val, alias)
438
439 auth_time = args.get('auth_time')
440 if auth_time:
441 if TIME_VALIDATOR.match(auth_time):
442 self.auth_time = auth_time
443 elif strict:
444 raise ValueError("auth_time must be in RFC3339 format")
445
446 fromSuccessResponse = classmethod(fromSuccessResponse)
447
449 """@see: C{L{Extension.getExtensionArgs}}
450 """
451 if len(self.auth_policies) == 0:
452 ns_args = {
453 'auth_policies': AUTH_NONE,
454 }
455 else:
456 ns_args = {
457 'auth_policies':' '.join(self.auth_policies),
458 }
459
460 for level_type, level in self.auth_levels.iteritems():
461 alias = self._getAlias(level_type)
462 ns_args['auth_level.ns.%s' % (alias,)] = level_type
463 ns_args['auth_level.%s' % (alias,)] = str(level)
464
465 if self.auth_time is not None:
466 if not TIME_VALIDATOR.match(self.auth_time):
467 raise ValueError('auth_time must be in RFC3339 format')
468
469 ns_args['auth_time'] = self.auth_time
470
471 return ns_args
472
473 Response.ns_uri = ns_uri
474