summaryrefslogtreecommitdiff
path: root/opendc-web
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-06 22:42:31 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-10 11:54:51 +0200
commit5b8dfc78496452bd23fab59e3ead84a8941da779 (patch)
treef45c9c7ccb367bcb291edf535fba0981b104933b /opendc-web
parent5c05d729b83dfc367bf19e8559569030f6e400b3 (diff)
feat(web/server): Add support for accounting simulation time
This change updates the Quarkus-based web server to add support for tracking and limiting the simulation minutes used by the user in order to prevent misuse of shared resources.
Diffstat (limited to 'opendc-web')
-rw-r--r--opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt7
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt10
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt27
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt15
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt11
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt81
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt3
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt88
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt2
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt22
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt2
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt11
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt107
-rw-r--r--opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt15
-rw-r--r--opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql22
-rw-r--r--opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt10
-rw-r--r--opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/service/UserAccountingServiceTest.kt203
17 files changed, 594 insertions, 42 deletions
diff --git a/opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt b/opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt
index dfaaa09e..4f21f0bb 100644
--- a/opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt
+++ b/opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt
@@ -36,11 +36,16 @@ public data class Job(
val state: JobState,
val createdAt: Instant,
val updatedAt: Instant,
+ val runtime: Int,
val results: Map<String, Any>? = null
) {
/**
* A request to update the state of a job.
+ *
+ * @property state The next state of the job.
+ * @property runtime The runtime of the job (in seconds).
+ * @property results The results of the job.
*/
@Schema(name = "Runner.Job.Update")
- public data class Update(val state: JobState, val results: Map<String, Any>? = null)
+ public data class Update(val state: JobState, val runtime: Int, val results: Map<String, Any>? = null)
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
index 50aa03d8..d6c06889 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
@@ -42,18 +42,22 @@ public interface JobManager {
/**
* Update the heartbeat of the specified job.
+ *
+ * @param id The identifier of the job.
+ * @param runtime The total runtime of the job.
+ * @return `true` if the job can continue, `false` if the job has been cancelled.
*/
- public fun heartbeat(id: Long)
+ public fun heartbeat(id: Long, runtime: Int): Boolean
/**
* Mark the job as failed.
*/
- public fun fail(id: Long)
+ public fun fail(id: Long, runtime: Int)
/**
* Persist the specified results for the specified job.
*/
- public fun finish(id: Long, results: Map<String, Any>)
+ public fun finish(id: Long, runtime: Int, results: Map<String, Any>)
public companion object {
/**
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 226bad47..d4996198 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -48,6 +48,8 @@ import org.opendc.web.proto.runner.Topology
import org.opendc.web.runner.internal.WebComputeMonitor
import java.io.File
import java.time.Duration
+import java.time.Instant
+import java.time.temporal.ChronoUnit
import java.util.Random
import java.util.UUID
import java.util.concurrent.Executors
@@ -144,9 +146,15 @@ public class OpenDCRunner(
override fun compute() {
val id = job.id
val scenario = job.scenario
+ val startTime = Instant.now()
+ val currentThread = Thread.currentThread()
val heartbeat = scheduler.scheduleWithFixedDelay(
- { manager.heartbeat(id) },
+ {
+ if (!manager.heartbeat(id, startTime.secondsSince())) {
+ currentThread.interrupt()
+ }
+ },
0,
heartbeatInterval.toMillis(),
TimeUnit.MILLISECONDS
@@ -163,12 +171,14 @@ public class OpenDCRunner(
}
val results = invokeAll(jobs).map { it.rawResult }
- logger.info { "Finished simulation for job $id" }
-
heartbeat.cancel(true)
+ val duration = startTime.secondsSince()
+ logger.info { "Finished simulation for job $id (in $duration seconds)" }
+
manager.finish(
id,
+ duration,
mapOf(
"total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
"total_granted_burst" to results.map { it.totalActiveTime },
@@ -190,19 +200,26 @@ public class OpenDCRunner(
} catch (e: Exception) {
// Check whether the job failed due to exceeding its time budget
if (Thread.interrupted()) {
- logger.info { "Simulation job $id exceeded time limit" }
+ logger.info { "Simulation job $id exceeded time limit (${startTime.secondsSince()} seconds)" }
} else {
logger.info(e) { "Simulation job $id failed" }
}
try {
heartbeat.cancel(true)
- manager.fail(id)
+ manager.fail(id, startTime.secondsSince())
} catch (e: Throwable) {
logger.error(e) { "Failed to update job" }
}
}
}
+
+ /**
+ * Calculate the seconds since the specified instant.
+ */
+ private fun Instant.secondsSince(): Int {
+ return ChronoUnit.SECONDS.between(this, Instant.now()).toInt()
+ }
}
/**
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
index 39a6851c..5b1b7132 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
@@ -37,22 +37,23 @@ internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManag
override fun claim(id: Long): Boolean {
return try {
- client.jobs.update(id, Job.Update(JobState.CLAIMED))
+ client.jobs.update(id, Job.Update(JobState.CLAIMED, 0))
true
} catch (e: IllegalStateException) {
false
}
}
- override fun heartbeat(id: Long) {
- client.jobs.update(id, Job.Update(JobState.RUNNING))
+ override fun heartbeat(id: Long, runtime: Int): Boolean {
+ val res = client.jobs.update(id, Job.Update(JobState.RUNNING, runtime))
+ return res?.state != JobState.FAILED
}
- override fun fail(id: Long) {
- client.jobs.update(id, Job.Update(JobState.FAILED))
+ override fun fail(id: Long, runtime: Int) {
+ client.jobs.update(id, Job.Update(JobState.FAILED, runtime))
}
- override fun finish(id: Long, results: Map<String, Any>) {
- client.jobs.update(id, Job.Update(JobState.FINISHED, results))
+ override fun finish(id: Long, runtime: Int, results: Map<String, Any>) {
+ client.jobs.update(id, Job.Update(JobState.FINISHED, runtime))
}
}
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt
index c07e07f0..84a71acf 100644
--- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt
@@ -55,7 +55,7 @@ import javax.persistence.Table
name = "Job.updateOne",
query = """
UPDATE Job j
- SET j.state = :newState, j.updatedAt = :updatedAt, j.results = :results
+ SET j.state = :newState, j.updatedAt = :updatedAt, j.runtime = :runtime, j.results = :results
WHERE j.id = :id AND j.state = :oldState
"""
)
@@ -66,6 +66,9 @@ class Job(
@GeneratedValue(strategy = GenerationType.AUTO)
val id: Long,
+ @Column(name = "created_by", nullable = false, updatable = false)
+ val createdBy: String,
+
@OneToOne(optional = false, mappedBy = "job", fetch = FetchType.EAGER)
@JoinColumn(name = "scenario_id", nullable = false)
val scenario: Scenario,
@@ -92,6 +95,12 @@ class Job(
var state: JobState = JobState.PENDING
/**
+ * The runtime of the job (in seconds).
+ */
+ @Column(nullable = false)
+ var runtime: Int = 0
+
+ /**
* Experiment results in JSON
*/
@Type(type = "json")
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt
new file mode 100644
index 00000000..5b813044
--- /dev/null
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.server.model
+
+import java.time.LocalDate
+import javax.persistence.Column
+import javax.persistence.Entity
+import javax.persistence.Id
+import javax.persistence.NamedQueries
+import javax.persistence.NamedQuery
+import javax.persistence.Table
+
+/**
+ * Entity to track the number of simulation minutes used by a user.
+ */
+@Entity
+@Table(name = "user_accounting")
+@NamedQueries(
+ value = [
+ NamedQuery(
+ name = "UserAccounting.consumeBudget",
+ query = """
+ UPDATE UserAccounting a
+ SET a.simulationTime = a.simulationTime + :seconds
+ WHERE a.userId = :userId AND a.periodEnd = :periodEnd
+ """
+ ),
+ NamedQuery(
+ name = "UserAccounting.resetBudget",
+ query = """
+ UPDATE UserAccounting a
+ SET a.periodEnd = :periodEnd, a.simulationTime = :seconds
+ WHERE a.userId = :userId AND a.periodEnd = :oldPeriodEnd
+ """
+ )
+ ]
+)
+class UserAccounting(
+ @Id
+ @Column(name = "user_id", nullable = false)
+ val userId: String,
+
+ /**
+ * The end of the accounting period.
+ */
+ @Column(name = "period_end", nullable = false)
+ var periodEnd: LocalDate,
+
+ /**
+ * The number of simulation seconds to be used per accounting period.
+ */
+ @Column(name = "simulation_time_budget", nullable = false)
+ var simulationTimeBudget: Int
+) {
+ /**
+ * The number of simulation seconds used in this period. This number should reset once the accounting period has
+ * been reached.
+ */
+ @Column(name = "simulation_time", nullable = false)
+ var simulationTime: Int = 0
+}
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt
index 5fee07a3..e9bf0af0 100644
--- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt
@@ -79,12 +79,13 @@ class JobRepository @Inject constructor(private val em: EntityManager) {
* @param results The results to possible set.
* @return `true` when the update succeeded`, `false` when there was a conflict.
*/
- fun updateOne(job: Job, newState: JobState, time: Instant, results: Map<String, Any>?): Boolean {
+ fun updateOne(job: Job, newState: JobState, time: Instant, runtime: Int, results: Map<String, Any>?): Boolean {
val count = em.createNamedQuery("Job.updateOne")
.setParameter("id", job.id)
.setParameter("oldState", job.state)
.setParameter("newState", newState)
.setParameter("updatedAt", Instant.now())
+ .setParameter("runtime", runtime)
.setParameter("results", results)
.executeUpdate()
em.refresh(job)
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt
new file mode 100644
index 00000000..f0265d3d
--- /dev/null
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.server.repository
+
+import org.opendc.web.server.model.UserAccounting
+import java.time.LocalDate
+import javax.enterprise.context.ApplicationScoped
+import javax.inject.Inject
+import javax.persistence.EntityManager
+
+/**
+ * A repository to manage [UserAccounting] entities.
+ */
+@ApplicationScoped
+class UserAccountingRepository @Inject constructor(private val em: EntityManager) {
+ /**
+ * Find the [UserAccounting] object for the specified [userId].
+ *
+ * @param userId The unique identifier of the user.
+ * @return The [UserAccounting] object or `null` if it does not exist.
+ */
+ fun findForUser(userId: String): UserAccounting? {
+ return em.find(UserAccounting::class.java, userId)
+ }
+
+ /**
+ * Save the specified [UserAccounting] object to the database.
+ */
+ fun save(accounting: UserAccounting) {
+ em.persist(accounting)
+ }
+
+ /**
+ * Atomically consume the budget for the specified [UserAccounting] object.
+ *
+ * @param accounting The [UserAccounting] object to update atomically.
+ * @param seconds The number of seconds to consume from the user.
+ * @return `true` when the update succeeded`, `false` when there was a conflict.
+ */
+ fun consumeBudget(accounting: UserAccounting, seconds: Int): Boolean {
+ val count = em.createNamedQuery("UserAccounting.consumeBudget")
+ .setParameter("userId", accounting.userId)
+ .setParameter("periodEnd", accounting.periodEnd)
+ .setParameter("seconds", seconds)
+ .executeUpdate()
+ em.refresh(accounting)
+ return count > 0
+ }
+
+ /**
+ * Atomically reset the budget for the specified [UserAccounting] object.
+ *
+ * @param accounting The [UserAccounting] object to update atomically.
+ * @param periodEnd The new end period for the budget.
+ * @param seconds The number of seconds that have already been consumed.
+ * @return `true` when the update succeeded`, `false` when there was a conflict.
+ */
+ fun resetBudget(accounting: UserAccounting, periodEnd: LocalDate, seconds: Int): Boolean {
+ val count = em.createNamedQuery("UserAccounting.resetBudget")
+ .setParameter("userId", accounting.userId)
+ .setParameter("oldPeriodEnd", accounting.periodEnd)
+ .setParameter("periodEnd", periodEnd)
+ .setParameter("seconds", seconds)
+ .executeUpdate()
+ em.refresh(accounting)
+ return count > 0
+ }
+}
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt
index 4aa2f6a1..d0432360 100644
--- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt
@@ -65,7 +65,7 @@ class JobResource @Inject constructor(private val jobService: JobService) {
@Transactional
fun update(@PathParam("job") id: Long, @Valid update: Job.Update): Job {
return try {
- jobService.updateState(id, update.state, update.results)
+ jobService.updateState(id, update.state, update.runtime, update.results)
?: throw WebApplicationException("Job not found", 404)
} catch (e: IllegalArgumentException) {
throw WebApplicationException(e, 400)
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt
index 6b49e8b6..a0ebd4f4 100644
--- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt
@@ -33,7 +33,10 @@ import javax.inject.Inject
* Service for managing [Job]s.
*/
@ApplicationScoped
-class JobService @Inject constructor(private val repository: JobRepository) {
+class JobService @Inject constructor(
+ private val repository: JobRepository,
+ private val accountingService: UserAccountingService
+) {
/**
* Query the pending simulation jobs.
*/
@@ -50,8 +53,13 @@ class JobService @Inject constructor(private val repository: JobRepository) {
/**
* Atomically update the state of a [Job].
+ *
+ * @param id The identifier of the job.
+ * @param newState The next state for the job.
+ * @param runtime The runtime of the job (in seconds).
+ * @param results The potential results of the job.
*/
- fun updateState(id: Long, newState: JobState, results: Map<String, Any>?): Job? {
+ fun updateState(id: Long, newState: JobState, runtime: Int, results: Map<String, Any>?): Job? {
val entity = repository.findOne(id) ?: return null
val state = entity.state
if (!state.isTransitionLegal(newState)) {
@@ -59,7 +67,15 @@ class JobService @Inject constructor(private val repository: JobRepository) {
}
val now = Instant.now()
- if (!repository.updateOne(entity, newState, now, results)) {
+ var nextState = newState
+ val consumedBudget = (runtime - entity.runtime).coerceAtLeast(1)
+
+ // Check whether the user still has any simulation budget left
+ if (accountingService.consumeSimulationBudget(entity.createdBy, consumedBudget) && nextState == JobState.RUNNING) {
+ nextState = JobState.FAILED // User has consumed all their budget; cancel the job
+ }
+
+ if (!repository.updateOne(entity, nextState, now, runtime, results)) {
throw IllegalStateException("Conflicting update")
}
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt
index 1dcc95ee..465ac2df 100644
--- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt
@@ -49,7 +49,7 @@ internal fun Portfolio.toRunnerDto(): org.opendc.web.proto.runner.Portfolio {
* Convert a [Job] into a runner-facing DTO.
*/
internal fun Job.toRunnerDto(): org.opendc.web.proto.runner.Job {
- return org.opendc.web.proto.runner.Job(id, scenario.toRunnerDto(), state, createdAt, updatedAt, results)
+ return org.opendc.web.proto.runner.Job(id, scenario.toRunnerDto(), state, createdAt, updatedAt, runtime, results)
}
/**
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt
index fad4e56f..083f2451 100644
--- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt
@@ -22,6 +22,7 @@
package org.opendc.web.server.service
+import org.opendc.web.proto.JobState
import org.opendc.web.server.model.Job
import org.opendc.web.server.model.Scenario
import org.opendc.web.server.model.Workload
@@ -44,7 +45,8 @@ class ScenarioService @Inject constructor(
private val portfolioRepository: PortfolioRepository,
private val topologyRepository: TopologyRepository,
private val traceRepository: TraceRepository,
- private val scenarioRepository: ScenarioRepository
+ private val scenarioRepository: ScenarioRepository,
+ private val accountingService: UserAccountingService
) {
/**
* List all [Scenario]s that belong a certain portfolio.
@@ -123,7 +125,12 @@ class ScenarioService @Inject constructor(
request.phenomena,
request.schedulerName
)
- val job = Job(0, scenario, now, portfolio.targets.repeats)
+ val job = Job(0, userId, scenario, now, portfolio.targets.repeats)
+
+ // Fail the job if there is not enough budget for the simulation
+ if (!accountingService.hasSimulationBudget(userId)) {
+ job.state = JobState.FAILED
+ }
scenario.job = job
portfolio.scenarios.add(scenario)
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt
new file mode 100644
index 00000000..13440a81
--- /dev/null
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.server.service
+
+import org.eclipse.microprofile.config.inject.ConfigProperty
+import org.opendc.web.server.model.UserAccounting
+import org.opendc.web.server.repository.UserAccountingRepository
+import java.time.Duration
+import java.time.LocalDate
+import java.time.temporal.TemporalAdjusters
+import javax.enterprise.context.ApplicationScoped
+import javax.inject.Inject
+import javax.persistence.EntityExistsException
+
+/**
+ * Service for tracking the simulation budget of users.
+ *
+ * @param repository The [UserAccountingRepository] used to communicate with the database.
+ * @param simulationBudget The default simulation budget for new users.
+ */
+@ApplicationScoped
+class UserAccountingService @Inject constructor(
+ private val repository: UserAccountingRepository,
+ @ConfigProperty(name = "opendc.accounting.simulation-budget", defaultValue = "2000m")
+ private val simulationBudget: Duration
+) {
+ /**
+ * Determine whether the user with [userId] has any remaining simulation budget.
+ *
+ * @param userId The unique identifier of the user.
+ * @return `true` when the user still has budget left, `false` otherwise.
+ */
+ fun hasSimulationBudget(userId: String): Boolean {
+ val accounting = repository.findForUser(userId) ?: return true
+ val today = LocalDate.now()
+
+ // The accounting period must be over or there must be budget remaining.
+ return !today.isBefore(accounting.periodEnd) || accounting.simulationTimeBudget > accounting.simulationTime
+ }
+
+ /**
+ * Consume [seconds] from the simulation budget of the user with [userId].
+ *
+ * @param userId The unique identifier of the user.
+ * @param seconds The seconds to consume from the simulation budget.
+ * @param `true` if the user has consumed his full budget or `false` if there is still budget remaining.
+ */
+ fun consumeSimulationBudget(userId: String, seconds: Int): Boolean {
+ val today = LocalDate.now()
+ val nextAccountingPeriod = today.with(TemporalAdjusters.firstDayOfNextMonth())
+ val repository = repository
+
+ // We need to be careful to prevent conflicts in case of concurrency
+ // 1. First, we try to create the accounting object if it does not exist yet. This may fail if another instance
+ // creates the object concurrently.
+ // 2. Second, we check if the budget needs to be reset and try this atomically.
+ // 3. Finally, we atomically consume the budget from the object
+ // This is repeated three times in case there is a conflict
+ repeat(3) {
+ val accounting = repository.findForUser(userId)
+
+ if (accounting == null) {
+ try {
+ val newAccounting = UserAccounting(userId, nextAccountingPeriod, simulationBudget.toSeconds().toInt())
+ newAccounting.simulationTime = seconds
+ repository.save(newAccounting)
+
+ return newAccounting.simulationTime >= newAccounting.simulationTimeBudget
+ } catch (e: EntityExistsException) {
+ // Conflict due to concurrency; retry
+ }
+ } else {
+ val success = if (!today.isBefore(accounting.periodEnd)) {
+ repository.resetBudget(accounting, nextAccountingPeriod, seconds)
+ } else {
+ repository.consumeBudget(accounting, seconds)
+ }
+
+ if (success) {
+ return accounting.simulationTimeBudget <= accounting.simulationTime
+ }
+ }
+ }
+
+ throw IllegalStateException("Failed to allocate consume budget due to conflict")
+ }
+}
diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt
index 4704c5ae..742a510c 100644
--- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt
+++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt
@@ -43,7 +43,7 @@ class QuarkusJobManager @Inject constructor(private val service: JobService) : J
@Transactional
override fun claim(id: Long): Boolean {
return try {
- service.updateState(id, JobState.CLAIMED, null)
+ service.updateState(id, JobState.CLAIMED, 0, null)
true
} catch (e: IllegalStateException) {
false
@@ -51,17 +51,18 @@ class QuarkusJobManager @Inject constructor(private val service: JobService) : J
}
@Transactional
- override fun heartbeat(id: Long) {
- service.updateState(id, JobState.RUNNING, null)
+ override fun heartbeat(id: Long, runtime: Int): Boolean {
+ val res = service.updateState(id, JobState.RUNNING, runtime, null)
+ return res?.state != JobState.FAILED
}
@Transactional
- override fun fail(id: Long) {
- service.updateState(id, JobState.FAILED, null)
+ override fun fail(id: Long, runtime: Int) {
+ service.updateState(id, JobState.FAILED, runtime, null)
}
@Transactional
- override fun finish(id: Long, results: Map<String, Any>) {
- service.updateState(id, JobState.FINISHED, results)
+ override fun finish(id: Long, runtime: Int, results: Map<String, Any>) {
+ service.updateState(id, JobState.FINISHED, runtime, results)
}
}
diff --git a/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql b/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql
index 183a70ea..1a0e4046 100644
--- a/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql
+++ b/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql
@@ -65,15 +65,27 @@ create table scenarios
create table jobs
(
- id bigint not null,
- created_at timestamp not null,
- repeats integer not null,
+ id bigint not null,
+ created_by varchar(255) not null,
+ created_at timestamp not null,
+ repeats integer not null,
results jsonb,
- state integer not null,
- updated_at timestamp not null,
+ state integer not null,
+ runtime integer not null,
+ updated_at timestamp not null,
primary key (id)
);
+-- User accounting
+create table user_accounting
+(
+ user_id varchar(255) not null,
+ period_end date not null,
+ simulation_time integer not null,
+ simulation_time_budget integer not null,
+ primary key (user_id)
+);
+
-- Workload traces available to the user.
create table traces
(
diff --git a/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt b/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt
index 9aca58e9..4a86c928 100644
--- a/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt
+++ b/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt
@@ -63,7 +63,7 @@ class JobResourceTest {
private val dummyTopology = Topology(1, 1, "test", emptyList(), Instant.now(), Instant.now())
private val dummyTrace = Trace("bitbrains", "Bitbrains", "vm")
private val dummyScenario = Scenario(1, 1, dummyPortfolio, "test", Workload(dummyTrace, 1.0), dummyTopology, OperationalPhenomena(false, false), "test")
- private val dummyJob = Job(1, dummyScenario, JobState.PENDING, Instant.now(), Instant.now())
+ private val dummyJob = Job(1, dummyScenario, JobState.PENDING, Instant.now(), Instant.now(), 0)
@BeforeEach
fun setUp() {
@@ -151,10 +151,10 @@ class JobResourceTest {
@Test
@TestSecurity(user = "testUser", roles = ["runner"])
fun testUpdateNonExistent() {
- every { jobService.updateState(1, any(), any()) } returns null
+ every { jobService.updateState(1, any(), any(), any()) } returns null
Given {
- body(Job.Update(JobState.PENDING))
+ body(Job.Update(JobState.PENDING, 0))
contentType(ContentType.JSON)
} When {
post("/1")
@@ -170,10 +170,10 @@ class JobResourceTest {
@Test
@TestSecurity(user = "testUser", roles = ["runner"])
fun testUpdateState() {
- every { jobService.updateState(1, any(), any()) } returns dummyJob.copy(state = JobState.CLAIMED)
+ every { jobService.updateState(1, any(), any(), any()) } returns dummyJob.copy(state = JobState.CLAIMED)
Given {
- body(Job.Update(JobState.CLAIMED))
+ body(Job.Update(JobState.CLAIMED, 0))
contentType(ContentType.JSON)
} When {
post("/1")
diff --git a/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/service/UserAccountingServiceTest.kt b/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/service/UserAccountingServiceTest.kt
new file mode 100644
index 00000000..fdf04787
--- /dev/null
+++ b/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/service/UserAccountingServiceTest.kt
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.server.service
+
+import io.mockk.every
+import io.mockk.mockk
+import io.quarkus.test.junit.QuarkusTest
+import org.junit.jupiter.api.Assertions.assertAll
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.web.server.model.UserAccounting
+import org.opendc.web.server.repository.UserAccountingRepository
+import java.time.Duration
+import java.time.LocalDate
+import javax.persistence.EntityExistsException
+
+/**
+ * Test suite for the [UserAccountingService].
+ */
+@QuarkusTest
+class UserAccountingServiceTest {
+ /**
+ * The [UserAccountingRepository] that is mocked.
+ */
+ private val repository: UserAccountingRepository = mockk()
+
+ /**
+ * The [UserAccountingService] instance under test.
+ */
+ private val service: UserAccountingService = UserAccountingService(repository, Duration.ofHours(1))
+
+ @Test
+ fun testGetUserDoesNotExist() {
+ val userId = "test"
+
+ every { repository.findForUser(userId) } returns null
+
+ val accounting = service.getAccounting(userId)
+
+ assertTrue(accounting.periodEnd.isAfter(LocalDate.now()))
+ assertEquals(0, accounting.simulationTime)
+ }
+
+ @Test
+ fun testGetUserDoesExist() {
+ val userId = "test"
+
+ val now = LocalDate.now()
+ val periodEnd = now.plusMonths(1)
+
+ every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600).also { it.simulationTime = 32 }
+
+ val accounting = service.getAccounting(userId)
+
+ assertAll(
+ { assertEquals(periodEnd, accounting.periodEnd) },
+ { assertEquals(32, accounting.simulationTime) },
+ { assertEquals(3600, accounting.simulationTimeBudget) }
+ )
+ }
+
+ @Test
+ fun testHasBudgetUserDoesNotExist() {
+ val userId = "test"
+
+ every { repository.findForUser(userId) } returns null
+
+ assertTrue(service.hasSimulationBudget(userId))
+ }
+
+ @Test
+ fun testHasBudget() {
+ val userId = "test"
+ val periodEnd = LocalDate.now().plusMonths(2)
+
+ every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600)
+
+ assertTrue(service.hasSimulationBudget(userId))
+ }
+
+ @Test
+ fun testHasBudgetExceededButPeriodExpired() {
+ val userId = "test"
+ val periodEnd = LocalDate.now().minusMonths(2)
+
+ every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600).also { it.simulationTime = 3900 }
+
+ assertTrue(service.hasSimulationBudget(userId))
+ }
+
+ @Test
+ fun testHasBudgetPeriodExpired() {
+ val userId = "test"
+ val periodEnd = LocalDate.now().minusMonths(2)
+
+ every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600)
+
+ assertTrue(service.hasSimulationBudget(userId))
+ }
+
+ @Test
+ fun testHasBudgetExceeded() {
+ val userId = "test"
+ val periodEnd = LocalDate.now().plusMonths(1)
+
+ every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600).also { it.simulationTime = 3900 }
+
+ assertFalse(service.hasSimulationBudget(userId))
+ }
+
+ @Test
+ fun testConsumeBudgetNewUser() {
+ val userId = "test"
+
+ every { repository.findForUser(userId) } returns null
+ every { repository.save(any()) } returns Unit
+
+ assertFalse(service.consumeSimulationBudget(userId, 10))
+ }
+
+ @Test
+ fun testConsumeBudgetNewUserExceeded() {
+ val userId = "test"
+
+ every { repository.findForUser(userId) } returns null
+ every { repository.save(any()) } returns Unit
+
+ assertTrue(service.consumeSimulationBudget(userId, 4000))
+ }
+
+ @Test
+ fun testConsumeBudgetNewUserConflict() {
+ val userId = "test"
+
+ val periodEnd = LocalDate.now().plusMonths(1)
+
+ every { repository.findForUser(userId) } returns null andThen UserAccounting(userId, periodEnd, 3600)
+ every { repository.save(any()) } throws EntityExistsException()
+ every { repository.consumeBudget(any(), any()) } answers {
+ val accounting = it.invocation.args[0] as UserAccounting
+ accounting.simulationTime -= it.invocation.args[1] as Int
+ true
+ }
+
+ assertFalse(service.consumeSimulationBudget(userId, 10))
+ }
+
+ @Test
+ fun testConsumeBudgetResetSuccess() {
+ val userId = "test"
+
+ val periodEnd = LocalDate.now().minusMonths(2)
+
+ every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600).also { it.simulationTime = 3900 }
+ every { repository.resetBudget(any(), any(), any()) } answers {
+ val accounting = it.invocation.args[0] as UserAccounting
+ accounting.periodEnd = it.invocation.args[1] as LocalDate
+ accounting.simulationTime = it.invocation.args[2] as Int
+ true
+ }
+
+ assertTrue(service.consumeSimulationBudget(userId, 4000))
+ }
+
+ @Test
+ fun testInfiniteConflict() {
+ val userId = "test"
+
+ val periodEnd = LocalDate.now().plusMonths(1)
+
+ every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600)
+ every { repository.consumeBudget(any(), any()) } answers {
+ val accounting = it.invocation.args[0] as UserAccounting
+ accounting.simulationTime -= it.invocation.args[1] as Int
+ false
+ }
+
+ assertThrows<IllegalStateException> { service.consumeSimulationBudget(userId, 10) }
+ }
+}