1 import sys
2 import unittest
3 import datadriven
4 import os.path
5 from openid import fetchers
6 from openid.fetchers import HTTPResponse
7 from openid.yadis.discover import DiscoveryFailure
8 from openid.consumer import discover
9 from openid.yadis import xrires
10 from openid.yadis.xri import XRI
11 from urlparse import urlsplit
12 from openid import message
13
14
15
18 self.responses = list(responses)
19
20 - def fetch(self, url, body=None, headers=None):
25
27 cases = [
28 [HTTPResponse('http://network.error/', None)],
29 [HTTPResponse('http://not.found/', 404)],
30 [HTTPResponse('http://bad.request/', 400)],
31 [HTTPResponse('http://server.error/', 500)],
32 [HTTPResponse('http://header.found/', 200,
33 headers={'x-xrds-location':'http://xrds.missing/'}),
34 HTTPResponse('http://xrds.missing/', 404)],
35 ]
36
41
45
48
50 expected_status = self.responses[-1].status
51 try:
52 discover.discover(self.url)
53 except DiscoveryFailure, why:
54 self.failUnlessEqual(why.http_response.status, expected_status)
55 else:
56 self.fail('Did not raise DiscoveryFailure')
57
58
59
60
61
62
63
64
65
66 import warnings
67 warnings.filterwarnings('ignore', 'raising a string.*', DeprecationWarning,
68 r'^openid\.test\.test_discover$', 77)
69
71 """Just raise an exception when fetch is called"""
72
74 self.thing_to_raise = thing_to_raise
75
76 - def fetch(self, url, body=None, headers=None):
77 raise self.thing_to_raise
78
80 """Custom exception just to make sure it's not handled differently"""
81
83 """Make sure exceptions get passed through discover function from
84 fetcher."""
85
86 cases = [
87 Exception(),
88 DidFetch(),
89 ValueError(),
90 RuntimeError(),
91 ]
92
93
94 if sys.version_info[:2] < (2, 6):
95 cases.append('oi!')
96
100
104
107
109 try:
110 discover.discover('http://doesnt.matter/')
111 except:
112 exc = sys.exc_info()[1]
113 if exc is None:
114
115 self.failUnless(self.exc is sys.exc_info()[0])
116 else:
117 self.failUnless(self.exc is exc, exc)
118 else:
119 self.fail('Expected %r', self.exc)
120
121
122
123
137
138
140 redirect = None
141
145
146 - def fetch(self, url, body=None, headers=None):
163
164
165
167 id_url = "http://someuser.unittest/"
168
169 documents = {}
170 fetcherClass = DiscoveryMockFetcher
171
172 - def _checkService(self, s,
173 server_url,
174 claimed_id=None,
175 local_id=None,
176 canonical_id=None,
177 types=None,
178 used_yadis=False,
179 display_identifier=None
180 ):
181 self.failUnlessEqual(server_url, s.server_url)
182 if types == ['2.0 OP']:
183 self.failIf(claimed_id)
184 self.failIf(local_id)
185 self.failIf(s.claimed_id)
186 self.failIf(s.local_id)
187 self.failIf(s.getLocalID())
188 self.failIf(s.compatibilityMode())
189 self.failUnless(s.isOPIdentifier())
190 self.failUnlessEqual(s.preferredNamespace(),
191 discover.OPENID_2_0_MESSAGE_NS)
192 else:
193 self.failUnlessEqual(claimed_id, s.claimed_id)
194 self.failUnlessEqual(local_id, s.getLocalID())
195
196 if used_yadis:
197 self.failUnless(s.used_yadis, "Expected to use Yadis")
198 else:
199 self.failIf(s.used_yadis,
200 "Expected to use old-style discovery")
201
202 openid_types = {
203 '1.1': discover.OPENID_1_1_TYPE,
204 '1.0': discover.OPENID_1_0_TYPE,
205 '2.0': discover.OPENID_2_0_TYPE,
206 '2.0 OP': discover.OPENID_IDP_2_0_TYPE,
207 }
208
209 type_uris = [openid_types[t] for t in types]
210 self.failUnlessEqual(type_uris, s.type_uris)
211 self.failUnlessEqual(canonical_id, s.canonicalID)
212
213 if s.canonicalID:
214 self.failUnless(s.getDisplayIdentifier() != claimed_id)
215 self.failUnless(s.getDisplayIdentifier() is not None)
216 self.failUnlessEqual(display_identifier, s.getDisplayIdentifier())
217 self.failUnlessEqual(s.claimed_id, s.canonicalID)
218
219 self.failUnlessEqual(s.display_identifier or s.claimed_id, s.getDisplayIdentifier())
220
225
228
230 module_directory = os.path.dirname(os.path.abspath(__file__))
231 filename = os.path.join(
232 module_directory, 'data', 'test_discover', filename)
233 return file(filename).read()
234
236 - def _discover(self, content_type, data,
237 expected_services, expected_id=None):
246
250
252 services = self._discover(content_type='text/plain',
253 data="junk",
254 expected_services=0)
255
256 services = self._discover(
257 content_type='text/html',
258 data=readDataFile('openid_no_delegate.html'),
259 expected_services=1,
260 )
261
262 self._checkService(
263 services[0],
264 used_yadis=False,
265 types=['1.1'],
266 server_url="http://www.myopenid.com/server",
267 claimed_id=self.id_url,
268 local_id=self.id_url,
269 )
270
272 services = self._discover(
273 content_type='text/html',
274 data=readDataFile('openid.html'),
275 expected_services=1)
276
277
278 self._checkService(
279 services[0],
280 used_yadis=False,
281 types=['1.1'],
282 server_url="http://www.myopenid.com/server",
283 claimed_id=self.id_url,
284 local_id='http://smoker.myopenid.com/',
285 display_identifier=self.id_url,
286 )
287
289 """Ensure that the Claimed Identifier does not have a fragment
290 if one is supplied in the User Input."""
291 content_type = 'text/html'
292 data = readDataFile('openid.html')
293 expected_services = 1
294
295 self.documents[self.id_url] = (content_type, data)
296 expected_id = self.id_url
297 self.id_url = self.id_url + '#fragment'
298 id_url, services = discover.discover(self.id_url)
299 self.failUnlessEqual(expected_services, len(services))
300 self.failUnlessEqual(expected_id, id_url)
301
302 self._checkService(
303 services[0],
304 used_yadis=False,
305 types=['1.1'],
306 server_url="http://www.myopenid.com/server",
307 claimed_id=expected_id,
308 local_id='http://smoker.myopenid.com/',
309 display_identifier=expected_id,
310 )
311
313 services = self._discover(
314 content_type='text/html',
315 data=readDataFile('openid2.html'),
316 expected_services=1,
317 )
318
319 self._checkService(
320 services[0],
321 used_yadis=False,
322 types=['2.0'],
323 server_url="http://www.myopenid.com/server",
324 claimed_id=self.id_url,
325 local_id='http://smoker.myopenid.com/',
326 display_identifier=self.id_url,
327 )
328
330 services = self._discover(
331 content_type='text/html',
332 data=readDataFile('openid_1_and_2.html'),
333 expected_services=2,
334 )
335
336 for t, s in zip(['2.0', '1.1'], services):
337 self._checkService(
338 s,
339 used_yadis=False,
340 types=[t],
341 server_url="http://www.myopenid.com/server",
342 claimed_id=self.id_url,
343 local_id='http://smoker.myopenid.com/',
344 display_identifier=self.id_url,
345 )
346
351
353 """HTML document has discovery information, but points to an
354 empty Yadis document."""
355
356 self.documents[self.id_url + 'xrds'] = (
357 'application/xrds+xml', readDataFile('yadis_0entries.xml'))
358
359 services = self._discover(content_type='text/html',
360 data=readDataFile('openid_and_yadis.html'),
361 expected_services=1)
362
363 self._checkService(
364 services[0],
365 used_yadis=False,
366 types=['1.1'],
367 server_url="http://www.myopenid.com/server",
368 claimed_id=self.id_url,
369 local_id='http://smoker.myopenid.com/',
370 display_identifier=self.id_url,
371 )
372
387
389 services = self._discover(
390 content_type='application/xrds+xml',
391 data=readDataFile('openid2_xrds_no_local_id.xml'),
392 expected_services=1,
393 )
394
395 self._checkService(
396 services[0],
397 used_yadis=True,
398 types=['2.0'],
399 server_url="http://www.myopenid.com/server",
400 claimed_id=self.id_url,
401 local_id=self.id_url,
402 display_identifier=self.id_url,
403 )
404
406 services = self._discover(
407 content_type='application/xrds+xml',
408 data=readDataFile('openid2_xrds.xml'),
409 expected_services=1,
410 )
411
412 self._checkService(
413 services[0],
414 used_yadis=True,
415 types=['2.0'],
416 server_url="http://www.myopenid.com/server",
417 claimed_id=self.id_url,
418 local_id='http://smoker.myopenid.com/',
419 display_identifier=self.id_url,
420 )
421
423 services = self._discover(
424 content_type='application/xrds+xml',
425 data=readDataFile('yadis_idp.xml'),
426 expected_services=1,
427 )
428
429 self._checkService(
430 services[0],
431 used_yadis=True,
432 types=['2.0 OP'],
433 server_url="http://www.myopenid.com/server",
434 display_identifier=self.id_url,
435 )
436
438 """The delegate tag isn't meaningful for OP entries."""
439 services = self._discover(
440 content_type='application/xrds+xml',
441 data=readDataFile('yadis_idp_delegate.xml'),
442 expected_services=1,
443 )
444
445 self._checkService(
446 services[0],
447 used_yadis=True,
448 types=['2.0 OP'],
449 server_url="http://www.myopenid.com/server",
450 display_identifier=self.id_url,
451 )
452
459
461 services = self._discover(
462 content_type='application/xrds+xml',
463 data=readDataFile('openid_1_and_2_xrds.xml'),
464 expected_services=1,
465 )
466
467 self._checkService(
468 services[0],
469 used_yadis=True,
470 types=['2.0', '1.1'],
471 server_url="http://www.myopenid.com/server",
472 claimed_id=self.id_url,
473 local_id='http://smoker.myopenid.com/',
474 display_identifier=self.id_url,
475 )
476
483
485
490
491
492 - def fetch(self, url, body=None, headers=None):
493 self.fetchlog.append((url, body, headers))
494
495 u = urlsplit(url)
496 proxy_host = u[1]
497 xri = u[2]
498 query = u[3]
499
500 if not headers and not query:
501 raise ValueError("No headers or query; you probably didn't "
502 "mean to do that.")
503
504 if xri.startswith('/'):
505 xri = xri[1:]
506
507 try:
508 ctype, body = self.documents[xri]
509 except KeyError:
510 status = 404
511 ctype = 'text/plain'
512 body = ''
513 else:
514 status = 200
515
516 return HTTPResponse(url, status, {'content-type': ctype}, body)
517
518
520 fetcherClass = MockFetcherForXRIProxy
521
522 documents = {'=smoker': ('application/xrds+xml',
523 readDataFile('yadis_2entries_delegate.xml')),
524 '=smoker*bad': ('application/xrds+xml',
525 readDataFile('yadis_another_delegate.xml')) }
526
528 user_xri, services = discover.discoverXRI('=smoker')
529
530 self._checkService(
531 services[0],
532 used_yadis=True,
533 types=['1.0'],
534 server_url="http://www.myopenid.com/server",
535 claimed_id=XRI("=!1000"),
536 canonical_id=XRI("=!1000"),
537 local_id='http://smoker.myopenid.com/',
538 display_identifier='=smoker'
539 )
540
541 self._checkService(
542 services[1],
543 used_yadis=True,
544 types=['1.0'],
545 server_url="http://www.livejournal.com/openid/server.bml",
546 claimed_id=XRI("=!1000"),
547 canonical_id=XRI("=!1000"),
548 local_id='http://frank.livejournal.com/',
549 display_identifier='=smoker'
550 )
551
553 user_xri, services = discover.discoverXRI('xri://=smoker')
554
555 self._checkService(
556 services[0],
557 used_yadis=True,
558 types=['1.0'],
559 server_url="http://www.myopenid.com/server",
560 claimed_id=XRI("=!1000"),
561 canonical_id=XRI("=!1000"),
562 local_id='http://smoker.myopenid.com/',
563 display_identifier='=smoker'
564 )
565
566 self._checkService(
567 services[1],
568 used_yadis=True,
569 types=['1.0'],
570 server_url="http://www.livejournal.com/openid/server.bml",
571 claimed_id=XRI("=!1000"),
572 canonical_id=XRI("=!1000"),
573 local_id='http://frank.livejournal.com/',
574 display_identifier='=smoker'
575 )
576
580
588
589
601
602
604 - def __init__(self, expected_ns, type_uris):
609
615
616 cases = [
617 (message.OPENID1_NS, []),
618 (message.OPENID1_NS, ['http://jyte.com/']),
619 (message.OPENID1_NS, [discover.OPENID_1_0_TYPE]),
620 (message.OPENID1_NS, [discover.OPENID_1_1_TYPE]),
621 (message.OPENID2_NS, [discover.OPENID_2_0_TYPE]),
622 (message.OPENID2_NS, [discover.OPENID_IDP_2_0_TYPE]),
623 (message.OPENID2_NS, [discover.OPENID_2_0_TYPE,
624 discover.OPENID_1_0_TYPE]),
625 (message.OPENID2_NS, [discover.OPENID_1_0_TYPE,
626 discover.OPENID_2_0_TYPE]),
627 ]
628
632
635
639
643
647
651
656
662
668
671
675
678
680 self.failUnlessEqual(self.endpoint.canonicalID, None)
681
684
692
696
699
702
705
708
711
714
718
733
736
740
745
749
753
759
768
769
775
776
779
780 if __name__ == '__main__':
781 suite = pyUnitTests()
782 runner = unittest.TextTestRunner()
783 runner.run(suite)
784