summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-api/opendc/exts.py
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-web/opendc-web-api/opendc/exts.py')
-rw-r--r--opendc-web/opendc-web-api/opendc/exts.py91
1 files changed, 91 insertions, 0 deletions
diff --git a/opendc-web/opendc-web-api/opendc/exts.py b/opendc-web/opendc-web-api/opendc/exts.py
new file mode 100644
index 00000000..3ee8babb
--- /dev/null
+++ b/opendc-web/opendc-web-api/opendc/exts.py
@@ -0,0 +1,91 @@
+import os
+from functools import wraps
+
+from flask import g, _request_ctx_stack
+from jose import jwt
+from werkzeug.local import LocalProxy
+
+from opendc.database import Database
+from opendc.auth import AuthContext, AsymmetricJwtAlgorithm, get_token, AuthError
+
+
+def get_db():
+ """
+ Return the configured database instance for the application.
+ """
+ _db = getattr(g, 'db', None)
+ if _db is None:
+ _db = Database.from_credentials(user=os.environ['OPENDC_DB_USERNAME'],
+ password=os.environ['OPENDC_DB_PASSWORD'],
+ database=os.environ['OPENDC_DB'],
+ host=os.environ.get('OPENDC_DB_HOST', 'localhost'))
+ g.db = _db
+ return _db
+
+
+db = LocalProxy(get_db)
+
+
+def get_auth_context():
+ """
+ Return the configured auth context for the application.
+ """
+ _auth_context = getattr(g, 'auth_context', None)
+ if _auth_context is None:
+ _auth_context = AuthContext(
+ alg=AsymmetricJwtAlgorithm(jwks_url=f"https://{os.environ['AUTH0_DOMAIN']}/.well-known/jwks.json"),
+ issuer=f"https://{os.environ['AUTH0_DOMAIN']}/",
+ audience=os.environ['AUTH0_AUDIENCE'])
+ g.auth_context = _auth_context
+ return _auth_context
+
+
+auth_context = LocalProxy(get_auth_context)
+
+
+def requires_auth(f):
+ """Decorator to determine if the Access Token is valid.
+ """
+ @wraps(f)
+ def decorated(*args, **kwargs):
+ token = get_token()
+ payload = auth_context.validate(token)
+ _request_ctx_stack.top.current_user = payload
+ return f(*args, **kwargs)
+
+ return decorated
+
+
+current_user = LocalProxy(lambda: getattr(_request_ctx_stack.top, 'current_user', None))
+
+
+def has_scope(required_scope):
+ """Determines if the required scope is present in the Access Token
+ Args:
+ required_scope (str): The scope required to access the resource
+ """
+ token = get_token()
+ unverified_claims = jwt.get_unverified_claims(token)
+ if unverified_claims.get("scope"):
+ token_scopes = unverified_claims["scope"].split()
+ for token_scope in token_scopes:
+ if token_scope == required_scope:
+ return True
+ return False
+
+
+def requires_scope(required_scope):
+ """Determines if the required scope is present in the Access Token
+ Args:
+ required_scope (str): The scope required to access the resource
+ """
+ def decorator(f):
+ @wraps(f)
+ def decorated(*args, **kwargs):
+ if not has_scope(required_scope):
+ raise AuthError({"code": "Unauthorized", "description": "You don't have access to this resource"}, 403)
+ return f(*args, **kwargs)
+
+ return decorated
+
+ return decorator