1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
import os
from functools import wraps
from flask import g, _request_ctx_stack
from jose import jwt
from werkzeug.local import LocalProxy
from opendc.database import Database
from opendc.auth import AuthContext, AsymmetricJwtAlgorithm, get_token, AuthError
def get_db():
"""
Return the configured database instance for the application.
"""
_db = getattr(g, 'db', None)
if _db is None:
_db = Database.from_credentials(user=os.environ['OPENDC_DB_USERNAME'],
password=os.environ['OPENDC_DB_PASSWORD'],
database=os.environ['OPENDC_DB'],
host=os.environ.get('OPENDC_DB_HOST', 'localhost'))
g.db = _db
return _db
db = LocalProxy(get_db)
def get_auth_context():
"""
Return the configured auth context for the application.
"""
_auth_context = getattr(g, 'auth_context', None)
if _auth_context is None:
_auth_context = AuthContext(
alg=AsymmetricJwtAlgorithm(jwks_url=f"https://{os.environ['AUTH0_DOMAIN']}/.well-known/jwks.json"),
issuer=f"https://{os.environ['AUTH0_DOMAIN']}/",
audience=os.environ['AUTH0_AUDIENCE'])
g.auth_context = _auth_context
return _auth_context
auth_context = LocalProxy(get_auth_context)
def requires_auth(f):
"""Decorator to determine if the Access Token is valid.
"""
@wraps(f)
def decorated(*args, **kwargs):
token = get_token()
payload = auth_context.validate(token)
_request_ctx_stack.top.current_user = payload
return f(*args, **kwargs)
return decorated
current_user = LocalProxy(lambda: getattr(_request_ctx_stack.top, 'current_user', None))
def has_scope(required_scope):
"""Determines if the required scope is present in the Access Token
Args:
required_scope (str): The scope required to access the resource
"""
token = get_token()
unverified_claims = jwt.get_unverified_claims(token)
if unverified_claims.get("scope"):
token_scopes = unverified_claims["scope"].split()
for token_scope in token_scopes:
if token_scope == required_scope:
return True
return False
def requires_scope(required_scope):
"""Determines if the required scope is present in the Access Token
Args:
required_scope (str): The scope required to access the resource
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
if not has_scope(required_scope):
raise AuthError({
"code": "Unauthorized",
"description": "You don't have access to this resource"
}, 403)
return f(*args, **kwargs)
return decorated
return decorator
|