From 5b8dfc78496452bd23fab59e3ead84a8941da779 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 6 Oct 2022 22:42:31 +0200 Subject: feat(web/server): Add support for accounting simulation time This change updates the Quarkus-based web server to add support for tracking and limiting the simulation minutes used by the user in order to prevent misuse of shared resources. --- .../main/kotlin/org/opendc/web/server/model/Job.kt | 11 ++- .../org/opendc/web/server/model/UserAccounting.kt | 81 ++++++++++++++++ .../opendc/web/server/repository/JobRepository.kt | 3 +- .../server/repository/UserAccountingRepository.kt | 88 +++++++++++++++++ .../opendc/web/server/rest/runner/JobResource.kt | 2 +- .../org/opendc/web/server/service/JobService.kt | 22 ++++- .../opendc/web/server/service/RunnerConversions.kt | 2 +- .../opendc/web/server/service/ScenarioService.kt | 11 ++- .../web/server/service/UserAccountingService.kt | 107 +++++++++++++++++++++ .../web/server/util/runner/QuarkusJobManager.kt | 15 +-- .../main/resources/db/migration/V1.0.0__core.sql | 22 ++++- 11 files changed, 343 insertions(+), 21 deletions(-) create mode 100644 opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt create mode 100644 opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt create mode 100644 opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt (limited to 'opendc-web/opendc-web-server/src/main') diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt index c07e07f0..84a71acf 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/Job.kt @@ -55,7 +55,7 @@ import javax.persistence.Table name = "Job.updateOne", query = """ UPDATE Job j - SET j.state = :newState, j.updatedAt = :updatedAt, j.results = :results + SET j.state = :newState, j.updatedAt = :updatedAt, j.runtime = :runtime, j.results = :results WHERE j.id = :id AND j.state = :oldState """ ) @@ -66,6 +66,9 @@ class Job( @GeneratedValue(strategy = GenerationType.AUTO) val id: Long, + @Column(name = "created_by", nullable = false, updatable = false) + val createdBy: String, + @OneToOne(optional = false, mappedBy = "job", fetch = FetchType.EAGER) @JoinColumn(name = "scenario_id", nullable = false) val scenario: Scenario, @@ -91,6 +94,12 @@ class Job( @Column(nullable = false) var state: JobState = JobState.PENDING + /** + * The runtime of the job (in seconds). + */ + @Column(nullable = false) + var runtime: Int = 0 + /** * Experiment results in JSON */ diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt new file mode 100644 index 00000000..5b813044 --- /dev/null +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/model/UserAccounting.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.server.model + +import java.time.LocalDate +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Id +import javax.persistence.NamedQueries +import javax.persistence.NamedQuery +import javax.persistence.Table + +/** + * Entity to track the number of simulation minutes used by a user. + */ +@Entity +@Table(name = "user_accounting") +@NamedQueries( + value = [ + NamedQuery( + name = "UserAccounting.consumeBudget", + query = """ + UPDATE UserAccounting a + SET a.simulationTime = a.simulationTime + :seconds + WHERE a.userId = :userId AND a.periodEnd = :periodEnd + """ + ), + NamedQuery( + name = "UserAccounting.resetBudget", + query = """ + UPDATE UserAccounting a + SET a.periodEnd = :periodEnd, a.simulationTime = :seconds + WHERE a.userId = :userId AND a.periodEnd = :oldPeriodEnd + """ + ) + ] +) +class UserAccounting( + @Id + @Column(name = "user_id", nullable = false) + val userId: String, + + /** + * The end of the accounting period. + */ + @Column(name = "period_end", nullable = false) + var periodEnd: LocalDate, + + /** + * The number of simulation seconds to be used per accounting period. + */ + @Column(name = "simulation_time_budget", nullable = false) + var simulationTimeBudget: Int +) { + /** + * The number of simulation seconds used in this period. This number should reset once the accounting period has + * been reached. + */ + @Column(name = "simulation_time", nullable = false) + var simulationTime: Int = 0 +} diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt index 5fee07a3..e9bf0af0 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/JobRepository.kt @@ -79,12 +79,13 @@ class JobRepository @Inject constructor(private val em: EntityManager) { * @param results The results to possible set. * @return `true` when the update succeeded`, `false` when there was a conflict. */ - fun updateOne(job: Job, newState: JobState, time: Instant, results: Map?): Boolean { + fun updateOne(job: Job, newState: JobState, time: Instant, runtime: Int, results: Map?): Boolean { val count = em.createNamedQuery("Job.updateOne") .setParameter("id", job.id) .setParameter("oldState", job.state) .setParameter("newState", newState) .setParameter("updatedAt", Instant.now()) + .setParameter("runtime", runtime) .setParameter("results", results) .executeUpdate() em.refresh(job) diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt new file mode 100644 index 00000000..f0265d3d --- /dev/null +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/repository/UserAccountingRepository.kt @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.server.repository + +import org.opendc.web.server.model.UserAccounting +import java.time.LocalDate +import javax.enterprise.context.ApplicationScoped +import javax.inject.Inject +import javax.persistence.EntityManager + +/** + * A repository to manage [UserAccounting] entities. + */ +@ApplicationScoped +class UserAccountingRepository @Inject constructor(private val em: EntityManager) { + /** + * Find the [UserAccounting] object for the specified [userId]. + * + * @param userId The unique identifier of the user. + * @return The [UserAccounting] object or `null` if it does not exist. + */ + fun findForUser(userId: String): UserAccounting? { + return em.find(UserAccounting::class.java, userId) + } + + /** + * Save the specified [UserAccounting] object to the database. + */ + fun save(accounting: UserAccounting) { + em.persist(accounting) + } + + /** + * Atomically consume the budget for the specified [UserAccounting] object. + * + * @param accounting The [UserAccounting] object to update atomically. + * @param seconds The number of seconds to consume from the user. + * @return `true` when the update succeeded`, `false` when there was a conflict. + */ + fun consumeBudget(accounting: UserAccounting, seconds: Int): Boolean { + val count = em.createNamedQuery("UserAccounting.consumeBudget") + .setParameter("userId", accounting.userId) + .setParameter("periodEnd", accounting.periodEnd) + .setParameter("seconds", seconds) + .executeUpdate() + em.refresh(accounting) + return count > 0 + } + + /** + * Atomically reset the budget for the specified [UserAccounting] object. + * + * @param accounting The [UserAccounting] object to update atomically. + * @param periodEnd The new end period for the budget. + * @param seconds The number of seconds that have already been consumed. + * @return `true` when the update succeeded`, `false` when there was a conflict. + */ + fun resetBudget(accounting: UserAccounting, periodEnd: LocalDate, seconds: Int): Boolean { + val count = em.createNamedQuery("UserAccounting.resetBudget") + .setParameter("userId", accounting.userId) + .setParameter("oldPeriodEnd", accounting.periodEnd) + .setParameter("periodEnd", periodEnd) + .setParameter("seconds", seconds) + .executeUpdate() + em.refresh(accounting) + return count > 0 + } +} diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt index 4aa2f6a1..d0432360 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/rest/runner/JobResource.kt @@ -65,7 +65,7 @@ class JobResource @Inject constructor(private val jobService: JobService) { @Transactional fun update(@PathParam("job") id: Long, @Valid update: Job.Update): Job { return try { - jobService.updateState(id, update.state, update.results) + jobService.updateState(id, update.state, update.runtime, update.results) ?: throw WebApplicationException("Job not found", 404) } catch (e: IllegalArgumentException) { throw WebApplicationException(e, 400) diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt index 6b49e8b6..a0ebd4f4 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/JobService.kt @@ -33,7 +33,10 @@ import javax.inject.Inject * Service for managing [Job]s. */ @ApplicationScoped -class JobService @Inject constructor(private val repository: JobRepository) { +class JobService @Inject constructor( + private val repository: JobRepository, + private val accountingService: UserAccountingService +) { /** * Query the pending simulation jobs. */ @@ -50,8 +53,13 @@ class JobService @Inject constructor(private val repository: JobRepository) { /** * Atomically update the state of a [Job]. + * + * @param id The identifier of the job. + * @param newState The next state for the job. + * @param runtime The runtime of the job (in seconds). + * @param results The potential results of the job. */ - fun updateState(id: Long, newState: JobState, results: Map?): Job? { + fun updateState(id: Long, newState: JobState, runtime: Int, results: Map?): Job? { val entity = repository.findOne(id) ?: return null val state = entity.state if (!state.isTransitionLegal(newState)) { @@ -59,7 +67,15 @@ class JobService @Inject constructor(private val repository: JobRepository) { } val now = Instant.now() - if (!repository.updateOne(entity, newState, now, results)) { + var nextState = newState + val consumedBudget = (runtime - entity.runtime).coerceAtLeast(1) + + // Check whether the user still has any simulation budget left + if (accountingService.consumeSimulationBudget(entity.createdBy, consumedBudget) && nextState == JobState.RUNNING) { + nextState = JobState.FAILED // User has consumed all their budget; cancel the job + } + + if (!repository.updateOne(entity, nextState, now, runtime, results)) { throw IllegalStateException("Conflicting update") } diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt index 1dcc95ee..465ac2df 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/RunnerConversions.kt @@ -49,7 +49,7 @@ internal fun Portfolio.toRunnerDto(): org.opendc.web.proto.runner.Portfolio { * Convert a [Job] into a runner-facing DTO. */ internal fun Job.toRunnerDto(): org.opendc.web.proto.runner.Job { - return org.opendc.web.proto.runner.Job(id, scenario.toRunnerDto(), state, createdAt, updatedAt, results) + return org.opendc.web.proto.runner.Job(id, scenario.toRunnerDto(), state, createdAt, updatedAt, runtime, results) } /** diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt index fad4e56f..083f2451 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/ScenarioService.kt @@ -22,6 +22,7 @@ package org.opendc.web.server.service +import org.opendc.web.proto.JobState import org.opendc.web.server.model.Job import org.opendc.web.server.model.Scenario import org.opendc.web.server.model.Workload @@ -44,7 +45,8 @@ class ScenarioService @Inject constructor( private val portfolioRepository: PortfolioRepository, private val topologyRepository: TopologyRepository, private val traceRepository: TraceRepository, - private val scenarioRepository: ScenarioRepository + private val scenarioRepository: ScenarioRepository, + private val accountingService: UserAccountingService ) { /** * List all [Scenario]s that belong a certain portfolio. @@ -123,7 +125,12 @@ class ScenarioService @Inject constructor( request.phenomena, request.schedulerName ) - val job = Job(0, scenario, now, portfolio.targets.repeats) + val job = Job(0, userId, scenario, now, portfolio.targets.repeats) + + // Fail the job if there is not enough budget for the simulation + if (!accountingService.hasSimulationBudget(userId)) { + job.state = JobState.FAILED + } scenario.job = job portfolio.scenarios.add(scenario) diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt new file mode 100644 index 00000000..13440a81 --- /dev/null +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/service/UserAccountingService.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.server.service + +import org.eclipse.microprofile.config.inject.ConfigProperty +import org.opendc.web.server.model.UserAccounting +import org.opendc.web.server.repository.UserAccountingRepository +import java.time.Duration +import java.time.LocalDate +import java.time.temporal.TemporalAdjusters +import javax.enterprise.context.ApplicationScoped +import javax.inject.Inject +import javax.persistence.EntityExistsException + +/** + * Service for tracking the simulation budget of users. + * + * @param repository The [UserAccountingRepository] used to communicate with the database. + * @param simulationBudget The default simulation budget for new users. + */ +@ApplicationScoped +class UserAccountingService @Inject constructor( + private val repository: UserAccountingRepository, + @ConfigProperty(name = "opendc.accounting.simulation-budget", defaultValue = "2000m") + private val simulationBudget: Duration +) { + /** + * Determine whether the user with [userId] has any remaining simulation budget. + * + * @param userId The unique identifier of the user. + * @return `true` when the user still has budget left, `false` otherwise. + */ + fun hasSimulationBudget(userId: String): Boolean { + val accounting = repository.findForUser(userId) ?: return true + val today = LocalDate.now() + + // The accounting period must be over or there must be budget remaining. + return !today.isBefore(accounting.periodEnd) || accounting.simulationTimeBudget > accounting.simulationTime + } + + /** + * Consume [seconds] from the simulation budget of the user with [userId]. + * + * @param userId The unique identifier of the user. + * @param seconds The seconds to consume from the simulation budget. + * @param `true` if the user has consumed his full budget or `false` if there is still budget remaining. + */ + fun consumeSimulationBudget(userId: String, seconds: Int): Boolean { + val today = LocalDate.now() + val nextAccountingPeriod = today.with(TemporalAdjusters.firstDayOfNextMonth()) + val repository = repository + + // We need to be careful to prevent conflicts in case of concurrency + // 1. First, we try to create the accounting object if it does not exist yet. This may fail if another instance + // creates the object concurrently. + // 2. Second, we check if the budget needs to be reset and try this atomically. + // 3. Finally, we atomically consume the budget from the object + // This is repeated three times in case there is a conflict + repeat(3) { + val accounting = repository.findForUser(userId) + + if (accounting == null) { + try { + val newAccounting = UserAccounting(userId, nextAccountingPeriod, simulationBudget.toSeconds().toInt()) + newAccounting.simulationTime = seconds + repository.save(newAccounting) + + return newAccounting.simulationTime >= newAccounting.simulationTimeBudget + } catch (e: EntityExistsException) { + // Conflict due to concurrency; retry + } + } else { + val success = if (!today.isBefore(accounting.periodEnd)) { + repository.resetBudget(accounting, nextAccountingPeriod, seconds) + } else { + repository.consumeBudget(accounting, seconds) + } + + if (success) { + return accounting.simulationTimeBudget <= accounting.simulationTime + } + } + } + + throw IllegalStateException("Failed to allocate consume budget due to conflict") + } +} diff --git a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt index 4704c5ae..742a510c 100644 --- a/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt +++ b/opendc-web/opendc-web-server/src/main/kotlin/org/opendc/web/server/util/runner/QuarkusJobManager.kt @@ -43,7 +43,7 @@ class QuarkusJobManager @Inject constructor(private val service: JobService) : J @Transactional override fun claim(id: Long): Boolean { return try { - service.updateState(id, JobState.CLAIMED, null) + service.updateState(id, JobState.CLAIMED, 0, null) true } catch (e: IllegalStateException) { false @@ -51,17 +51,18 @@ class QuarkusJobManager @Inject constructor(private val service: JobService) : J } @Transactional - override fun heartbeat(id: Long) { - service.updateState(id, JobState.RUNNING, null) + override fun heartbeat(id: Long, runtime: Int): Boolean { + val res = service.updateState(id, JobState.RUNNING, runtime, null) + return res?.state != JobState.FAILED } @Transactional - override fun fail(id: Long) { - service.updateState(id, JobState.FAILED, null) + override fun fail(id: Long, runtime: Int) { + service.updateState(id, JobState.FAILED, runtime, null) } @Transactional - override fun finish(id: Long, results: Map) { - service.updateState(id, JobState.FINISHED, results) + override fun finish(id: Long, runtime: Int, results: Map) { + service.updateState(id, JobState.FINISHED, runtime, results) } } diff --git a/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql b/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql index 183a70ea..1a0e4046 100644 --- a/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql +++ b/opendc-web/opendc-web-server/src/main/resources/db/migration/V1.0.0__core.sql @@ -65,15 +65,27 @@ create table scenarios create table jobs ( - id bigint not null, - created_at timestamp not null, - repeats integer not null, + id bigint not null, + created_by varchar(255) not null, + created_at timestamp not null, + repeats integer not null, results jsonb, - state integer not null, - updated_at timestamp not null, + state integer not null, + runtime integer not null, + updated_at timestamp not null, primary key (id) ); +-- User accounting +create table user_accounting +( + user_id varchar(255) not null, + period_end date not null, + simulation_time integer not null, + simulation_time_budget integer not null, + primary key (user_id) +); + -- Workload traces available to the user. create table traces ( -- cgit v1.2.3