diff options
Diffstat (limited to 'opendc-web/opendc-web-api')
| -rwxr-xr-x | opendc-web/opendc-web-api/app.py | 3 | ||||
| -rw-r--r-- | opendc-web/opendc-web-api/opendc/api/jobs.py | 105 | ||||
| -rw-r--r-- | opendc-web/opendc-web-api/opendc/database.py | 8 | ||||
| -rw-r--r-- | opendc-web/opendc-web-api/opendc/models/scenario.py | 52 | ||||
| -rw-r--r-- | opendc-web/opendc-web-api/static/schema.yml | 151 | ||||
| -rw-r--r-- | opendc-web/opendc-web-api/tests/api/test_jobs.py | 139 |
6 files changed, 419 insertions, 39 deletions
diff --git a/opendc-web/opendc-web-api/app.py b/opendc-web/opendc-web-api/app.py index 96a1ca7a..36c80b7a 100755 --- a/opendc-web/opendc-web-api/app.py +++ b/opendc-web/opendc-web-api/app.py @@ -10,6 +10,7 @@ from flask_restful import Api from flask_swagger_ui import get_swaggerui_blueprint from marshmallow import ValidationError +from opendc.api.jobs import JobList, Job from opendc.api.portfolios import Portfolio, PortfolioScenarios from opendc.api.prefabs import Prefab, PrefabList from opendc.api.projects import ProjectList, Project, ProjectTopologies, ProjectPortfolios @@ -60,6 +61,8 @@ def setup_api(app): api.add_resource(TraceList, '/traces/') api.add_resource(Trace, '/traces/<string:trace_id>') api.add_resource(SchedulerList, '/schedulers/') + api.add_resource(JobList, '/jobs/') + api.add_resource(Job, '/jobs/<string:job_id>') @app.errorhandler(AuthError) def handle_auth_error(ex): diff --git a/opendc-web/opendc-web-api/opendc/api/jobs.py b/opendc-web/opendc-web-api/opendc/api/jobs.py new file mode 100644 index 00000000..5feaea16 --- /dev/null +++ b/opendc-web/opendc-web-api/opendc/api/jobs.py @@ -0,0 +1,105 @@ +# Copyright (c) 2021 AtLarge Research +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +from flask import request +from flask_restful import Resource +from marshmallow import fields, Schema, validate +from werkzeug.exceptions import BadRequest, Conflict + +from opendc.exts import requires_auth +from opendc.models.scenario import Scenario + + +def convert_to_job(scenario): + """Convert a scenario to a job. + """ + return JobSchema().dump({ + '_id': scenario['_id'], + 'scenarioId': scenario['_id'], + 'state': scenario['simulation']['state'], + 'heartbeat': scenario['simulation'].get('heartbeat', None), + 'results': scenario.get('results', {}) + }) + + +class JobSchema(Schema): + """ + Schema representing a simulation job. + """ + _id = fields.String(dump_only=True) + scenarioId = fields.String(dump_only=True) + state = fields.String(required=True, + validate=validate.OneOf(["QUEUED", "CLAIMED", "RUNNING", "FINISHED", "FAILED"])) + heartbeat = fields.DateTime() + results = fields.Dict() + + +class JobList(Resource): + """ + Resource representing the list of available jobs. + """ + method_decorators = [requires_auth] + + def get(self): + """Get all available jobs.""" + jobs = Scenario.get_jobs() + data = list(map(convert_to_job, jobs.obj)) + return {'data': data} + + +class Job(Resource): + """ + Resource representing a single job. + """ + method_decorators = [requires_auth] + + def get(self, job_id): + """Get the details of a single job.""" + job = Scenario.from_id(job_id) + job.check_exists() + data = convert_to_job(job.obj) + return {'data': data} + + def post(self, job_id): + """Update the details of a single job.""" + action = JobSchema(only=('state', 'results')).load(request.json) + + job = Scenario.from_id(job_id) + job.check_exists() + + old_state = job.obj['simulation']['state'] + new_state = action['state'] + + if old_state == new_state: + data = job.update_state(new_state) + elif (old_state, new_state) == ('QUEUED', 'CLAIMED'): + data = job.update_state('CLAIMED') + elif (old_state, new_state) == ('CLAIMED', 'RUNNING'): + data = job.update_state('RUNNING') + elif (old_state, new_state) == ('RUNNING', 'FINISHED'): + data = job.update_state('FINISHED', results=action.get('results', None)) + elif old_state in ('CLAIMED', 'RUNNING') and new_state == 'FAILED': + data = job.update_state('FAILED') + else: + raise BadRequest('Invalid state transition') + + if not data: + raise Conflict('State conflict') + + return {'data': convert_to_job(data)} diff --git a/opendc-web/opendc-web-api/opendc/database.py b/opendc-web/opendc-web-api/opendc/database.py index 37fd1a4d..dd6367f2 100644 --- a/opendc-web/opendc-web-api/opendc/database.py +++ b/opendc-web/opendc-web-api/opendc/database.py @@ -20,7 +20,7 @@ import urllib.parse -from pymongo import MongoClient +from pymongo import MongoClient, ReturnDocument DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S' CONNECTION_POOL = None @@ -76,6 +76,12 @@ class Database: """Updates an existing object.""" return getattr(self.opendc_db, collection).update({'_id': _id}, obj) + def fetch_and_update(self, query, update, collection): + """Updates an existing object.""" + return getattr(self.opendc_db, collection).find_one_and_update(query, + update, + return_document=ReturnDocument.AFTER) + def delete_one(self, query, collection): """Deletes one object matching the given query. diff --git a/opendc-web/opendc-web-api/opendc/models/scenario.py b/opendc-web/opendc-web-api/opendc/models/scenario.py index 658d790e..0fb6c453 100644 --- a/opendc-web/opendc-web-api/opendc/models/scenario.py +++ b/opendc-web/opendc-web-api/opendc/models/scenario.py @@ -1,15 +1,12 @@ +from datetime import datetime + from marshmallow import Schema, fields + +from opendc.exts import db from opendc.models.model import Model from opendc.models.portfolio import Portfolio -class SimulationSchema(Schema): - """ - Simulation details. - """ - state = fields.String() - - class TraceSchema(Schema): """ Schema for specifying the trace of a scenario. @@ -34,27 +31,6 @@ class OperationalSchema(Schema): schedulerName = fields.String() -class ResultSchema(Schema): - """ - Schema representing the simulation results. - """ - max_num_deployed_images = fields.List(fields.Number()) - max_cpu_demand = fields.List(fields.Number()) - max_cpu_usage = fields.List(fields.Number()) - mean_num_deployed_images = fields.List(fields.Number()) - total_failure_slices = fields.List(fields.Number()) - total_failure_vm_slices = fields.List(fields.Number()) - total_granted_burst = fields.List(fields.Number()) - total_interfered_burst = fields.List(fields.Number()) - total_overcommitted_burst = fields.List(fields.Number()) - total_power_draw = fields.List(fields.Number()) - total_requested_burst = fields.List(fields.Number()) - total_vms_failed = fields.List(fields.Number()) - total_vms_finished = fields.List(fields.Number()) - total_vms_queued = fields.List(fields.Number()) - total_vms_submitted = fields.List(fields.Number()) - - class ScenarioSchema(Schema): """ Schema representing a scenario. @@ -62,11 +38,9 @@ class ScenarioSchema(Schema): _id = fields.String(dump_only=True) portfolioId = fields.String() name = fields.String(required=True) - simulation = fields.Nested(SimulationSchema) trace = fields.Nested(TraceSchema) topology = fields.Nested(TopologySchema) operational = fields.Nested(OperationalSchema) - results = fields.Nested(ResultSchema, dump_only=True) class Scenario(Model): @@ -84,3 +58,21 @@ class Scenario(Model): """ portfolio = Portfolio.from_id(self.obj['portfolioId']) portfolio.check_user_access(user_id, edit_access) + + @classmethod + def get_jobs(cls): + """Obtain the scenarios that have been queued. + """ + return cls(db.fetch_all({'simulation.state': 'QUEUED'}, cls.collection_name)) + + def update_state(self, new_state, results=None): + """Atomically update the state of the Scenario. + """ + update = {'$set': {'simulation.state': new_state, 'simulation.heartbeat': datetime.now()}} + if results: + update['$set']['results'] = results + return db.fetch_and_update( + query={'_id': self.obj['_id'], 'simulation.state': self.obj['simulation']['state']}, + update=update, + collection=self.collection_name + ) diff --git a/opendc-web/opendc-web-api/static/schema.yml b/opendc-web/opendc-web-api/static/schema.yml index 6e0bddfd..5a8c6825 100644 --- a/opendc-web/opendc-web-api/static/schema.yml +++ b/opendc-web/opendc-web-api/static/schema.yml @@ -965,7 +965,7 @@ paths: application/json: schema: properties: - project: + prefab: $ref: "#/components/schemas/Prefab" description: Prefab's new properties. required: true @@ -1040,6 +1040,135 @@ paths: "application/json": schema: $ref: "#/components/schemas/NotFound" + /jobs: + get: + tags: + - jobs + description: Get all available jobs to run. + responses: + "200": + description: Successfully retrieved available jobs. + content: + "application/json": + schema: + type: object + required: + - data + properties: + data: + type: array + items: + $ref: "#/components/schemas/Job" + "401": + description: Unauthorized. + content: + "application/json": + schema: + $ref: "#/components/schemas/Unauthorized" + "/jobs/{jobId}": + get: + tags: + - jobs + description: Get this Job. + parameters: + - name: jobId + in: path + description: Job's ID. + required: true + schema: + type: string + responses: + "200": + description: Successfully retrieved Job. + content: + "application/json": + schema: + type: object + required: + - data + properties: + data: + $ref: "#/components/schemas/Job" + "401": + description: Unauthorized. + content: + "application/json": + schema: + $ref: "#/components/schemas/Unauthorized" + "403": + description: Forbidden from retrieving Job. + content: + "application/json": + schema: + $ref: "#/components/schemas/Forbidden" + "404": + description: Job not found. + content: + "application/json": + schema: + $ref: "#/components/schemas/NotFound" + post: + tags: + - jobs + description: Update this Job. + parameters: + - name: jobId + in: path + description: Job's ID. + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + properties: + job: + $ref: "#/components/schemas/Job" + description: Job's new properties. + required: true + responses: + "200": + description: Successfully updated Job. + content: + "application/json": + schema: + type: object + required: + - data + properties: + data: + $ref: "#/components/schemas/Job" + "400": + description: Missing or incorrectly typed parameter. + content: + "application/json": + schema: + $ref: "#/components/schemas/Invalid" + "401": + description: Unauthorized. + content: + "application/json": + schema: + $ref: "#/components/schemas/Unauthorized" + "403": + description: Forbidden from retrieving Job. + content: + "application/json": + schema: + $ref: "#/components/schemas/Forbidden" + "404": + description: Job not found. + content: + "application/json": + schema: + $ref: "#/components/schemas/NotFound" + "409": + description: State conflict. + content: + "application/json": + schema: + $ref: "#/components/schemas/Invalid" components: securitySchemes: auth0: @@ -1261,13 +1390,6 @@ components: type: string name: type: string - simulation: - type: object - properties: - state: - type: string - results: - type: object trace: type: object properties: @@ -1289,6 +1411,19 @@ components: type: boolean schedulerName: type: string + Job: + type: object + properties: + _id: + type: string + scenarioId: + type: string + state: + type: string + heartbeat: + type: string + results: + type: object Trace: type: object properties: diff --git a/opendc-web/opendc-web-api/tests/api/test_jobs.py b/opendc-web/opendc-web-api/tests/api/test_jobs.py new file mode 100644 index 00000000..2efe6933 --- /dev/null +++ b/opendc-web/opendc-web-api/tests/api/test_jobs.py @@ -0,0 +1,139 @@ +# Copyright (c) 2021 AtLarge Research +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# +from datetime import datetime + +from opendc.exts import db + +test_id = 24 * '1' +test_id_2 = 24 * '2' + + +def test_get_jobs(client, mocker): + mocker.patch.object(db, 'fetch_all', return_value=[ + {'_id': 'a', 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'}} + ]) + res = client.get('/jobs/') + assert '200' in res.status + + +def test_get_job_non_existing(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value=None) + assert '404' in client.get(f'/jobs/{test_id}').status + + +def test_get_job(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value={ + '_id': 'a', 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'} + }) + res = client.get(f'/jobs/{test_id}') + assert '200' in res.status + + +def test_update_job_nop(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'} + }) + update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', + 'simulation': {'state': 'QUEUED', 'heartbeat': datetime.now()} + }) + res = client.post(f'/jobs/{test_id}', json={'state': 'QUEUED'}) + assert '200' in res.status + update_mock.assert_called_once() + + +def test_update_job_invalid_state(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'} + }) + res = client.post(f'/jobs/{test_id}', json={'state': 'FINISHED'}) + assert '400' in res.status + + +def test_update_job_claim(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'} + }) + update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', + 'simulation': {'state': 'CLAIMED', 'heartbeat': datetime.now()} + }) + res = client.post(f'/jobs/{test_id}', json={'state': 'CLAIMED'}) + assert '200' in res.status + update_mock.assert_called_once() + + +def test_update_job_conflict(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'} + }) + update_mock = mocker.patch.object(db, 'fetch_and_update', return_value=None) + res = client.post(f'/jobs/{test_id}', json={'state': 'CLAIMED'}) + assert '409' in res.status + update_mock.assert_called_once() + + +def test_update_job_run(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'CLAIMED'} + }) + update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', + 'simulation': {'state': 'RUNNING', 'heartbeat': datetime.now()} + }) + res = client.post(f'/jobs/{test_id}', json={'state': 'RUNNING'}) + assert '200' in res.status + update_mock.assert_called_once() + + +def test_update_job_finished(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'RUNNING'} + }) + update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', + 'simulation': {'state': 'FINISHED', 'heartbeat': datetime.now()} + }) + res = client.post(f'/jobs/{test_id}', json={'state': 'FINISHED'}) + assert '200' in res.status + update_mock.assert_called_once() + + +def test_update_job_failed(client, mocker): + mocker.patch.object(db, 'fetch_one', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'RUNNING'} + }) + update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={ + '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', + 'simulation': {'state': 'FAILED', 'heartbeat': datetime.now()} + }) + res = client.post(f'/jobs/{test_id}', json={'state': 'FAILED'}) + assert '200' in res.status + update_mock.assert_called_once() |
