Package openid :: Package extensions :: Package draft :: Module pape5
[frames | no frames]

Source Code for Module openid.extensions.draft.pape5

  1  """An implementation of the OpenID Provider Authentication Policy 
  2  Extension 1.0, Draft 5 
  3   
  4  @see: http://openid.net/developers/specs/ 
  5   
  6  @since: 2.1.0 
  7  """ 
  8   
  9  __all__ = [ 
 10      'Request', 
 11      'Response', 
 12      'ns_uri', 
 13      'AUTH_PHISHING_RESISTANT', 
 14      'AUTH_MULTI_FACTOR', 
 15      'AUTH_MULTI_FACTOR_PHYSICAL', 
 16      'LEVELS_NIST', 
 17      'LEVELS_JISA', 
 18      ] 
 19   
 20  from openid.extension import Extension 
 21  import warnings 
 22  import re 
 23   
 24  ns_uri = "http://specs.openid.net/extensions/pape/1.0" 
 25   
 26  AUTH_MULTI_FACTOR_PHYSICAL = \ 
 27      'http://schemas.openid.net/pape/policies/2007/06/multi-factor-physical' 
 28  AUTH_MULTI_FACTOR = \ 
 29      'http://schemas.openid.net/pape/policies/2007/06/multi-factor' 
 30  AUTH_PHISHING_RESISTANT = \ 
 31      'http://schemas.openid.net/pape/policies/2007/06/phishing-resistant' 
 32  AUTH_NONE = \ 
 33      'http://schemas.openid.net/pape/policies/2007/06/none' 
 34   
 35  TIME_VALIDATOR = re.compile('^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$') 
 36   
 37  LEVELS_NIST = 'http://csrc.nist.gov/publications/nistpubs/800-63/SP800-63V1_0_2.pdf' 
 38  LEVELS_JISA = 'http://www.jisa.or.jp/spec/auth_level.html' 
 39   
class PAPEExtension(Extension):
    """Common base class for the PAPE request and response extensions.

    Holds the mapping from auth level namespace aliases to auth level
    URIs that both message types use.

    @cvar _default_auth_level_aliases: Aliases that every instance
        starts with ('nist' and 'jisa').

    @ivar auth_level_aliases: Per-instance mapping of alias to auth
        level URI, seeded from the class defaults.
    """

    _default_auth_level_aliases = {
        'nist': LEVELS_NIST,
        'jisa': LEVELS_JISA,
        }

    def __init__(self):
        # Work on a copy so instance-level aliases never modify the
        # class-level defaults.
        self.auth_level_aliases = self._default_auth_level_aliases.copy()

    def _addAuthLevelAlias(self, auth_level_uri, alias=None):
        """Add an auth level URI alias to this request.

        @param auth_level_uri: The auth level URI to send in the
            request.

        @param alias: The namespace alias to use for this auth level
            in this message. May be None if the alias is not
            important.

        @raises KeyError: if C{alias} is already bound to a different
            auth level URI
        """
        if alias is None:
            # Reuse an alias already bound to this URI; otherwise
            # invent a fresh one.
            try:
                alias = self._getAlias(auth_level_uri)
            except KeyError:
                alias = self._generateAlias()
        else:
            existing_uri = self.auth_level_aliases.get(alias)
            if existing_uri is not None and existing_uri != auth_level_uri:
                # Interpolate the values into the message; passing them
                # as extra exception arguments left the %r placeholders
                # unformatted.
                raise KeyError(
                    'Attempting to redefine alias %r from %r to %r'
                    % (alias, existing_uri, auth_level_uri))

        self.auth_level_aliases[alias] = auth_level_uri

    def _generateAlias(self):
        """Return an unused auth level alias of the form C{custN}.

        @raises RuntimeError: if C{cust0} through C{cust999} are all
            taken
        """
        for i in range(1000):
            alias = 'cust%d' % (i,)
            if alias not in self.auth_level_aliases:
                return alias

        raise RuntimeError('Could not find an unused alias (tried 1000!)')

    def _getAlias(self, auth_level_uri):
        """Return the alias for the specified auth level URI.

        @raises KeyError: if no alias is defined
        """
        for (alias, existing_uri) in self.auth_level_aliases.items():
            if auth_level_uri == existing_uri:
                return alias

        raise KeyError(auth_level_uri)
91
class Request(PAPEExtension):
    """A Provider Authentication Policy request, sent from a relying
    party to a provider

    @ivar preferred_auth_policies: The authentication policies that
        the relying party prefers
    @type preferred_auth_policies: [str]

    @ivar max_auth_age: The maximum time, in seconds, that the relying
        party wants to allow to have elapsed before the user must
        re-authenticate
    @type max_auth_age: int or NoneType

    @ivar preferred_auth_level_types: Ordered list of authentication
        level namespace URIs
    @type preferred_auth_level_types: [str]
    """

    ns_alias = 'pape'

    def __init__(self, preferred_auth_policies=None, max_auth_age=None,
                 preferred_auth_level_types=None):
        """Create a PAPE request.

        @param preferred_auth_policies: Initial list of policy URIs.
            The list object is stored as given, not copied.

        @param max_auth_age: Maximum authentication age in seconds,
            or None.

        @param preferred_auth_level_types: Auth level URIs, each
            registered through C{L{addAuthLevel}}.
        """
        super(Request, self).__init__()
        if preferred_auth_policies is None:
            preferred_auth_policies = []

        self.preferred_auth_policies = preferred_auth_policies
        self.max_auth_age = max_auth_age
        self.preferred_auth_level_types = []

        if preferred_auth_level_types is not None:
            for auth_level in preferred_auth_level_types:
                self.addAuthLevel(auth_level)

    def __nonzero__(self):
        """Return True if this request carries any policy, max auth
        age, or auth level type."""
        return bool(self.preferred_auth_policies or
                    self.max_auth_age is not None or
                    self.preferred_auth_level_types)

    # Python 3 looks up __bool__ rather than __nonzero__; without this
    # alias an empty request would always be truthy there.
    __bool__ = __nonzero__

    def addPolicyURI(self, policy_uri):
        """Add an acceptable authentication policy URI to this request

        This method is intended to be used by the relying party to add
        acceptable authentication types to the request. Adding a URI
        that is already present has no effect.

        @param policy_uri: The identifier for the preferred type of
            authentication.
        @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-05.html#auth_policies
        """
        if policy_uri not in self.preferred_auth_policies:
            self.preferred_auth_policies.append(policy_uri)

    def addAuthLevel(self, auth_level_uri, alias=None):
        """Add an auth level type to the preferred list, registering
        its namespace alias.

        @param auth_level_uri: The auth level namespace URI.

        @param alias: Optional alias for the URI; one is chosen if
            omitted.

        @raises KeyError: if C{alias} is already bound to a different
            URI
        """
        self._addAuthLevelAlias(auth_level_uri, alias)
        if auth_level_uri not in self.preferred_auth_level_types:
            self.preferred_auth_level_types.append(auth_level_uri)

    def getExtensionArgs(self):
        """@see: C{L{Extension.getExtensionArgs}}
        """
        ns_args = {
            'preferred_auth_policies': ' '.join(self.preferred_auth_policies),
            }

        if self.max_auth_age is not None:
            ns_args['max_auth_age'] = str(self.max_auth_age)

        if self.preferred_auth_level_types:
            preferred_types = []

            # Emit one namespace declaration per auth level type and
            # list the aliases in preference order.
            for auth_level_uri in self.preferred_auth_level_types:
                alias = self._getAlias(auth_level_uri)
                ns_args['auth_level.ns.%s' % (alias,)] = auth_level_uri
                preferred_types.append(alias)

            ns_args['preferred_auth_level_types'] = ' '.join(preferred_types)

        return ns_args

    @classmethod
    def fromOpenIDRequest(cls, request):
        """Instantiate a Request object from the arguments in a
        C{checkid_*} OpenID message

        @returns: A new instance, or None if the message carries no
            arguments in the PAPE namespace.
        """
        self = cls()
        args = request.message.getArgs(self.ns_uri)
        is_openid1 = request.message.isOpenID1()

        if not args:
            return None

        self.parseExtensionArgs(args, is_openid1)
        return self

    def parseExtensionArgs(self, args, is_openid1, strict=False):
        """Set the state of this request to be that expressed in these
        PAPE arguments

        @param args: The PAPE arguments without a namespace

        @param strict: Whether to raise an exception if the input is
            out of spec or otherwise malformed. If strict is false,
            malformed input will be ignored.

        @param is_openid1: Whether the input should be treated as part
            of an OpenID1 request

        @rtype: None

        @raises ValueError: In strict mode, when the max_auth_age is
            not parseable as an integer or a preferred auth level
            alias has no namespace declaration
        """

        # preferred_auth_policies is a space-separated list of policy URIs
        self.preferred_auth_policies = []

        policies_str = args.get('preferred_auth_policies')
        if policies_str:
            # split() with no argument drops the empty strings that
            # split(' ') produces for repeated or padding spaces.
            for uri in policies_str.split():
                if uri not in self.preferred_auth_policies:
                    self.preferred_auth_policies.append(uri)

        # max_auth_age is base-10 integer number of seconds
        max_auth_age_str = args.get('max_auth_age')
        self.max_auth_age = None

        if max_auth_age_str:
            try:
                self.max_auth_age = int(max_auth_age_str)
            except ValueError:
                if strict:
                    raise

        # Parse auth level information
        preferred_auth_level_types = args.get('preferred_auth_level_types')
        if preferred_auth_level_types:
            aliases = preferred_auth_level_types.split()

            for alias in aliases:
                key = 'auth_level.ns.%s' % (alias,)
                try:
                    uri = args[key]
                except KeyError:
                    # Only OpenID1 messages may rely on the built-in
                    # aliases without declaring them.
                    if is_openid1:
                        uri = self._default_auth_level_aliases.get(alias)
                    else:
                        uri = None

                if uri is None:
                    if strict:
                        raise ValueError('preferred auth level %r is not '
                                         'defined in this message' % (alias,))
                else:
                    self.addAuthLevel(uri, alias)

    def preferredTypes(self, supported_types):
        """Given a list of authentication policy URIs that a provider
        supports, this method returns the subsequence of those types
        that are preferred by the relying party.

        @param supported_types: A sequence of authentication policy
            type URIs that are supported by a provider

        @returns: The sub-sequence of the supported types that are
            preferred by the relying party. This list will be ordered
            in the order that the types appear in the supported_types
            sequence, and may be empty if the relying party does not
            prefer any of the supported authentication types.

        @returntype: [str]
        """
        # A list comprehension always yields a list; filter() returns
        # an iterator on Python 3.
        return [uri for uri in supported_types
                if uri in self.preferred_auth_policies]

Request.ns_uri = ns_uri
class Response(PAPEExtension):
    """A Provider Authentication Policy response, sent from a provider
    to a relying party

    @ivar auth_policies: List of authentication policies conformed to
        by this OpenID assertion, represented as policy URIs

    @ivar auth_time: Timestamp string in the form accepted by
        C{TIME_VALIDATOR}, or None

    @ivar auth_levels: Mapping of auth level namespace URI to the
        level value for that namespace
    """

    ns_alias = 'pape'

    def __init__(self, auth_policies=None, auth_time=None,
                 auth_levels=None):
        """Create a PAPE response.

        @param auth_policies: Initial list of policy URIs. A non-empty
            list is stored as given, not copied.

        @param auth_time: Authentication timestamp string, or None.

        @param auth_levels: Mapping of auth level URI to level; each
            entry is registered through C{L{setAuthLevel}}.
        """
        super(Response, self).__init__()
        if auth_policies:
            self.auth_policies = auth_policies
        else:
            self.auth_policies = []

        self.auth_time = auth_time
        self.auth_levels = {}

        if auth_levels is None:
            auth_levels = {}

        for uri, level in auth_levels.items():
            self.setAuthLevel(uri, level)

    def setAuthLevel(self, level_uri, level, alias=None):
        """Set the value for the given auth level type.

        @param level_uri: The auth level namespace URI.

        @param level: string representation of an authentication level
            valid for level_uri

        @param alias: An optional namespace alias for the given auth
            level URI. May be omitted if the alias is not
            significant. The library will use a reasonable default for
            widely-used auth level types.

        @raises KeyError: if C{alias} is already bound to a different
            URI
        """
        self._addAuthLevelAlias(level_uri, alias)
        self.auth_levels[level_uri] = level

    def getAuthLevel(self, level_uri):
        """Return the auth level for the specified auth level
        identifier

        @returns: A string that should map to the auth levels defined
            for the auth level type

        @raises KeyError: If the auth level type is not present in
            this message
        """
        return self.auth_levels[level_uri]

    def _getNISTAuthLevel(self):
        """Return the NIST auth level as an int, or None if this
        response has no NIST level."""
        try:
            return int(self.getAuthLevel(LEVELS_NIST))
        except KeyError:
            return None

    nist_auth_level = property(
        _getNISTAuthLevel,
        doc="Backward-compatibility accessor for the NIST auth level")

    def addPolicyURI(self, policy_uri):
        """Add an authentication policy to this response

        This method is intended to be used by the provider to add a
        policy that the provider conformed to when authenticating the
        user. Adding a URI that is already present has no effect.

        @param policy_uri: The identifier for the preferred type of
            authentication.
        @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-05.html#auth_policies

        @raises RuntimeError: if C{policy_uri} is the special "none"
            policy URI
        """
        if policy_uri == AUTH_NONE:
            raise RuntimeError(
                'To send no policies, do not set any on the response.')

        if policy_uri not in self.auth_policies:
            self.auth_policies.append(policy_uri)

    @classmethod
    def fromSuccessResponse(cls, success_response):
        """Create a C{L{Response}} object from a successful OpenID
        library response
        (C{L{openid.consumer.consumer.SuccessResponse}}) response
        message

        @param success_response: A SuccessResponse from consumer.complete()
        @type success_response: C{L{openid.consumer.consumer.SuccessResponse}}

        @rtype: Response or None
        @returns: A provider authentication policy response from the
            data that was supplied with the C{id_res} response or None
            if the provider sent no signed PAPE response arguments.
        """
        self = cls()

        # PAPE requires that the args be signed.
        args = success_response.getSignedNS(self.ns_uri)
        is_openid1 = success_response.isOpenID1()

        # Only try to construct a PAPE response if the arguments were
        # signed in the OpenID response. If not, return None.
        if args is None:
            return None

        self.parseExtensionArgs(args, is_openid1)
        return self

    def parseExtensionArgs(self, args, is_openid1, strict=False):
        """Parse the provider authentication policy arguments into the
        internal state of this object

        @param args: unqualified provider authentication policy
            arguments

        @param is_openid1: Whether the input should be treated as part
            of an OpenID1 response

        @param strict: Whether to raise an exception when bad data is
            encountered

        @returns: None. The data is parsed into the internal fields of
            this object.

        @raises ValueError: In strict mode, when auth_policies is
            missing or malformed, an auth level alias is undeclared,
            or auth_time is not in the expected format
        """
        policies_str = args.get('auth_policies')
        # split() with no argument drops the empty strings that
        # split(' ') produces for repeated or padding spaces.
        if policies_str:
            auth_policies = policies_str.split()
        else:
            auth_policies = []

        # A whitespace-only value carries no policy and counts as missing.
        if strict and not auth_policies:
            raise ValueError('Missing auth_policies')

        if (len(auth_policies) > 1 and strict and AUTH_NONE in auth_policies):
            raise ValueError('Got some auth policies, as well as the special '
                             '"none" URI: %r' % (auth_policies,))

        if 'none' in auth_policies:
            msg = '"none" used as a policy URI (see PAPE draft < 5)'
            if strict:
                raise ValueError(msg)
            else:
                warnings.warn(msg, stacklevel=2)

        # Neither spelling of "no policy" is kept as a policy.
        auth_policies = [u for u in auth_policies
                         if u not in ['none', AUTH_NONE]]

        self.auth_policies = auth_policies

        level_prefix = 'auth_level.'
        for (key, val) in args.items():
            if key.startswith(level_prefix):
                alias = key[len(level_prefix):]

                # Namespace declarations are looked up below, not
                # treated as level values.
                if alias.startswith('ns.'):
                    continue

                try:
                    uri = args['auth_level.ns.%s' % (alias,)]
                except KeyError:
                    # Only OpenID1 messages may rely on the built-in
                    # aliases without declaring them.
                    if is_openid1:
                        uri = self._default_auth_level_aliases.get(alias)
                    else:
                        uri = None

                if uri is None:
                    if strict:
                        raise ValueError(
                            'Undefined auth level alias: %r' % (alias,))
                else:
                    self.setAuthLevel(uri, val, alias)

        auth_time = args.get('auth_time')
        if auth_time:
            if TIME_VALIDATOR.match(auth_time):
                self.auth_time = auth_time
            elif strict:
                raise ValueError("auth_time must be in RFC3339 format")

    def getExtensionArgs(self):
        """@see: C{L{Extension.getExtensionArgs}}

        @raises ValueError: if auth_time is set but does not match
            C{TIME_VALIDATOR}
        """
        if not self.auth_policies:
            # No policies applied is sent as the special "none" URI.
            ns_args = {
                'auth_policies': AUTH_NONE,
                }
        else:
            ns_args = {
                'auth_policies': ' '.join(self.auth_policies),
                }

        for level_type, level in self.auth_levels.items():
            alias = self._getAlias(level_type)
            ns_args['auth_level.ns.%s' % (alias,)] = level_type
            ns_args['auth_level.%s' % (alias,)] = str(level)

        if self.auth_time is not None:
            if not TIME_VALIDATOR.match(self.auth_time):
                raise ValueError('auth_time must be in RFC3339 format')

            ns_args['auth_time'] = self.auth_time

        return ns_args

Response.ns_uri = ns_uri