summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-api/opendc/exts.py
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-web/opendc-web-api/opendc/exts.py')
-rw-r--r--opendc-web/opendc-web-api/opendc/exts.py91
1 files changed, 0 insertions, 91 deletions
diff --git a/opendc-web/opendc-web-api/opendc/exts.py b/opendc-web/opendc-web-api/opendc/exts.py
deleted file mode 100644
index 3ee8babb..00000000
--- a/opendc-web/opendc-web-api/opendc/exts.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import os
-from functools import wraps
-
-from flask import g, _request_ctx_stack
-from jose import jwt
-from werkzeug.local import LocalProxy
-
-from opendc.database import Database
-from opendc.auth import AuthContext, AsymmetricJwtAlgorithm, get_token, AuthError
-
-
-def get_db():
- """
- Return the configured database instance for the application.
- """
- _db = getattr(g, 'db', None)
- if _db is None:
- _db = Database.from_credentials(user=os.environ['OPENDC_DB_USERNAME'],
- password=os.environ['OPENDC_DB_PASSWORD'],
- database=os.environ['OPENDC_DB'],
- host=os.environ.get('OPENDC_DB_HOST', 'localhost'))
- g.db = _db
- return _db
-
-
-db = LocalProxy(get_db)
-
-
-def get_auth_context():
- """
- Return the configured auth context for the application.
- """
- _auth_context = getattr(g, 'auth_context', None)
- if _auth_context is None:
- _auth_context = AuthContext(
- alg=AsymmetricJwtAlgorithm(jwks_url=f"https://{os.environ['AUTH0_DOMAIN']}/.well-known/jwks.json"),
- issuer=f"https://{os.environ['AUTH0_DOMAIN']}/",
- audience=os.environ['AUTH0_AUDIENCE'])
- g.auth_context = _auth_context
- return _auth_context
-
-
-auth_context = LocalProxy(get_auth_context)
-
-
-def requires_auth(f):
- """Decorator to determine if the Access Token is valid.
- """
- @wraps(f)
- def decorated(*args, **kwargs):
- token = get_token()
- payload = auth_context.validate(token)
- _request_ctx_stack.top.current_user = payload
- return f(*args, **kwargs)
-
- return decorated
-
-
-current_user = LocalProxy(lambda: getattr(_request_ctx_stack.top, 'current_user', None))
-
-
-def has_scope(required_scope):
- """Determines if the required scope is present in the Access Token
- Args:
- required_scope (str): The scope required to access the resource
- """
- token = get_token()
- unverified_claims = jwt.get_unverified_claims(token)
- if unverified_claims.get("scope"):
- token_scopes = unverified_claims["scope"].split()
- for token_scope in token_scopes:
- if token_scope == required_scope:
- return True
- return False
-
-
-def requires_scope(required_scope):
- """Determines if the required scope is present in the Access Token
- Args:
- required_scope (str): The scope required to access the resource
- """
- def decorator(f):
- @wraps(f)
- def decorated(*args, **kwargs):
- if not has_scope(required_scope):
- raise AuthError({"code": "Unauthorized", "description": "You don't have access to this resource"}, 403)
- return f(*args, **kwargs)
-
- return decorated
-
- return decorator