summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-api/opendc/exts.py
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-web/opendc-web-api/opendc/exts.py')
-rw-r--r--opendc-web/opendc-web-api/opendc/exts.py40
1 files changed, 36 insertions, 4 deletions
diff --git a/opendc-web/opendc-web-api/opendc/exts.py b/opendc-web/opendc-web-api/opendc/exts.py
index f088a29c..17dacd5e 100644
--- a/opendc-web/opendc-web-api/opendc/exts.py
+++ b/opendc-web/opendc-web-api/opendc/exts.py
@@ -2,10 +2,11 @@ import os
from functools import wraps
from flask import g, _request_ctx_stack
+from jose import jwt
from werkzeug.local import LocalProxy
from opendc.database import Database
-from opendc.auth import AuthContext, AsymmetricJwtAlgorithm, get_token
+from opendc.auth import AuthContext, AsymmetricJwtAlgorithm, get_token, AuthError
def get_db():
@@ -34,8 +35,7 @@ def get_auth_context():
_auth_context = AuthContext(
alg=AsymmetricJwtAlgorithm(jwks_url=f"https://{os.environ['AUTH0_DOMAIN']}/.well-known/jwks.json"),
issuer=f"https://{os.environ['AUTH0_DOMAIN']}/",
- audience=os.environ['AUTH0_AUDIENCE']
- )
+ audience=os.environ['AUTH0_AUDIENCE'])
g.auth_context = _auth_context
return _auth_context
@@ -46,7 +46,6 @@ auth_context = LocalProxy(get_auth_context)
def requires_auth(f):
"""Decorator to determine if the Access Token is valid.
"""
-
@wraps(f)
def decorated(*args, **kwargs):
token = get_token()
@@ -58,3 +57,36 @@ def requires_auth(f):
current_user = LocalProxy(lambda: getattr(_request_ctx_stack.top, 'current_user', None))
+
+
+def has_scope(required_scope):
+ """Determines if the required scope is present in the Access Token
+ Args:
+ required_scope (str): The scope required to access the resource
+ """
+ token = get_token()
+ unverified_claims = jwt.get_unverified_claims(token)
+ if unverified_claims.get("scope"):
+ token_scopes = unverified_claims["scope"].split()
+ for token_scope in token_scopes:
+ if token_scope == required_scope:
+ return True
+ return False
+
+
+def requires_scope(required_scope):
+ """Determines if the required scope is present in the Access Token
+ Args:
+ required_scope (str): The scope required to access the resource
+ """
+ def decorator(f):
+ @wraps(f)
+ def decorated(*args, **kwargs):
+ if not has_scope(required_scope):
+ raise AuthError({
+ "code": "Unauthorized",
+ "description": "You don't have access to this resource"
+ }, 403)
+ return f(*args, **kwargs)
+ return decorated
+ return decorator