summaryrefslogtreecommitdiff
path: root/opendc-web
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-web')
-rwxr-xr-xopendc-web/opendc-web-api/app.py3
-rw-r--r--opendc-web/opendc-web-api/opendc/api/jobs.py105
-rw-r--r--opendc-web/opendc-web-api/opendc/database.py8
-rw-r--r--opendc-web/opendc-web-api/opendc/models/scenario.py52
-rw-r--r--opendc-web/opendc-web-api/static/schema.yml151
-rw-r--r--opendc-web/opendc-web-api/tests/api/test_jobs.py139
6 files changed, 419 insertions, 39 deletions
diff --git a/opendc-web/opendc-web-api/app.py b/opendc-web/opendc-web-api/app.py
index 96a1ca7a..36c80b7a 100755
--- a/opendc-web/opendc-web-api/app.py
+++ b/opendc-web/opendc-web-api/app.py
@@ -10,6 +10,7 @@ from flask_restful import Api
from flask_swagger_ui import get_swaggerui_blueprint
from marshmallow import ValidationError
+from opendc.api.jobs import JobList, Job
from opendc.api.portfolios import Portfolio, PortfolioScenarios
from opendc.api.prefabs import Prefab, PrefabList
from opendc.api.projects import ProjectList, Project, ProjectTopologies, ProjectPortfolios
@@ -60,6 +61,8 @@ def setup_api(app):
api.add_resource(TraceList, '/traces/')
api.add_resource(Trace, '/traces/<string:trace_id>')
api.add_resource(SchedulerList, '/schedulers/')
+ api.add_resource(JobList, '/jobs/')
+ api.add_resource(Job, '/jobs/<string:job_id>')
@app.errorhandler(AuthError)
def handle_auth_error(ex):
diff --git a/opendc-web/opendc-web-api/opendc/api/jobs.py b/opendc-web/opendc-web-api/opendc/api/jobs.py
new file mode 100644
index 00000000..5feaea16
--- /dev/null
+++ b/opendc-web/opendc-web-api/opendc/api/jobs.py
@@ -0,0 +1,105 @@
+# Copyright (c) 2021 AtLarge Research
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+from flask import request
+from flask_restful import Resource
+from marshmallow import fields, Schema, validate
+from werkzeug.exceptions import BadRequest, Conflict
+
+from opendc.exts import requires_auth
+from opendc.models.scenario import Scenario
+
+
+def convert_to_job(scenario):
+ """Convert a scenario to a job.
+ """
+ return JobSchema().dump({
+ '_id': scenario['_id'],
+ 'scenarioId': scenario['_id'],
+ 'state': scenario['simulation']['state'],
+ 'heartbeat': scenario['simulation'].get('heartbeat', None),
+ 'results': scenario.get('results', {})
+ })
+
+
+class JobSchema(Schema):
+ """
+ Schema representing a simulation job.
+ """
+ _id = fields.String(dump_only=True)
+ scenarioId = fields.String(dump_only=True)
+ state = fields.String(required=True,
+ validate=validate.OneOf(["QUEUED", "CLAIMED", "RUNNING", "FINISHED", "FAILED"]))
+ heartbeat = fields.DateTime()
+ results = fields.Dict()
+
+
+class JobList(Resource):
+ """
+ Resource representing the list of available jobs.
+ """
+ method_decorators = [requires_auth]
+
+ def get(self):
+ """Get all available jobs."""
+ jobs = Scenario.get_jobs()
+ data = list(map(convert_to_job, jobs.obj))
+ return {'data': data}
+
+
+class Job(Resource):
+ """
+ Resource representing a single job.
+ """
+ method_decorators = [requires_auth]
+
+ def get(self, job_id):
+ """Get the details of a single job."""
+ job = Scenario.from_id(job_id)
+ job.check_exists()
+ data = convert_to_job(job.obj)
+ return {'data': data}
+
+ def post(self, job_id):
+ """Update the details of a single job."""
+ action = JobSchema(only=('state', 'results')).load(request.json)
+
+ job = Scenario.from_id(job_id)
+ job.check_exists()
+
+ old_state = job.obj['simulation']['state']
+ new_state = action['state']
+
+ if old_state == new_state:
+ data = job.update_state(new_state)
+ elif (old_state, new_state) == ('QUEUED', 'CLAIMED'):
+ data = job.update_state('CLAIMED')
+ elif (old_state, new_state) == ('CLAIMED', 'RUNNING'):
+ data = job.update_state('RUNNING')
+ elif (old_state, new_state) == ('RUNNING', 'FINISHED'):
+ data = job.update_state('FINISHED', results=action.get('results', None))
+ elif old_state in ('CLAIMED', 'RUNNING') and new_state == 'FAILED':
+ data = job.update_state('FAILED')
+ else:
+ raise BadRequest('Invalid state transition')
+
+ if not data:
+ raise Conflict('State conflict')
+
+ return {'data': convert_to_job(data)}
diff --git a/opendc-web/opendc-web-api/opendc/database.py b/opendc-web/opendc-web-api/opendc/database.py
index 37fd1a4d..dd6367f2 100644
--- a/opendc-web/opendc-web-api/opendc/database.py
+++ b/opendc-web/opendc-web-api/opendc/database.py
@@ -20,7 +20,7 @@
import urllib.parse
-from pymongo import MongoClient
+from pymongo import MongoClient, ReturnDocument
DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S'
CONNECTION_POOL = None
@@ -76,6 +76,12 @@ class Database:
"""Updates an existing object."""
return getattr(self.opendc_db, collection).update({'_id': _id}, obj)
+ def fetch_and_update(self, query, update, collection):
+ """Updates an existing object."""
+ return getattr(self.opendc_db, collection).find_one_and_update(query,
+ update,
+ return_document=ReturnDocument.AFTER)
+
def delete_one(self, query, collection):
"""Deletes one object matching the given query.
diff --git a/opendc-web/opendc-web-api/opendc/models/scenario.py b/opendc-web/opendc-web-api/opendc/models/scenario.py
index 658d790e..0fb6c453 100644
--- a/opendc-web/opendc-web-api/opendc/models/scenario.py
+++ b/opendc-web/opendc-web-api/opendc/models/scenario.py
@@ -1,15 +1,12 @@
+from datetime import datetime
+
from marshmallow import Schema, fields
+
+from opendc.exts import db
from opendc.models.model import Model
from opendc.models.portfolio import Portfolio
-class SimulationSchema(Schema):
- """
- Simulation details.
- """
- state = fields.String()
-
-
class TraceSchema(Schema):
"""
Schema for specifying the trace of a scenario.
@@ -34,27 +31,6 @@ class OperationalSchema(Schema):
schedulerName = fields.String()
-class ResultSchema(Schema):
- """
- Schema representing the simulation results.
- """
- max_num_deployed_images = fields.List(fields.Number())
- max_cpu_demand = fields.List(fields.Number())
- max_cpu_usage = fields.List(fields.Number())
- mean_num_deployed_images = fields.List(fields.Number())
- total_failure_slices = fields.List(fields.Number())
- total_failure_vm_slices = fields.List(fields.Number())
- total_granted_burst = fields.List(fields.Number())
- total_interfered_burst = fields.List(fields.Number())
- total_overcommitted_burst = fields.List(fields.Number())
- total_power_draw = fields.List(fields.Number())
- total_requested_burst = fields.List(fields.Number())
- total_vms_failed = fields.List(fields.Number())
- total_vms_finished = fields.List(fields.Number())
- total_vms_queued = fields.List(fields.Number())
- total_vms_submitted = fields.List(fields.Number())
-
-
class ScenarioSchema(Schema):
"""
Schema representing a scenario.
@@ -62,11 +38,9 @@ class ScenarioSchema(Schema):
_id = fields.String(dump_only=True)
portfolioId = fields.String()
name = fields.String(required=True)
- simulation = fields.Nested(SimulationSchema)
trace = fields.Nested(TraceSchema)
topology = fields.Nested(TopologySchema)
operational = fields.Nested(OperationalSchema)
- results = fields.Nested(ResultSchema, dump_only=True)
class Scenario(Model):
@@ -84,3 +58,21 @@ class Scenario(Model):
"""
portfolio = Portfolio.from_id(self.obj['portfolioId'])
portfolio.check_user_access(user_id, edit_access)
+
+ @classmethod
+ def get_jobs(cls):
+ """Obtain the scenarios that have been queued.
+ """
+ return cls(db.fetch_all({'simulation.state': 'QUEUED'}, cls.collection_name))
+
+ def update_state(self, new_state, results=None):
+ """Atomically update the state of the Scenario.
+ """
+ update = {'$set': {'simulation.state': new_state, 'simulation.heartbeat': datetime.now()}}
+ if results:
+ update['$set']['results'] = results
+ return db.fetch_and_update(
+ query={'_id': self.obj['_id'], 'simulation.state': self.obj['simulation']['state']},
+ update=update,
+ collection=self.collection_name
+ )
diff --git a/opendc-web/opendc-web-api/static/schema.yml b/opendc-web/opendc-web-api/static/schema.yml
index 6e0bddfd..5a8c6825 100644
--- a/opendc-web/opendc-web-api/static/schema.yml
+++ b/opendc-web/opendc-web-api/static/schema.yml
@@ -965,7 +965,7 @@ paths:
application/json:
schema:
properties:
- project:
+ prefab:
$ref: "#/components/schemas/Prefab"
description: Prefab's new properties.
required: true
@@ -1040,6 +1040,135 @@ paths:
"application/json":
schema:
$ref: "#/components/schemas/NotFound"
+ /jobs:
+ get:
+ tags:
+ - jobs
+ description: Get all available jobs to run.
+ responses:
+ "200":
+ description: Successfully retrieved available jobs.
+ content:
+ "application/json":
+ schema:
+ type: object
+ required:
+ - data
+ properties:
+ data:
+ type: array
+ items:
+ $ref: "#/components/schemas/Job"
+ "401":
+ description: Unauthorized.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/Unauthorized"
+ "/jobs/{jobId}":
+ get:
+ tags:
+ - jobs
+ description: Get this Job.
+ parameters:
+ - name: jobId
+ in: path
+ description: Job's ID.
+ required: true
+ schema:
+ type: string
+ responses:
+ "200":
+ description: Successfully retrieved Job.
+ content:
+ "application/json":
+ schema:
+ type: object
+ required:
+ - data
+ properties:
+ data:
+ $ref: "#/components/schemas/Job"
+ "401":
+ description: Unauthorized.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/Unauthorized"
+ "403":
+ description: Forbidden from retrieving Job.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/Forbidden"
+ "404":
+ description: Job not found.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/NotFound"
+ post:
+ tags:
+ - jobs
+ description: Update this Job.
+ parameters:
+ - name: jobId
+ in: path
+ description: Job's ID.
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ properties:
+ job:
+ $ref: "#/components/schemas/Job"
+ description: Job's new properties.
+ required: true
+ responses:
+ "200":
+ description: Successfully updated Job.
+ content:
+ "application/json":
+ schema:
+ type: object
+ required:
+ - data
+ properties:
+ data:
+ $ref: "#/components/schemas/Job"
+ "400":
+ description: Missing or incorrectly typed parameter.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/Invalid"
+ "401":
+ description: Unauthorized.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/Unauthorized"
+ "403":
+ description: Forbidden from retrieving Job.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/Forbidden"
+ "404":
+ description: Job not found.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/NotFound"
+ "409":
+ description: State conflict.
+ content:
+ "application/json":
+ schema:
+ $ref: "#/components/schemas/Invalid"
components:
securitySchemes:
auth0:
@@ -1261,13 +1390,6 @@ components:
type: string
name:
type: string
- simulation:
- type: object
- properties:
- state:
- type: string
- results:
- type: object
trace:
type: object
properties:
@@ -1289,6 +1411,19 @@ components:
type: boolean
schedulerName:
type: string
+ Job:
+ type: object
+ properties:
+ _id:
+ type: string
+ scenarioId:
+ type: string
+ state:
+ type: string
+ heartbeat:
+ type: string
+ results:
+ type: object
Trace:
type: object
properties:
diff --git a/opendc-web/opendc-web-api/tests/api/test_jobs.py b/opendc-web/opendc-web-api/tests/api/test_jobs.py
new file mode 100644
index 00000000..2efe6933
--- /dev/null
+++ b/opendc-web/opendc-web-api/tests/api/test_jobs.py
@@ -0,0 +1,139 @@
+# Copyright (c) 2021 AtLarge Research
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+#
+from datetime import datetime
+
+from opendc.exts import db
+
+test_id = 24 * '1'
+test_id_2 = 24 * '2'
+
+
+def test_get_jobs(client, mocker):
+ mocker.patch.object(db, 'fetch_all', return_value=[
+ {'_id': 'a', 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'}}
+ ])
+ res = client.get('/jobs/')
+ assert '200' in res.status
+
+
+def test_get_job_non_existing(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value=None)
+ assert '404' in client.get(f'/jobs/{test_id}').status
+
+
+def test_get_job(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value={
+ '_id': 'a', 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'}
+ })
+ res = client.get(f'/jobs/{test_id}')
+ assert '200' in res.status
+
+
+def test_update_job_nop(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'}
+ })
+ update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y',
+ 'simulation': {'state': 'QUEUED', 'heartbeat': datetime.now()}
+ })
+ res = client.post(f'/jobs/{test_id}', json={'state': 'QUEUED'})
+ assert '200' in res.status
+ update_mock.assert_called_once()
+
+
+def test_update_job_invalid_state(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'}
+ })
+ res = client.post(f'/jobs/{test_id}', json={'state': 'FINISHED'})
+ assert '400' in res.status
+
+
+def test_update_job_claim(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'}
+ })
+ update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y',
+ 'simulation': {'state': 'CLAIMED', 'heartbeat': datetime.now()}
+ })
+ res = client.post(f'/jobs/{test_id}', json={'state': 'CLAIMED'})
+ assert '200' in res.status
+ update_mock.assert_called_once()
+
+
+def test_update_job_conflict(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'QUEUED'}
+ })
+ update_mock = mocker.patch.object(db, 'fetch_and_update', return_value=None)
+ res = client.post(f'/jobs/{test_id}', json={'state': 'CLAIMED'})
+ assert '409' in res.status
+ update_mock.assert_called_once()
+
+
+def test_update_job_run(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'CLAIMED'}
+ })
+ update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y',
+ 'simulation': {'state': 'RUNNING', 'heartbeat': datetime.now()}
+ })
+ res = client.post(f'/jobs/{test_id}', json={'state': 'RUNNING'})
+ assert '200' in res.status
+ update_mock.assert_called_once()
+
+
+def test_update_job_finished(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'RUNNING'}
+ })
+ update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y',
+ 'simulation': {'state': 'FINISHED', 'heartbeat': datetime.now()}
+ })
+ res = client.post(f'/jobs/{test_id}', json={'state': 'FINISHED'})
+ assert '200' in res.status
+ update_mock.assert_called_once()
+
+
+def test_update_job_failed(client, mocker):
+ mocker.patch.object(db, 'fetch_one', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y', 'simulation': {'state': 'RUNNING'}
+ })
+ update_mock = mocker.patch.object(db, 'fetch_and_update', return_value={
+ '_id': test_id, 'scenarioId': 'x', 'portfolioId': 'y',
+ 'simulation': {'state': 'FAILED', 'heartbeat': datetime.now()}
+ })
+ res = client.post(f'/jobs/{test_id}', json={'state': 'FAILED'})
+ assert '200' in res.status
+ update_mock.assert_called_once()