diff options
Diffstat (limited to 'opendc-web/opendc-web-api/opendc/exts.py')
| -rw-r--r-- | opendc-web/opendc-web-api/opendc/exts.py | 40 |
1 files changed, 36 insertions, 4 deletions
diff --git a/opendc-web/opendc-web-api/opendc/exts.py b/opendc-web/opendc-web-api/opendc/exts.py index f088a29c..17dacd5e 100644 --- a/opendc-web/opendc-web-api/opendc/exts.py +++ b/opendc-web/opendc-web-api/opendc/exts.py @@ -2,10 +2,11 @@ import os from functools import wraps from flask import g, _request_ctx_stack +from jose import jwt from werkzeug.local import LocalProxy from opendc.database import Database -from opendc.auth import AuthContext, AsymmetricJwtAlgorithm, get_token +from opendc.auth import AuthContext, AsymmetricJwtAlgorithm, get_token, AuthError def get_db(): @@ -34,8 +35,7 @@ def get_auth_context(): _auth_context = AuthContext( alg=AsymmetricJwtAlgorithm(jwks_url=f"https://{os.environ['AUTH0_DOMAIN']}/.well-known/jwks.json"), issuer=f"https://{os.environ['AUTH0_DOMAIN']}/", - audience=os.environ['AUTH0_AUDIENCE'] - ) + audience=os.environ['AUTH0_AUDIENCE']) g.auth_context = _auth_context return _auth_context @@ -46,7 +46,6 @@ auth_context = LocalProxy(get_auth_context) def requires_auth(f): """Decorator to determine if the Access Token is valid. """ - @wraps(f) def decorated(*args, **kwargs): token = get_token() @@ -58,3 +57,36 @@ def requires_auth(f): current_user = LocalProxy(lambda: getattr(_request_ctx_stack.top, 'current_user', None)) + + +def has_scope(required_scope): + """Determines if the required scope is present in the Access Token + Args: + required_scope (str): The scope required to access the resource + """ + token = get_token() + unverified_claims = jwt.get_unverified_claims(token) + if unverified_claims.get("scope"): + token_scopes = unverified_claims["scope"].split() + for token_scope in token_scopes: + if token_scope == required_scope: + return True + return False + + +def requires_scope(required_scope): + """Determines if the required scope is present in the Access Token + Args: + required_scope (str): The scope required to access the resource + """ + def decorator(f): + @wraps(f) + def decorated(*args, **kwargs): + if not has_scope(required_scope): + raise AuthError({ + "code": "Unauthorized", + "description": "You don't have access to this resource" + }, 403) + return f(*args, **kwargs) + return decorated + return decorator |
