diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-06-30 10:31:27 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-08-24 19:42:26 +0200 |
| commit | 690818051d0c9768cdaf735acf77ea9e98f00b38 (patch) | |
| tree | 81c43b13309e765c57a0e7d1ff7085310516b053 /web-server | |
| parent | 13d8de8b9e3ecdfcf1f315a095934bd2b0a68729 (diff) | |
Implement authorizations endpoint
Diffstat (limited to 'web-server')
5 files changed, 43 insertions, 205 deletions
diff --git a/web-server/opendc/api/v2/simulations/simulationId/authorizations/endpoint.py b/web-server/opendc/api/v2/simulations/simulationId/authorizations/endpoint.py index df2b5cfd..49d0fc20 100644 --- a/web-server/opendc/api/v2/simulations/simulationId/authorizations/endpoint.py +++ b/web-server/opendc/api/v2/simulations/simulationId/authorizations/endpoint.py @@ -1,37 +1,17 @@ -from opendc.models_old.authorization import Authorization -from opendc.models_old.simulation import Simulation -from opendc.util import exceptions +from opendc.models.simulation import Simulation from opendc.util.rest import Response def GET(request): """Find all authorizations for a Simulation.""" - # Make sure required parameters are there + request.check_required_parameters(path={'simulationId': 'string'}) - try: - request.check_required_parameters(path={'simulationId': 'string'}) + simulation = Simulation.from_id(request.params_path['simulationId']) - except exceptions.ParameterError as e: - return Response(400, str(e)) + simulation.check_exists() + simulation.check_user_access(request.google_id, False) - # Instantiate a Simulation and make sure it exists + authorizations = simulation.get_all_authorizations() - simulation = Simulation.from_primary_key((request.params_path['simulationId'], )) - - if not simulation.exists(): - return Response(404, '{} not found.'.format(simulation)) - - # Make sure this User is allowed to view this Simulation's Authorizations - - if not simulation.google_id_has_at_least(request.google_id, 'VIEW'): - return Response(403, 'Forbidden from retrieving Authorizations for {}.'.format(simulation)) - - # Get the Authorizations - - authorizations = Authorization.query('simulation_id', request.params_path['simulationId']) - - # Return the Authorizations - - return Response(200, 'Successfully retrieved Authorizations for {}.'.format(simulation), - [x.to_JSON() for x in authorizations]) + return Response(200, 'Successfully retrieved simulation authorizations', authorizations) diff --git a/web-server/opendc/api/v2/simulations/simulationId/authorizations/test_endpoint.py b/web-server/opendc/api/v2/simulations/simulationId/authorizations/test_endpoint.py new file mode 100644 index 00000000..30a1c090 --- /dev/null +++ b/web-server/opendc/api/v2/simulations/simulationId/authorizations/test_endpoint.py @@ -0,0 +1,28 @@ +from opendc.util.database import DB + + +def test_get_authorizations_non_existing(client, mocker): + mocker.patch.object(DB, 'fetch_one', return_value=None) + mocker.patch.object(DB, 'fetch_all', return_value=None) + assert '404' in client.get('/api/v2/simulations/1/authorizations').status + + +def test_get_authorizations_not_authorized(client, mocker): + mocker.patch.object(DB, 'fetch_one', return_value={'_id': '1', 'name': 'test trace', 'authorizations': [{ + 'simulationId': '2', + 'authorizationLevel': 'OWN' + }]}) + mocker.patch.object(DB, 'fetch_all', return_value=[]) + res = client.get('/api/v2/simulations/1/authorizations') + assert '403' in res.status + + +def test_get_authorizations(client, mocker): + mocker.patch.object(DB, 'fetch_one', return_value={'_id': '1', 'name': 'test trace', 'authorizations': [{ + 'simulationId': '1', + 'authorizationLevel': 'OWN' + }]}) + mocker.patch.object(DB, 'fetch_all', return_value=[]) + res = client.get('/api/v2/simulations/1/authorizations') + assert len(res.json['content']) == 0 + assert '200' in res.status diff --git a/web-server/opendc/api/v2/simulations/simulationId/authorizations/userId/__init__.py b/web-server/opendc/api/v2/simulations/simulationId/authorizations/userId/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/web-server/opendc/api/v2/simulations/simulationId/authorizations/userId/__init__.py +++ /dev/null diff --git a/web-server/opendc/api/v2/simulations/simulationId/authorizations/userId/endpoint.py b/web-server/opendc/api/v2/simulations/simulationId/authorizations/userId/endpoint.py deleted file mode 100644 index 121530db..00000000 --- a/web-server/opendc/api/v2/simulations/simulationId/authorizations/userId/endpoint.py +++ /dev/null @@ -1,178 +0,0 @@ -from opendc.models_old.authorization import Authorization -from opendc.models_old.simulation import Simulation -from opendc.models_old.user import User -from opendc.util import exceptions -from opendc.util.rest import Response - - -def DELETE(request): - """Delete a user's authorization level over a simulation.""" - - # Make sure required parameters are there - - try: - request.check_required_parameters(path={'simulationId': 'string', 'userId': 'string'}) - - except exceptions.ParameterError as e: - return Response(400, str(e)) - - # Instantiate an Authorization - - authorization = Authorization.from_primary_key((request.params_path['userId'], request.params_path['simulationId'])) - - # Make sure this Authorization exists in the database - - if not authorization.exists(): - return Response(404, '{} not found.'.format(authorization)) - - # Make sure this User is allowed to delete this Authorization - - if not authorization.google_id_has_at_least(request.google_id, 'OWN'): - return Response(403, 'Forbidden from deleting {}.'.format(authorization)) - - # Delete this Authorization - - authorization.delete() - - return Response(200, 'Successfully deleted {}.'.format(authorization), authorization.to_JSON()) - - -def GET(request): - """Get this User's Authorization over this Simulation.""" - - # Make sure required parameters are there - - try: - request.check_required_parameters(path={'simulationId': 'string', 'userId': 'string'}) - - except exceptions.ParameterError as e: - return Response(400, str(e)) - - # Instantiate an Authorization - - authorization = Authorization.from_primary_key((request.params_path['userId'], request.params_path['simulationId'])) - - # Make sure this Authorization exists in the database - - if not authorization.exists(): - return Response(404, '{} not found.'.format(authorization)) - - # Read this Authorization from the database - - authorization.read() - - # Return this Authorization - - return Response(200, 'Successfully retrieved {}'.format(authorization), authorization.to_JSON()) - - -def POST(request): - """Add an authorization for a user's access to a simulation.""" - - # Make sure required parameters are there - - try: - request.check_required_parameters(path={ - 'userId': 'string', - 'simulationId': 'string' - }, - body={'authorization': { - 'authorizationLevel': 'string' - }}) - - except exceptions.ParameterError as e: - return Response(400, str(e)) - - # Instantiate an Authorization - - authorization = Authorization.from_JSON({ - 'userId': - request.params_path['userId'], - 'simulationId': - request.params_path['simulationId'], - 'authorizationLevel': - request.params_body['authorization']['authorizationLevel'] - }) - - # Make sure the Simulation and User exist - - user = User.from_primary_key((authorization.user_id, )) - if not user.exists(): - return Response(404, '{} not found.'.format(user)) - - simulation = Simulation.from_primary_key((authorization.simulation_id, )) - if not simulation.exists(): - return Response(404, '{} not found.'.format(simulation)) - - # Make sure this User is allowed to add this Authorization - - if not simulation.google_id_has_at_least(request.google_id, 'OWN'): - return Response(403, 'Forbidden from creating {}.'.format(authorization)) - - # Make sure this Authorization does not already exist - - if authorization.exists(): - return Response(409, '{} already exists.'.format(authorization)) - - # Try to insert this Authorization into the database - - try: - authorization.insert() - - except exceptions.ForeignKeyError: - return Response(400, 'Invalid authorizationLevel') - - # Return this Authorization - - return Response(200, 'Successfully added {}'.format(authorization), authorization.to_JSON()) - - -def PUT(request): - """Change a user's authorization level over a simulation.""" - - # Make sure required parameters are there - - try: - request.check_required_parameters(path={ - 'simulationId': 'string', - 'userId': 'string' - }, - body={'authorization': { - 'authorizationLevel': 'string' - }}) - - except exceptions.ParameterError as e: - return Response(400, str(e)) - - # Instantiate and Authorization - - authorization = Authorization.from_JSON({ - 'userId': - request.params_path['userId'], - 'simulationId': - request.params_path['simulationId'], - 'authorizationLevel': - request.params_body['authorization']['authorizationLevel'] - }) - - # Make sure this Authorization exists - - if not authorization.exists(): - return Response(404, '{} not found.'.format(authorization)) - - # Make sure this User is allowed to edit this Authorization - - if not authorization.google_id_has_at_least(request.google_id, 'OWN'): - return Response(403, 'Forbidden from updating {}.'.format(authorization)) - - # Try to update this Authorization - - try: - authorization.update() - - except exceptions.ForeignKeyError as e: - return Response(400, 'Invalid authorization level.') - - # Return this Authorization - - return Response(200, 'Successfully updated {}.'.format(authorization), authorization.to_JSON()) diff --git a/web-server/opendc/models/simulation.py b/web-server/opendc/models/simulation.py index 5cd3d49e..a77697ab 100644 --- a/web-server/opendc/models/simulation.py +++ b/web-server/opendc/models/simulation.py @@ -1,5 +1,6 @@ from opendc.models.model import Model from opendc.models.user import User +from opendc.util.database import DB from opendc.util.exceptions import ClientError from opendc.util.rest import Response @@ -13,3 +14,10 @@ class Simulation(Model): filter(lambda x: str(x['simulationId']) == str(self.obj['_id']), user.obj['authorizations'])) if len(authorizations) == 0 or (edit_access and authorizations[0]['authorizationLevel'] == 'VIEW'): raise ClientError(Response(403, "Forbidden from retrieving simulation.")) + + def get_all_authorizations(self): + return [ + user['_id'] for user in DB.fetch_all({'authorizations': { + 'simulationId': self.obj['_id'] + }}, User.collection_name) + ] |
