# -*- coding: utf-8 -*-
# OpenID relying party library
# Copyright Martin v. Löwis, 2009
# Licensed under the Academic Free License, version 3
# This library implements OpenID Authentication 2.0,
# in the role of a relying party
import urlparse, urllib, httplib, time, cgi, HTMLParser
import cStringIO, base64, hmac, hashlib, datetime, re, random
import itertools, cPickle, sys
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
# Don't use urllib2, since it breaks in 2.5
# for https://login.launchpad.net//+xrds
# Don't use urllib, since it sometimes selects HTTP/1.1 (e.g. in PyPI)
# and then fails to parse chunked responses.
# 3.x portability
if sys.version_info < (3,):
def b(s):
return s
# Convert byte to integer
b2i = ord
def bytes_from_ints(L):
return ''.join([chr(i) for i in L])
else:
def b(s):
return s.encode('latin-1')
def b2i(char):
# 3.x: bytes are already sequences of integers
return char
bytes_from_ints = bytes
if sys.version_info < (2,6):
# Add getcode into 2.5
class _addinfourl(urllib.addinfourl):
def __init__(self, fp, headers, url, code=None):
urllib.addinfourl.__init__(self, fp, headers, url)
self.code = code
def getcode(self):
return self.code
class FancyURLopener(urllib.FancyURLopener):
def http_error_default(self, url, fp, errcode, errmsg, headers):
return _addinfourl(fp, headers, "http:" + url, errcode)
else:
FancyURLopener = urllib.FancyURLopener
class NotAuthenticated(Exception):
CONNECTION_REFUSED = 1
DIRECT_VERIFICATION_FAILED = 2
CANCELLED = 3
UNSUPPORTED_VERSION = 4
UNEXPECTED_MODE = 5
CLAIMED_ID_MISSING = 6
DISCOVERY_FAILED = 7
INCONSISTENT_IDS = 8
REPLAY_ATTACK = 9
MISSING_NONCE = 10
msgs = {
CONNECTION_REFUSED : 'OP refuses connection with status %d',
DIRECT_VERIFICATION_FAILED : 'OP doesn\'t assert that the signature of the verification request is valid',
CANCELLED : 'OP did not authenticate user (cancelled)',
UNSUPPORTED_VERSION : 'Unsupported OpenID version',
UNEXPECTED_MODE : 'Unexpected mode %s',
CLAIMED_ID_MISSING : 'Cannot determine claimed ID',
DISCOVERY_FAILED : 'Claimed ID %s cannot be rediscovered',
INCONSISTENT_IDS : 'Discovered and asserted endpoints differ',
REPLAY_ATTACK : 'Replay attack detected',
MISSING_NONCE : 'Nonce missing in OpenID 2 response',
}
def __init__(self, errno, *args):
msg = self.msgs[errno]
if args:
msg %= args
self.errno = errno
Exception.__init__(self, msg, errno, *args)
def __str__(self):
return self.args[0]
def normalize_uri(uri):
"""Normalize an uri according to OpenID section 7.2. Return a pair
type,value, where type can be either 'xri' or 'uri'."""
# 7.2 Normalization
if uri.startswith('xri://'):
uri = uri[6:]
if uri[0] in ("=", "@", "+", "$", "!", ")"):
return 'xri', uri
if not uri.startswith('http'):
uri = 'http://' + uri
# RFC 3986, section 6
# 6.2.2.1 case normalization
parts = urlparse.urlparse(uri) # already lower-cases scheme
if '@' in parts[1]: #netloc
userinfo,hostname = parts[1].rsplit('@', 1)
else:
userinfo,hostname = None,parts[1]
if ':' in hostname:
host,port = hostname.rsplit(':', 1)
if ']' in port:
# IPv6
host,port = hostname,None
else:
host,port = hostname,None
netloc = hostname = host.lower()
if port:
netloc = hostname = host+':'+port
if userinfo:
netloc = userinfo + '@' + hostname
parts = list(parts)
parts[1] = netloc
uri = urlparse.urlunparse(parts)
# 6.2.2.2. normalize case in % escapes
# XXX should restrict search to parts that can be pct-encoded
for match in re.findall('%[0-9a-fA-F][0-9a-fA-F]', uri):
m2 = match.upper()
if m2 != match:
uri = uri.replace(match, m2)
# 6.2.2.3 remove dot segments
parts = urlparse.urlparse(uri)
path = parts[2] #path
newpath = ''
while path:
if path.startswith('../'):
path = path[3:]
elif path.startswith('./'):
path = path[2:]
elif path.startswith('/./'):
newpath += '/'; path = path[3:]
elif path == '/.':
newpath += '/'; path = ''
elif path.startswith('/../'):
newpath = newpath.rsplit('/', 1)[0]
path = path[3:] # leave /
elif path == '/..':
newpath = newpath.rsplit('/', 1)[0]
path = '/'
elif path == '.' or path=='..':
path = ''
else:
pos = path.find('/', 1)
if pos == -1:
pos = len(path)
newpath += path[:pos]
path = path[pos:]
parts = list(parts)
parts[2] = newpath
uri = urlparse.urlunparse(parts)
# 6.2.3 scheme based normalization
parts = urlparse.urlparse(uri)
netloc = parts[1]
if netloc.endswith(':'):
netloc = netloc[:-1]
elif parts[0] == 'http' and netloc.endswith(':80'):
netloc = netloc[:-3]
elif parts[0] == 'https' and netloc.endswith(':443'):
netloc = netloc[:-4]
# other default ports not considered here
path = parts[2]
if parts[0] in ('http', 'https') and parts[2]=='':
path = '/'
# 6.2.5 protocol-based normalization not done, as it
# is not appropriate to resolve the URL just for normalization
# it seems like a bug in the OpenID spec that it doesn't specify
# which normalizations exactly should be performed
parts = list(parts)
parts[1] = netloc
parts[2] = path
return 'uri', urlparse.urlunparse(parts)
def parse_response(s):
'''Parse a key-value form (OpenID section 4.1.1) into a dictionary'''
res = {}
for line in s.decode('utf-8').splitlines():
k,v = line.split(':', 1)
res[k] = v
return res
class OpenIDParser(HTMLParser.HTMLParser):
def __init__(self):
HTMLParser.HTMLParser.__init__(self)
self.links = {}
self.xrds_location=None
def handle_starttag(self, tag, attrs):
if tag == 'link':
attrs = dict(attrs)
try:
self.links[attrs['rel']] = attrs['href']
except KeyError:
pass
elif tag == 'meta':
attrs = dict(attrs)
# Yadis 6.2.5 option 1: meta tag
if attrs.get('http-equiv','').lower() == 'x-xrds-location':
self.xrds_location = attrs['content']
def _extract_services(doc):
for svc in doc.findall(".//{xri://$xrd*($v*2.0)}Service"):
services = [x.text for x in svc.findall("{xri://$xrd*($v*2.0)}Type")]
if 'http://specs.openid.net/auth/2.0/server' in services:
# 7.3.2.1.1 OP Identifier Element
uri = svc.find("{xri://$xrd*($v*2.0)}URI")
if uri is not None:
op_local = None
op_endpoint = uri.text
break
elif 'http://specs.openid.net/auth/2.0/signon' in services:
# 7.3.2.1.2. Claimed Identifier Element
op_local = svc.find("{xri://$xrd*($v*2.0)}LocalID")
if op_local is not None:
op_local = op_local.text
uri = svc.find("{xri://$xrd*($v*2.0)}URI")
if uri is not None:
op_endpoint = uri.text
break
elif 'http://openid.net/server/1.0' in services or \
'http://openid.net/server/1.1' in services or \
'http://openid.net/signon/1.0' in services or \
'http://openid.net/signon/1.1' in services:
# 14.2.1 says we also need to check for the 1.x types;
# XXX should c