diff options
Diffstat (limited to 'opendc-web/opendc-web-api/opendc/exts.py')
| -rw-r--r-- | opendc-web/opendc-web-api/opendc/exts.py | 91 |
1 files changed, 0 insertions, 91 deletions
diff --git a/opendc-web/opendc-web-api/opendc/exts.py b/opendc-web/opendc-web-api/opendc/exts.py deleted file mode 100644 index 3ee8babb..00000000 --- a/opendc-web/opendc-web-api/opendc/exts.py +++ /dev/null @@ -1,91 +0,0 @@ -import os -from functools import wraps - -from flask import g, _request_ctx_stack -from jose import jwt -from werkzeug.local import LocalProxy - -from opendc.database import Database -from opendc.auth import AuthContext, AsymmetricJwtAlgorithm, get_token, AuthError - - -def get_db(): - """ - Return the configured database instance for the application. - """ - _db = getattr(g, 'db', None) - if _db is None: - _db = Database.from_credentials(user=os.environ['OPENDC_DB_USERNAME'], - password=os.environ['OPENDC_DB_PASSWORD'], - database=os.environ['OPENDC_DB'], - host=os.environ.get('OPENDC_DB_HOST', 'localhost')) - g.db = _db - return _db - - -db = LocalProxy(get_db) - - -def get_auth_context(): - """ - Return the configured auth context for the application. - """ - _auth_context = getattr(g, 'auth_context', None) - if _auth_context is None: - _auth_context = AuthContext( - alg=AsymmetricJwtAlgorithm(jwks_url=f"https://{os.environ['AUTH0_DOMAIN']}/.well-known/jwks.json"), - issuer=f"https://{os.environ['AUTH0_DOMAIN']}/", - audience=os.environ['AUTH0_AUDIENCE']) - g.auth_context = _auth_context - return _auth_context - - -auth_context = LocalProxy(get_auth_context) - - -def requires_auth(f): - """Decorator to determine if the Access Token is valid. - """ - @wraps(f) - def decorated(*args, **kwargs): - token = get_token() - payload = auth_context.validate(token) - _request_ctx_stack.top.current_user = payload - return f(*args, **kwargs) - - return decorated - - -current_user = LocalProxy(lambda: getattr(_request_ctx_stack.top, 'current_user', None)) - - -def has_scope(required_scope): - """Determines if the required scope is present in the Access Token - Args: - required_scope (str): The scope required to access the resource - """ - token = get_token() - unverified_claims = jwt.get_unverified_claims(token) - if unverified_claims.get("scope"): - token_scopes = unverified_claims["scope"].split() - for token_scope in token_scopes: - if token_scope == required_scope: - return True - return False - - -def requires_scope(required_scope): - """Determines if the required scope is present in the Access Token - Args: - required_scope (str): The scope required to access the resource - """ - def decorator(f): - @wraps(f) - def decorated(*args, **kwargs): - if not has_scope(required_scope): - raise AuthError({"code": "Unauthorized", "description": "You don't have access to this resource"}, 403) - return f(*args, **kwargs) - - return decorated - - return decorator |
