diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-06 22:42:31 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-10 11:54:51 +0200 |
| commit | 5b8dfc78496452bd23fab59e3ead84a8941da779 (patch) | |
| tree | f45c9c7ccb367bcb291edf535fba0981b104933b | |
| parent | 5c05d729b83dfc367bf19e8559569030f6e400b3 (diff) | |
feat(web/server): Add support for accounting simulation time
This change updates the Quarkus-based web server to add support for
tracking and limiting the simulation minutes used by the user in order
to prevent misuse of shared resources.
17 files changed, 594 insertions, 42 deletions
diff --git a/opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt b/opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt index dfaaa09e..4f21f0bb 100644 --- a/opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt +++ b/opendc-web/opendc-web-proto/src/main/kotlin/org/opendc/web/proto/runner/Job.kt @@ -36,11 +36,16 @@ public data class Job( val state: JobState, val createdAt: Instant, val updatedAt: Instant, + val runtime: Int, val results: Map<String, Any>? = null ) { /** * A request to update the state of a job. + * + * @property state The next state of the job. + * @property runtime The runtime of the job (in seconds). + * @property results The results of the job. */ @Schema(name = "Runner.Job.Update") - public data class Update(val state: JobState, val results: Map<String, Any>? = null) + public data class Update(val state: JobState, val runtime: Int, val results: Map<String, Any>? = null) } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt index 50aa03d8..d6c06889 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt @@ -42,18 +42,22 @@ public interface JobManager { /** * Update the heartbeat of the specified job. + * + * @param id The identifier of the job. + * @param runtime The total runtime of the job. + * @return `true` if the job can continue, `false` if the job has been cancelled. */ - public fun heartbeat(id: Long) + public fun heartbeat(id: Long, runtime: Int): Boolean /** * Mark the job as failed. */ - public fun fail(id: Long) + public fun fail(id: Long, runtime: Int) /** * Persist the specified results for the specified job. */ - public fun finish(id: Long, results: Map<String, Any>) + public fun finish(id: Long, runtime: Int, results: Map<String, Any>) public companion object { /** diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 226bad47..d4996198 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -48,6 +48,8 @@ import org.opendc.web.proto.runner.Topology import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.Random import java.util.UUID import java.util.concurrent.Executors @@ -144,9 +146,15 @@ public class OpenDCRunner( override fun compute() { val id = job.id val scenario = job.scenario + val startTime = Instant.now() + val currentThread = Thread.currentThread() val heartbeat = scheduler.scheduleWithFixedDelay( - { manager.heartbeat(id) }, + { + if (!manager.heartbeat(id, startTime.secondsSince())) { + currentThread.interrupt() + } + }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS @@ -163,12 +171,14 @@ public class OpenDCRunner( } val results = invokeAll(jobs).map { it.rawResult } - logger.info { "Finished simulation for job $id" } - heartbeat.cancel(true) + val duration = startTime.secondsSince() + logger.info { "Finished simulation for job $id (in $duration seconds)" } + manager.finish( id, + duration, mapOf( "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, "total_granted_burst" to results.map { it.totalActiveTime }, @@ -190,19 +200,26 @@ public class OpenDCRunner( } catch (e: Exception) { // Check whether the job failed due to exceeding its time budget if (Thread.interrupted()) { - logger.info { "Simulation job $id exceeded time limit" } + logger.info { "Simulation job $id exceeded time limit (${startTime.secondsSince()} seconds)" } } else { logger.info(e) { "Simulation job $id failed" } } try { heartbeat.cancel(true) - manager.fail(id) + manager.fail(id, startTime.secondsSince()) } catch (e: Throwable) { logger.error(e) { "Failed to update job" } } } } + + /** + * Calculate the seconds since the specified instant. + */ + private fun Instant.secondsSince(): Int { + return ChronoUnit.SECONDS.between(this, Instant.now()).toInt() + } } /** diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt index 39a6851c..5b1b7132 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt @@ -37,22 +37,23 @@ internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManag override fun claim(id: Long): Boolean { return try { - client.jobs.update(id, Job.Update(JobState.CLAIMED)) + client.jobs.update(id, Job.Update(JobState.CLAIMED, 0)) true } catch (e: IllegalStateException) { false } } - override fun heartbeat(id: Long) { - client.jobs.update(id, Job.Update(JobState.RUNNING)) + override fun heartbeat(id: Long, runtime: Int): Boolean { + val res = client.jobs.update(id, Job.Update(JobState.RUNNING, runtime)) + return res?.state != JobState.FAILED } - override fun fail(id: Long) { - client.jobs.update(id, Job.Update(JobState.FAILED)) + override fun fail(id: Long, runtime: Int) { + client.jobs.update(id, Job.Update(JobState.FAILED, runtime)) } - override fun finish(id: Long, results: Map<String, Any>) { - client.jobs.update(id, Job.Update(JobState.FINISHED, results)) + override fun finish(id: Long, runtime: Int, results: Map<String, Any>) { + client.jobs.update(id, Job.Update(JobState.FINISHED, runtime)) } } diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt index c07e07f0..84a71acf 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt @@ -55,7 +55,7 @@ import javax.persistence.Table name = "Job.updateOne", query = """ UPDATE Job j - SET j.state = :newState, j.updatedAt = :updatedAt, j.results = :results + SET j.state = :newState, j.updatedAt = :updatedAt, j.runtime = :runtime, j.results = :results WHERE j.id = :id AND j.state = :oldState """ ) @@ -66,6 +66,9 @@ class Job( @GeneratedValue(strategy = GenerationType.AUTO) val id: Long, + @Column(name = "created_by", nullable = false, updatable = false) + val createdBy: String, + @OneToOne(optional = false, mappedBy = "job", fetch = FetchType.EAGER) @JoinColumn(name = "scenario_id", nullable = false) val scenario: Scenario, @@ -92,6 +95,12 @@ class Job( var state: JobState = JobState.PENDING /** + * The runtime of the job (in seconds). + */ + @Column(nullable = false) + var runtime: Int = 0 + + /** * Experiment results in JSON */ @Type(type = "json") diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt new file mode 100644 index 00000000..5b813044 --- /dev/null +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.server.model + +import java.time.LocalDate +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Id +import javax.persistence.NamedQueries +import javax.persistence.NamedQuery +import javax.persistence.Table + +/** + * Entity to track the number of simulation minutes used by a user. + */ +@Entity +@Table(name = "user_accounting") +@NamedQueries( + value = [ + NamedQuery( + name = "UserAccounting.consumeBudget", + query = """ + UPDATE UserAccounting a + SET a.simulationTime = a.simulationTime + :seconds + WHERE a.userId = :userId AND a.periodEnd = :periodEnd + """ + ), + NamedQuery( + name = "UserAccounting.resetBudget", + query = """ + UPDATE UserAccounting a + SET a.periodEnd = :periodEnd, a.simulationTime = :seconds + WHERE a.userId = :userId AND a.periodEnd = :oldPeriodEnd + """ + ) + ] +) +class UserAccounting( + @Id + @Column(name = "user_id", nullable = false) + val userId: String, + + /** + * The end of the accounting period. + */ + @Column(name = "period_end", nullable = false) + var periodEnd: LocalDate, + + /** + * The number of simulation seconds to be used per accounting period. + */ + @Column(name = "simulation_time_budget", nullable = false) + var simulationTimeBudget: Int +) { + /** + * The number of simulation seconds used in this period. This number should reset once the accounting period has + * been reached. + */ + @Column(name = "simulation_time", nullable = false) + var simulationTime: Int = 0 +} diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt index 5fee07a3..e9bf0af0 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt @@ -79,12 +79,13 @@ class JobRepository @Inject constructor(private val em: EntityManager) { * @param results The results to possible set. * @return `true` when the update succeeded`, `false` when there was a conflict. */ - fun updateOne(job: Job, newState: JobState, time: Instant, results: Map<String, Any>?): Boolean { + fun updateOne(job: Job, newState: JobState, time: Instant, runtime: Int, results: Map<String, Any>?): Boolean { val count = em.createNamedQuery("Job.updateOne") .setParameter("id", job.id) .setParameter("oldState", job.state) .setParameter("newState", newState) .setParameter("updatedAt", Instant.now()) + .setParameter("runtime", runtime) .setParameter("results", results) .executeUpdate() em.refresh(job) diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt new file mode 100644 index 00000000..f0265d3d --- /dev/null +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.server.repository + +import org.opendc.web.server.model.UserAccounting +import java.time.LocalDate +import javax.enterprise.context.ApplicationScoped +import javax.inject.Inject +import javax.persistence.EntityManager + +/** + * A repository to manage [UserAccounting] entities. + */ +@ApplicationScoped +class UserAccountingRepository @Inject constructor(private val em: EntityManager) { + /** + * Find the [UserAccounting] object for the specified [userId]. + * + * @param userId The unique identifier of the user. + * @return The [UserAccounting] object or `null` if it does not exist. + */ + fun findForUser(userId: String): UserAccounting? { + return em.find(UserAccounting::class.java, userId) + } + + /** + * Save the specified [UserAccounting] object to the database. + */ + fun save(accounting: UserAccounting) { + em.persist(accounting) + } + + /** + * Atomically consume the budget for the specified [UserAccounting] object. + * + * @param accounting The [UserAccounting] object to update atomically. + * @param seconds The number of seconds to consume from the user. + * @return `true` when the update succeeded`, `false` when there was a conflict. + */ + fun consumeBudget(accounting: UserAccounting, seconds: Int): Boolean { + val count = em.createNamedQuery("UserAccounting.consumeBudget") + .setParameter("userId", accounting.userId) + .setParameter("periodEnd", accounting.periodEnd) + .setParameter("seconds", seconds) + .executeUpdate() + em.refresh(accounting) + return count > 0 + } + + /** + * Atomically reset the budget for the specified [UserAccounting] object. + * + * @param accounting The [UserAccounting] object to update atomically. + * @param periodEnd The new end period for the budget. + * @param seconds The number of seconds that have already been consumed. + * @return `true` when the update succeeded`, `false` when there was a conflict. + */ + fun resetBudget(accounting: UserAccounting, periodEnd: LocalDate, seconds: Int): Boolean { + val count = em.createNamedQuery("UserAccounting.resetBudget") + .setParameter("userId", accounting.userId) + .setParameter("oldPeriodEnd", accounting.periodEnd) + .setParameter("periodEnd", periodEnd) + .setParameter("seconds", seconds) + .executeUpdate() + em.refresh(accounting) + return count > 0 + } +} diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt index 4aa2f6a1..d0432360 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt @@ -65,7 +65,7 @@ class JobResource @Inject constructor(private val jobService: JobService) { @Transactional fun update(@PathParam("job") id: Long, @Valid update: Job.Update): Job { return try { - jobService.updateState(id, update.state, update.results) + jobService.updateState(id, update.state, update.runtime, update.results) ?: throw WebApplicationException("Job not found", 404) } catch (e: IllegalArgumentException) { throw WebApplicationException(e, 400) diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt index 6b49e8b6..a0ebd4f4 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt @@ -33,7 +33,10 @@ import javax.inject.Inject * Service for managing [Job]s. */ @ApplicationScoped -class JobService @Inject constructor(private val repository: JobRepository) { +class JobService @Inject constructor( + private val repository: JobRepository, + private val accountingService: UserAccountingService +) { /** * Query the pending simulation jobs. */ @@ -50,8 +53,13 @@ class JobService @Inject constructor(private val repository: JobRepository) { /** * Atomically update the state of a [Job]. + * + * @param id The identifier of the job. + * @param newState The next state for the job. + * @param runtime The runtime of the job (in seconds). + * @param results The potential results of the job. */ - fun updateState(id: Long, newState: JobState, results: Map<String, Any>?): Job? { + fun updateState(id: Long, newState: JobState, runtime: Int, results: Map<String, Any>?): Job? { val entity = repository.findOne(id) ?: return null val state = entity.state if (!state.isTransitionLegal(newState)) { @@ -59,7 +67,15 @@ class JobService @Inject constructor(private val repository: JobRepository) { } val now = Instant.now() - if (!repository.updateOne(entity, newState, now, results)) { + var nextState = newState + val consumedBudget = (runtime - entity.runtime).coerceAtLeast(1) + + // Check whether the user still has any simulation budget left + if (accountingService.consumeSimulationBudget(entity.createdBy, consumedBudget) && nextState == JobState.RUNNING) { + nextState = JobState.FAILED // User has consumed all their budget; cancel the job + } + + if (!repository.updateOne(entity, nextState, now, runtime, results)) { throw IllegalStateException("Conflicting update") } diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt index 1dcc95ee..465ac2df 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt @@ -49,7 +49,7 @@ internal fun Portfolio.toRunnerDto(): org.opendc.web.proto.runner.Portfolio { * Convert a [Job] into a runner-facing DTO. */ internal fun Job.toRunnerDto(): org.opendc.web.proto.runner.Job { - return org.opendc.web.proto.runner.Job(id, scenario.toRunnerDto(), state, createdAt, updatedAt, results) + return org.opendc.web.proto.runner.Job(id, scenario.toRunnerDto(), state, createdAt, updatedAt, runtime, results) } /** diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt index fad4e56f..083f2451 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt @@ -22,6 +22,7 @@ package org.opendc.web.server.service +import org.opendc.web.proto.JobState import org.opendc.web.server.model.Job import org.opendc.web.server.model.Scenario import org.opendc.web.server.model.Workload @@ -44,7 +45,8 @@ class ScenarioService @Inject constructor( private val portfolioRepository: PortfolioRepository, private val topologyRepository: TopologyRepository, private val traceRepository: TraceRepository, - private val scenarioRepository: ScenarioRepository + private val scenarioRepository: ScenarioRepository, + private val accountingService: UserAccountingService ) { /** * List all [Scenario]s that belong a certain portfolio. @@ -123,7 +125,12 @@ class ScenarioService @Inject constructor( request.phenomena, request.schedulerName ) - val job = Job(0, scenario, now, portfolio.targets.repeats) + val job = Job(0, userId, scenario, now, portfolio.targets.repeats) + + // Fail the job if there is not enough budget for the simulation + if (!accountingService.hasSimulationBudget(userId)) { + job.state = JobState.FAILED + } scenario.job = job portfolio.scenarios.add(scenario) diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt new file mode 100644 index 00000000..13440a81 --- /dev/null +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.server.service + +import org.eclipse.microprofile.config.inject.ConfigProperty +import org.opendc.web.server.model.UserAccounting +import org.opendc.web.server.repository.UserAccountingRepository +import java.time.Duration +import java.time.LocalDate +import java.time.temporal.TemporalAdjusters +import javax.enterprise.context.ApplicationScoped +import javax.inject.Inject +import javax.persistence.EntityExistsException + +/** + * Service for tracking the simulation budget of users. + * + * @param repository The [UserAccountingRepository] used to communicate with the database. + * @param simulationBudget The default simulation budget for new users. + */ +@ApplicationScoped +class UserAccountingService @Inject constructor( + private val repository: UserAccountingRepository, + @ConfigProperty(name = "opendc.accounting.simulation-budget", defaultValue = "2000m") + private val simulationBudget: Duration +) { + /** + * Determine whether the user with [userId] has any remaining simulation budget. + * + * @param userId The unique identifier of the user. + * @return `true` when the user still has budget left, `false` otherwise. + */ + fun hasSimulationBudget(userId: String): Boolean { + val accounting = repository.findForUser(userId) ?: return true + val today = LocalDate.now() + + // The accounting period must be over or there must be budget remaining. + return !today.isBefore(accounting.periodEnd) || accounting.simulationTimeBudget > accounting.simulationTime + } + + /** + * Consume [seconds] from the simulation budget of the user with [userId]. + * + * @param userId The unique identifier of the user. + * @param seconds The seconds to consume from the simulation budget. + * @param `true` if the user has consumed his full budget or `false` if there is still budget remaining. + */ + fun consumeSimulationBudget(userId: String, seconds: Int): Boolean { + val today = LocalDate.now() + val nextAccountingPeriod = today.with(TemporalAdjusters.firstDayOfNextMonth()) + val repository = repository + + // We need to be careful to prevent conflicts in case of concurrency + // 1. First, we try to create the accounting object if it does not exist yet. This may fail if another instance + // creates the object concurrently. + // 2. Second, we check if the budget needs to be reset and try this atomically. + // 3. Finally, we atomically consume the budget from the object + // This is repeated three times in case there is a conflict + repeat(3) { + val accounting = repository.findForUser(userId) + + if (accounting == null) { + try { + val newAccounting = UserAccounting(userId, nextAccountingPeriod, simulationBudget.toSeconds().toInt()) + newAccounting.simulationTime = seconds + repository.save(newAccounting) + + return newAccounting.simulationTime >= newAccounting.simulationTimeBudget + } catch (e: EntityExistsException) { + // Conflict due to concurrency; retry + } + } else { + val success = if (!today.isBefore(accounting.periodEnd)) { + repository.resetBudget(accounting, nextAccountingPeriod, seconds) + } else { + repository.consumeBudget(accounting, seconds) + } + + if (success) { + return accounting.simulationTimeBudget <= accounting.simulationTime + } + } + } + + throw IllegalStateException("Failed to allocate consume budget due to conflict") + } +} diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt index 4704c5ae..742a510c 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt @@ -43,7 +43,7 @@ class QuarkusJobManager @Inject constructor(private val service: JobService) : J @Transactional override fun claim(id: Long): Boolean { return try { - service.updateState(id, JobState.CLAIMED, null) + service.updateState(id, JobState.CLAIMED, 0, null) true } catch (e: IllegalStateException) { false @@ -51,17 +51,18 @@ class QuarkusJobManager @Inject constructor(private val service: JobService) : J } @Transactional - override fun heartbeat(id: Long) { - service.updateState(id, JobState.RUNNING, null) + override fun heartbeat(id: Long, runtime: Int): Boolean { + val res = service.updateState(id, JobState.RUNNING, runtime, null) + return res?.state != JobState.FAILED } @Transactional - override fun fail(id: Long) { - service.updateState(id, JobState.FAILED, null) + override fun fail(id: Long, runtime: Int) { + service.updateState(id, JobState.FAILED, runtime, null) } @Transactional - override fun finish(id: Long, results: Map<String, Any>) { - service.updateState(id, JobState.FINISHED, results) + override fun finish(id: Long, runtime: Int, results: Map<String, Any>) { + service.updateState(id, JobState.FINISHED, runtime, results) } } diff --git a/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql b/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql index 183a70ea..1a0e4046 100644 --- a/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql +++ b/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql @@ -65,15 +65,27 @@ create table scenarios create table jobs ( - id bigint not null, - created_at timestamp not null, - repeats integer not null, + id bigint not null, + created_by varchar(255) not null, + created_at timestamp not null, + repeats integer not null, results jsonb, - state integer not null, - updated_at timestamp not null, + state integer not null, + runtime integer not null, + updated_at timestamp not null, primary key (id) ); +-- User accounting +create table user_accounting +( + user_id varchar(255) not null, + period_end date not null, + simulation_time integer not null, + simulation_time_budget integer not null, + primary key (user_id) +); + -- Workload traces available to the user. create table traces ( diff --git a/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt b/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt index 9aca58e9..4a86c928 100644 --- a/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt +++ b/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/rest/runner/JobResourceTest.kt @@ -63,7 +63,7 @@ class JobResourceTest { private val dummyTopology = Topology(1, 1, "test", emptyList(), Instant.now(), Instant.now()) private val dummyTrace = Trace("bitbrains", "Bitbrains", "vm") private val dummyScenario = Scenario(1, 1, dummyPortfolio, "test", Workload(dummyTrace, 1.0), dummyTopology, OperationalPhenomena(false, false), "test") - private val dummyJob = Job(1, dummyScenario, JobState.PENDING, Instant.now(), Instant.now()) + private val dummyJob = Job(1, dummyScenario, JobState.PENDING, Instant.now(), Instant.now(), 0) @BeforeEach fun setUp() { @@ -151,10 +151,10 @@ class JobResourceTest { @Test @TestSecurity(user = "testUser", roles = ["runner"]) fun testUpdateNonExistent() { - every { jobService.updateState(1, any(), any()) } returns null + every { jobService.updateState(1, any(), any(), any()) } returns null Given { - body(Job.Update(JobState.PENDING)) + body(Job.Update(JobState.PENDING, 0)) contentType(ContentType.JSON) } When { post("/1") @@ -170,10 +170,10 @@ class JobResourceTest { @Test @TestSecurity(user = "testUser", roles = ["runner"]) fun testUpdateState() { - every { jobService.updateState(1, any(), any()) } returns dummyJob.copy(state = JobState.CLAIMED) + every { jobService.updateState(1, any(), any(), any()) } returns dummyJob.copy(state = JobState.CLAIMED) Given { - body(Job.Update(JobState.CLAIMED)) + body(Job.Update(JobState.CLAIMED, 0)) contentType(ContentType.JSON) } When { post("/1") diff --git a/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/service/UserAccountingServiceTest.kt b/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/service/UserAccountingServiceTest.kt new file mode 100644 index 00000000..fdf04787 --- /dev/null +++ b/opendc-web/opendc-web-server/src/test/kotlin/org/opendc/web/server/service/UserAccountingServiceTest.kt @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.server.service + +import io.mockk.every +import io.mockk.mockk +import io.quarkus.test.junit.QuarkusTest +import org.junit.jupiter.api.Assertions.assertAll +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.web.server.model.UserAccounting +import org.opendc.web.server.repository.UserAccountingRepository +import java.time.Duration +import java.time.LocalDate +import javax.persistence.EntityExistsException + +/** + * Test suite for the [UserAccountingService]. + */ +@QuarkusTest +class UserAccountingServiceTest { + /** + * The [UserAccountingRepository] that is mocked. + */ + private val repository: UserAccountingRepository = mockk() + + /** + * The [UserAccountingService] instance under test. + */ + private val service: UserAccountingService = UserAccountingService(repository, Duration.ofHours(1)) + + @Test + fun testGetUserDoesNotExist() { + val userId = "test" + + every { repository.findForUser(userId) } returns null + + val accounting = service.getAccounting(userId) + + assertTrue(accounting.periodEnd.isAfter(LocalDate.now())) + assertEquals(0, accounting.simulationTime) + } + + @Test + fun testGetUserDoesExist() { + val userId = "test" + + val now = LocalDate.now() + val periodEnd = now.plusMonths(1) + + every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600).also { it.simulationTime = 32 } + + val accounting = service.getAccounting(userId) + + assertAll( + { assertEquals(periodEnd, accounting.periodEnd) }, + { assertEquals(32, accounting.simulationTime) }, + { assertEquals(3600, accounting.simulationTimeBudget) } + ) + } + + @Test + fun testHasBudgetUserDoesNotExist() { + val userId = "test" + + every { repository.findForUser(userId) } returns null + + assertTrue(service.hasSimulationBudget(userId)) + } + + @Test + fun testHasBudget() { + val userId = "test" + val periodEnd = LocalDate.now().plusMonths(2) + + every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600) + + assertTrue(service.hasSimulationBudget(userId)) + } + + @Test + fun testHasBudgetExceededButPeriodExpired() { + val userId = "test" + val periodEnd = LocalDate.now().minusMonths(2) + + every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600).also { it.simulationTime = 3900 } + + assertTrue(service.hasSimulationBudget(userId)) + } + + @Test + fun testHasBudgetPeriodExpired() { + val userId = "test" + val periodEnd = LocalDate.now().minusMonths(2) + + every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600) + + assertTrue(service.hasSimulationBudget(userId)) + } + + @Test + fun testHasBudgetExceeded() { + val userId = "test" + val periodEnd = LocalDate.now().plusMonths(1) + + every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600).also { it.simulationTime = 3900 } + + assertFalse(service.hasSimulationBudget(userId)) + } + + @Test + fun testConsumeBudgetNewUser() { + val userId = "test" + + every { repository.findForUser(userId) } returns null + every { repository.save(any()) } returns Unit + + assertFalse(service.consumeSimulationBudget(userId, 10)) + } + + @Test + fun testConsumeBudgetNewUserExceeded() { + val userId = "test" + + every { repository.findForUser(userId) } returns null + every { repository.save(any()) } returns Unit + + assertTrue(service.consumeSimulationBudget(userId, 4000)) + } + + @Test + fun testConsumeBudgetNewUserConflict() { + val userId = "test" + + val periodEnd = LocalDate.now().plusMonths(1) + + every { repository.findForUser(userId) } returns null andThen UserAccounting(userId, periodEnd, 3600) + every { repository.save(any()) } throws EntityExistsException() + every { repository.consumeBudget(any(), any()) } answers { + val accounting = it.invocation.args[0] as UserAccounting + accounting.simulationTime -= it.invocation.args[1] as Int + true + } + + assertFalse(service.consumeSimulationBudget(userId, 10)) + } + + @Test + fun testConsumeBudgetResetSuccess() { + val userId = "test" + + val periodEnd = LocalDate.now().minusMonths(2) + + every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600).also { it.simulationTime = 3900 } + every { repository.resetBudget(any(), any(), any()) } answers { + val accounting = it.invocation.args[0] as UserAccounting + accounting.periodEnd = it.invocation.args[1] as LocalDate + accounting.simulationTime = it.invocation.args[2] as Int + true + } + + assertTrue(service.consumeSimulationBudget(userId, 4000)) + } + + @Test + fun testInfiniteConflict() { + val userId = "test" + + val periodEnd = LocalDate.now().plusMonths(1) + + every { repository.findForUser(userId) } returns UserAccounting(userId, periodEnd, 3600) + every { repository.consumeBudget(any(), any()) } answers { + val accounting = it.invocation.args[0] as UserAccounting + accounting.simulationTime -= it.invocation.args[1] as Int + false + } + + assertThrows<IllegalStateException> { service.consumeSimulationBudget(userId, 10) } + } +} |
