1  """An implementation of the OpenID Provider Authentication Policy 
  2  Extension 1.0, Draft 5 
  3   
  4  @see: http://openid.net/developers/specs/ 
  5   
  6  @since: 2.1.0 
  7  """ 
  8   
  9  __all__ = [ 
 10      'Request', 
 11      'Response', 
 12      'ns_uri', 
 13      'AUTH_PHISHING_RESISTANT', 
 14      'AUTH_MULTI_FACTOR', 
 15      'AUTH_MULTI_FACTOR_PHYSICAL', 
 16      'LEVELS_NIST', 
 17      'LEVELS_JISA', 
 18      ] 
 19   
 20  from openid.extension import Extension 
 21  import warnings 
 22  import re 
 23   
 24  ns_uri = "http://specs.openid.net/extensions/pape/1.0" 
 25   
 26  AUTH_MULTI_FACTOR_PHYSICAL = \ 
 27      'http://schemas.openid.net/pape/policies/2007/06/multi-factor-physical' 
 28  AUTH_MULTI_FACTOR = \ 
 29      'http://schemas.openid.net/pape/policies/2007/06/multi-factor' 
 30  AUTH_PHISHING_RESISTANT = \ 
 31      'http://schemas.openid.net/pape/policies/2007/06/phishing-resistant' 
 32  AUTH_NONE = \ 
 33      'http://schemas.openid.net/pape/policies/2007/06/none' 
 34   
 35  TIME_VALIDATOR = re.compile('^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$') 
 36   
 37  LEVELS_NIST = 'http://csrc.nist.gov/publications/nistpubs/800-63/SP800-63V1_0_2.pdf' 
 38  LEVELS_JISA = 'http://www.jisa.or.jp/spec/auth_level.html' 
 39   
 41      _default_auth_level_aliases = { 
 42          'nist': LEVELS_NIST, 
 43          'jisa': LEVELS_JISA, 
 44          } 
 45   
 47          self.auth_level_aliases = self._default_auth_level_aliases.copy() 
  48   
 50          """Add an auth level URI alias to this request. 
 51   
 52          @param auth_level_uri: The auth level URI to send in the 
 53              request. 
 54   
 55          @param alias: The namespace alias to use for this auth level 
 56              in this message. May be None if the alias is not 
 57              important. 
 58          """ 
 59          if alias is None: 
 60              try: 
 61                  alias = self._getAlias(auth_level_uri) 
 62              except KeyError: 
 63                  alias = self._generateAlias() 
 64          else: 
 65              existing_uri = self.auth_level_aliases.get(alias) 
 66              if existing_uri is not None and existing_uri != auth_level_uri: 
 67                  raise KeyError('Attempting to redefine alias %r from %r to %r', 
 68                                 alias, existing_uri, auth_level_uri) 
 69   
 70          self.auth_level_aliases[alias] = auth_level_uri 
  71   
 73          """Return an unused auth level alias""" 
 74          for i in xrange(1000): 
 75              alias = 'cust%d' % (i,) 
 76              if alias not in self.auth_level_aliases: 
 77                  return alias 
 78   
 79          raise RuntimeError('Could not find an unused alias (tried 1000!)') 
  80   
 82          """Return the alias for the specified auth level URI. 
 83   
 84          @raises KeyError: if no alias is defined 
 85          """ 
 86          for (alias, existing_uri) in self.auth_level_aliases.iteritems(): 
 87              if auth_level_uri == existing_uri: 
 88                  return alias 
 89   
 90          raise KeyError(auth_level_uri) 
   91   
 93      """A Provider Authentication Policy request, sent from a relying 
 94      party to a provider 
 95   
 96      @ivar preferred_auth_policies: The authentication policies that 
 97          the relying party prefers 
 98      @type preferred_auth_policies: [str] 
 99   
100      @ivar max_auth_age: The maximum time, in seconds, that the relying 
101          party wants to allow to have elapsed before the user must 
102          re-authenticate 
103      @type max_auth_age: int or NoneType 
104   
105      @ivar preferred_auth_level_types: Ordered list of authentication 
106          level namespace URIs 
107   
108      @type preferred_auth_level_types: [str] 
109      """ 
110   
111      ns_alias = 'pape' 
112   
113 -    def __init__(self, preferred_auth_policies=None, max_auth_age=None, 
114                   preferred_auth_level_types=None): 
 115          super(Request, self).__init__() 
116          if preferred_auth_policies is None: 
117              preferred_auth_policies = [] 
118   
119          self.preferred_auth_policies = preferred_auth_policies 
120          self.max_auth_age = max_auth_age 
121          self.preferred_auth_level_types = [] 
122   
123          if preferred_auth_level_types is not None: 
124              for auth_level in preferred_auth_level_types: 
125                  self.addAuthLevel(auth_level) 
 126   
128          return bool(self.preferred_auth_policies or 
129                      self.max_auth_age is not None or 
130                      self.preferred_auth_level_types) 
 131   
133          """Add an acceptable authentication policy URI to this request 
134   
135          This method is intended to be used by the relying party to add 
136          acceptable authentication types to the request. 
137   
138          @param policy_uri: The identifier for the preferred type of 
139              authentication. 
140          @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-05.html#auth_policies 
141          """ 
142          if policy_uri not in self.preferred_auth_policies: 
143              self.preferred_auth_policies.append(policy_uri) 
 144   
146          self._addAuthLevelAlias(auth_level_uri, alias) 
147          if auth_level_uri not in self.preferred_auth_level_types: 
148              self.preferred_auth_level_types.append(auth_level_uri) 
 149   
151          """@see: C{L{Extension.getExtensionArgs}} 
152          """ 
153          ns_args = { 
154              'preferred_auth_policies':' '.join(self.preferred_auth_policies), 
155              } 
156   
157          if self.max_auth_age is not None: 
158              ns_args['max_auth_age'] = str(self.max_auth_age) 
159   
160          if self.preferred_auth_level_types: 
161              preferred_types = [] 
162   
163              for auth_level_uri in self.preferred_auth_level_types: 
164                  alias = self._getAlias(auth_level_uri) 
165                  ns_args['auth_level.ns.%s' % (alias,)] = auth_level_uri 
166                  preferred_types.append(alias) 
167   
168              ns_args['preferred_auth_level_types'] = ' '.join(preferred_types) 
169   
170          return ns_args 
 171   
173          """Instantiate a Request object from the arguments in a 
174          C{checkid_*} OpenID message 
175          """ 
176          self = cls() 
177          args = request.message.getArgs(self.ns_uri) 
178          is_openid1 = request.message.isOpenID1() 
179   
180          if args == {}: 
181              return None 
182   
183          self.parseExtensionArgs(args, is_openid1) 
184          return self 
 185   
186      fromOpenIDRequest = classmethod(fromOpenIDRequest) 
187   
189          """Set the state of this request to be that expressed in these 
190          PAPE arguments 
191   
192          @param args: The PAPE arguments without a namespace 
193   
194          @param strict: Whether to raise an exception if the input is 
195              out of spec or otherwise malformed. If strict is false, 
196              malformed input will be ignored. 
197   
198          @param is_openid1: Whether the input should be treated as part 
199              of an OpenID1 request 
200   
201          @rtype: None 
202   
203          @raises ValueError: When the max_auth_age is not parseable as 
204              an integer 
205          """ 
206   
207           
208          self.preferred_auth_policies = [] 
209   
210          policies_str = args.get('preferred_auth_policies') 
211          if policies_str: 
212              for uri in policies_str.split(' '): 
213                  if uri not in self.preferred_auth_policies: 
214                      self.preferred_auth_policies.append(uri) 
215   
216           
217          max_auth_age_str = args.get('max_auth_age') 
218          self.max_auth_age = None 
219   
220          if max_auth_age_str: 
221              try: 
222                  self.max_auth_age = int(max_auth_age_str) 
223              except ValueError: 
224                  if strict: 
225                      raise 
226   
227           
228          preferred_auth_level_types = args.get('preferred_auth_level_types') 
229          if preferred_auth_level_types: 
230              aliases = preferred_auth_level_types.strip().split() 
231   
232              for alias in aliases: 
233                  key = 'auth_level.ns.%s' % (alias,) 
234                  try: 
235                      uri = args[key] 
236                  except KeyError: 
237                      if is_openid1: 
238                          uri = self._default_auth_level_aliases.get(alias) 
239                      else: 
240                          uri = None 
241   
242                  if uri is None: 
243                      if strict: 
244                          raise ValueError('preferred auth level %r is not ' 
245                                           'defined in this message' % (alias,)) 
246                  else: 
247                      self.addAuthLevel(uri, alias) 
 248   
250          """Given a list of authentication policy URIs that a provider 
251          supports, this method returns the subsequence of those types 
252          that are preferred by the relying party. 
253   
254          @param supported_types: A sequence of authentication policy 
255              type URIs that are supported by a provider 
256   
257          @returns: The sub-sequence of the supported types that are 
258              preferred by the relying party. This list will be ordered 
259              in the order that the types appear in the supported_types 
260              sequence, and may be empty if the relying party does not
261              prefer any of the supported authentication types.
262   
263          @returntype: [str] 
264          """ 
265          return filter(self.preferred_auth_policies.__contains__, 
266                        supported_types) 
  267   
268  Request.ns_uri = ns_uri 
269   
270   
272      """A Provider Authentication Policy response, sent from a provider 
273      to a relying party 
274   
275      @ivar auth_policies: List of authentication policies conformed to 
276          by this OpenID assertion, represented as policy URIs 
277      """ 
278   
279      ns_alias = 'pape' 
280   
281 -    def __init__(self, auth_policies=None, auth_time=None, 
282                   auth_levels=None): 
 283          super(Response, self).__init__() 
284          if auth_policies: 
285              self.auth_policies = auth_policies 
286          else: 
287              self.auth_policies = [] 
288   
289          self.auth_time = auth_time 
290          self.auth_levels = {} 
291   
292          if auth_levels is None: 
293              auth_levels = {} 
294   
295          for uri, level in auth_levels.iteritems(): 
296              self.setAuthLevel(uri, level) 
 297   
299          """Set the value for the given auth level type. 
300   
301          @param level: string representation of an authentication level 
302              valid for level_uri 
303   
304          @param alias: An optional namespace alias for the given auth 
305              level URI. May be omitted if the alias is not 
306              significant. The library will use a reasonable default for 
307              widely-used auth level types. 
308          """ 
309          self._addAuthLevelAlias(level_uri, alias) 
310          self.auth_levels[level_uri] = level 
 311   
313          """Return the auth level for the specified auth level 
314          identifier 
315   
316          @returns: A string that should map to the auth levels defined 
317              for the auth level type 
318   
319          @raises KeyError: If the auth level type is not present in 
320              this message 
321          """ 
322          return self.auth_levels[level_uri] 
 323   
329   
330      nist_auth_level = property( 
331          _getNISTAuthLevel, 
332          doc="Backward-compatibility accessor for the NIST auth level") 
333   
335          """Add an authentication policy to this response
336   
337          This method is intended to be used by the provider to add a 
338          policy that the provider conformed to when authenticating the user. 
339   
340          @param policy_uri: The identifier for the authentication
341              policy that the provider conformed to.
342          @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-01.html#auth_policies 
343          """ 
344          if policy_uri == AUTH_NONE: 
345              raise RuntimeError( 
346                  'To send no policies, do not set any on the response.') 
347   
348          if policy_uri not in self.auth_policies: 
349              self.auth_policies.append(policy_uri) 
 350   
352          """Create a C{L{Response}} object from a successful OpenID 
353          library response 
354          (C{L{openid.consumer.consumer.SuccessResponse}}) response 
355          message 
356   
357          @param success_response: A SuccessResponse from consumer.complete() 
358          @type success_response: C{L{openid.consumer.consumer.SuccessResponse}} 
359   
360          @rtype: Response or None 
361          @returns: A provider authentication policy response from the 
362              data that was supplied with the C{id_res} response or None 
363              if the provider sent no signed PAPE response arguments. 
364          """ 
365          self = cls() 
366   
367           
368          args = success_response.getSignedNS(self.ns_uri) 
369          is_openid1 = success_response.isOpenID1() 
370   
371           
372           
373          if args is not None: 
374              self.parseExtensionArgs(args, is_openid1) 
375              return self 
376          else: 
377              return None 
 378   
380          """Parse the provider authentication policy arguments into the 
381          internal state of this object 
382   
383          @param args: unqualified provider authentication policy 
384              arguments 
385   
386          @param strict: Whether to raise an exception when bad data is 
387              encountered 
388   
389          @returns: None. The data is parsed into the internal fields of 
390              this object. 
391          """ 
392          policies_str = args.get('auth_policies') 
393          if policies_str: 
394              auth_policies = policies_str.split(' ') 
395          elif strict: 
396              raise ValueError('Missing auth_policies') 
397          else: 
398              auth_policies = [] 
399   
400          if (len(auth_policies) > 1 and strict and AUTH_NONE in auth_policies): 
401              raise ValueError('Got some auth policies, as well as the special ' 
402                               '"none" URI: %r' % (auth_policies,)) 
403   
404          if 'none' in auth_policies: 
405              msg = '"none" used as a policy URI (see PAPE draft < 5)' 
406              if strict: 
407                  raise ValueError(msg) 
408              else: 
409                  warnings.warn(msg, stacklevel=2) 
410   
411          auth_policies = [u for u in auth_policies 
412                           if u not in ['none', AUTH_NONE]] 
413   
414          self.auth_policies = auth_policies 
415   
416          for (key, val) in args.iteritems(): 
417              if key.startswith('auth_level.'): 
418                  alias = key[11:] 
419   
420                   
421                  if alias.startswith('ns.'): 
422                      continue 
423   
424                  try: 
425                      uri = args['auth_level.ns.%s' % (alias,)] 
426                  except KeyError: 
427                      if is_openid1: 
428                          uri = self._default_auth_level_aliases.get(alias) 
429                      else: 
430                          uri = None 
431   
432                  if uri is None: 
433                      if strict: 
434                          raise ValueError( 
435                              'Undefined auth level alias: %r' % (alias,)) 
436                  else: 
437                      self.setAuthLevel(uri, val, alias) 
438   
439          auth_time = args.get('auth_time') 
440          if auth_time: 
441              if TIME_VALIDATOR.match(auth_time): 
442                  self.auth_time = auth_time 
443              elif strict: 
444                  raise ValueError("auth_time must be in RFC3339 format") 
 445   
446      fromSuccessResponse = classmethod(fromSuccessResponse) 
447   
449          """@see: C{L{Extension.getExtensionArgs}} 
450          """ 
451          if len(self.auth_policies) == 0: 
452              ns_args = { 
453                  'auth_policies': AUTH_NONE, 
454              } 
455          else: 
456              ns_args = { 
457                  'auth_policies':' '.join(self.auth_policies), 
458                  } 
459   
460          for level_type, level in self.auth_levels.iteritems(): 
461              alias = self._getAlias(level_type) 
462              ns_args['auth_level.ns.%s' % (alias,)] = level_type 
463              ns_args['auth_level.%s' % (alias,)] = str(level) 
464   
465          if self.auth_time is not None: 
466              if not TIME_VALIDATOR.match(self.auth_time): 
467                  raise ValueError('auth_time must be in RFC3339 format') 
468   
469              ns_args['auth_time'] = self.auth_time 
470   
471          return ns_args 
  472   
473  Response.ns_uri = ns_uri 
474