diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-10 15:57:05 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-10 15:57:05 +0200 |
| commit | a832ea376e360f3029036a9570c244fb9080e91f (patch) | |
| tree | a54e3f9d3a44a0248ef1b17430eed9b5d47ff5de /opendc-web/opendc-web-runner | |
| parent | 7ba3b953300c46b4e3afcde17cd3dd14b1af8406 (diff) | |
| parent | 4ebe2f28ba940aabdaa1f57653fbe86a91582ebd (diff) | |
merge: Add support for accounting of user simulation time (#108)
This pull request adds support for tracking and limiting the simulation time consumed per user.
These changes allow users to consume 60 minutes of shared compute resources for simulations
in OpenDC per month.
Closes #103
## Implementation Notes :hammer_and_pick:
* Limit exception mapper to WebApplicationException
* Add support for accounting simulation time
* Add API for querying user accounting data
* Show monthly simulation budget in UI
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* The web runner now also sends the runtime of the simulation job to the server, in order to
update the consumed simulation time in the database.
Diffstat (limited to 'opendc-web/opendc-web-runner')
4 files changed, 50 insertions, 37 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt index 50aa03d8..d6c06889 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt @@ -42,18 +42,22 @@ public interface JobManager { /** * Update the heartbeat of the specified job. + * + * @param id The identifier of the job. + * @param runtime The total runtime of the job. + * @return `true` if the job can continue, `false` if the job has been cancelled. */ - public fun heartbeat(id: Long) + public fun heartbeat(id: Long, runtime: Int): Boolean /** * Mark the job as failed. */ - public fun fail(id: Long) + public fun fail(id: Long, runtime: Int) /** * Persist the specified results for the specified job. */ - public fun finish(id: Long, results: Map<String, Any>) + public fun finish(id: Long, runtime: Int, results: Map<String, Any>) public companion object { /** diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 226bad47..1bc4e938 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -48,6 +48,8 @@ import org.opendc.web.proto.runner.Topology import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.Random import java.util.UUID import java.util.concurrent.Executors @@ -72,7 +74,7 @@ public class OpenDCRunner( private val manager: JobManager, private val tracePath: File, parallelism: Int = Runtime.getRuntime().availableProcessors(), - private val jobTimeout: Duration = Duration.ofMillis(10), + private val jobTimeout: Duration = Duration.ofMinutes(10), private val pollInterval: Duration = Duration.ofSeconds(30), private val heartbeatInterval: Duration = Duration.ofMinutes(1) ) : Runnable { @@ -144,9 +146,15 @@ public class OpenDCRunner( override fun compute() { val id = job.id val scenario = job.scenario + val startTime = Instant.now() + val currentThread = Thread.currentThread() val heartbeat = scheduler.scheduleWithFixedDelay( - { manager.heartbeat(id) }, + { + if (!manager.heartbeat(id, startTime.secondsSince())) { + currentThread.interrupt() + } + }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS @@ -163,12 +171,14 @@ public class OpenDCRunner( } val results = invokeAll(jobs).map { it.rawResult } - logger.info { "Finished simulation for job $id" } - heartbeat.cancel(true) + val duration = startTime.secondsSince() + logger.info { "Finished simulation for job $id (in $duration seconds)" } + manager.finish( id, + duration, mapOf( "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, "total_granted_burst" to results.map { it.totalActiveTime }, @@ -190,19 +200,26 @@ public class OpenDCRunner( } catch (e: Exception) { // Check whether the job failed due to exceeding its time budget if (Thread.interrupted()) { - logger.info { "Simulation job $id exceeded time limit" } + logger.info { "Simulation job $id exceeded time limit (${startTime.secondsSince()} seconds)" } } else { logger.info(e) { "Simulation job $id failed" } } try { heartbeat.cancel(true) - manager.fail(id) + manager.fail(id, startTime.secondsSince()) } catch (e: Throwable) { logger.error(e) { "Failed to update job" } } } } + + /** + * Calculate the seconds since the specified instant. + */ + private fun Instant.secondsSince(): Int { + return ChronoUnit.SECONDS.between(this, Instant.now()).toInt() + } } /** diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt index 39a6851c..5b1b7132 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt @@ -37,22 +37,23 @@ internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManag override fun claim(id: Long): Boolean { return try { - client.jobs.update(id, Job.Update(JobState.CLAIMED)) + client.jobs.update(id, Job.Update(JobState.CLAIMED, 0)) true } catch (e: IllegalStateException) { false } } - override fun heartbeat(id: Long) { - client.jobs.update(id, Job.Update(JobState.RUNNING)) + override fun heartbeat(id: Long, runtime: Int): Boolean { + val res = client.jobs.update(id, Job.Update(JobState.RUNNING, runtime)) + return res?.state != JobState.FAILED } - override fun fail(id: Long) { - client.jobs.update(id, Job.Update(JobState.FAILED)) + override fun fail(id: Long, runtime: Int) { + client.jobs.update(id, Job.Update(JobState.FAILED, runtime)) } - override fun finish(id: Long, results: Map<String, Any>) { - client.jobs.update(id, Job.Update(JobState.FINISHED, results)) + override fun finish(id: Long, runtime: Int, results: Map<String, Any>) { + client.jobs.update(id, Job.Update(JobState.FINISHED, runtime)) } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 4db70d3d..d6722115 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -24,8 +24,9 @@ package org.opendc.web.runner.internal import org.opendc.experiments.compute.telemetry.ComputeMonitor import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceData import org.opendc.experiments.compute.telemetry.table.ServiceTableReader -import kotlin.math.max +import org.opendc.experiments.compute.telemetry.table.toServiceData import kotlin.math.roundToLong /** @@ -76,30 +77,20 @@ internal class WebComputeMonitor : ComputeMonitor { val count: Long ) - private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() + private lateinit var serviceData: ServiceData override fun record(reader: ServiceTableReader) { - serviceMetrics = AggregateServiceMetrics( - max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), - max(reader.serversPending, serviceMetrics.vmWaitingCount), - max(reader.serversActive, serviceMetrics.vmActiveCount), - max(0, serviceMetrics.vmInactiveCount), - max(reader.attemptsFailure, serviceMetrics.vmFailedCount) - ) + serviceData = reader.toServiceData() } - private data class AggregateServiceMetrics( - val vmTotalCount: Int = 0, - val vmWaitingCount: Int = 0, - val vmActiveCount: Int = 0, - val vmInactiveCount: Int = 0, - val vmFailedCount: Int = 0 - ) - /** * Collect the results of the simulation. */ fun collectResults(): Results { + val hostAggregateMetrics = hostAggregateMetrics + val hostMetrics = hostMetrics + val serviceData = serviceData + return Results( hostAggregateMetrics.totalActiveTime, hostAggregateMetrics.totalIdleTime, @@ -112,10 +103,10 @@ internal class WebComputeMonitor : ComputeMonitor { hostAggregateMetrics.totalPowerDraw, hostAggregateMetrics.totalFailureSlices.roundToLong(), hostAggregateMetrics.totalFailureVmSlices.roundToLong(), - serviceMetrics.vmTotalCount, - serviceMetrics.vmWaitingCount, - serviceMetrics.vmInactiveCount, - serviceMetrics.vmFailedCount + serviceData.serversTotal, + serviceData.serversPending, + serviceData.serversTotal - serviceData.serversPending - serviceData.serversActive, + serviceData.attemptsError + serviceData.attemptsFailure ) } |
