summary | refs | log | tree | commit | diff
path: root/opendc-web/opendc-web-runner
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2022-10-10 15:57:05 +0200
committer: GitHub <noreply@github.com> 2022-10-10 15:57:05 +0200
commit: a832ea376e360f3029036a9570c244fb9080e91f (patch)
tree: a54e3f9d3a44a0248ef1b17430eed9b5d47ff5de /opendc-web/opendc-web-runner
parent: 7ba3b953300c46b4e3afcde17cd3dd14b1af8406 (diff)
parent: 4ebe2f28ba940aabdaa1f57653fbe86a91582ebd (diff)
merge: Add support for accounting of user simulation time (#108)
This pull request adds support for tracking and limiting the simulation time consumed per user. These changes allow users to consume 60 minutes of shared compute resources for simulations in OpenDC per month.

Closes #103

## Implementation Notes :hammer_and_pick:
* Limit exception mapper to WebApplicationException
* Add support for accounting simulation time
* Add API for querying user accounting data
* Show monthly simulation budget in UI

## External Dependencies :four_leaf_clover:
* N/A

## Breaking API Changes :warning:
* The web runner now also sends the runtime of the simulation job to the server, in order to update the consumed simulation time in the database.
Diffstat (limited to 'opendc-web/opendc-web-runner')
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt | 10
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt | 29
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt | 15
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt | 33
4 files changed, 50 insertions, 37 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
index 50aa03d8..d6c06889 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
@@ -42,18 +42,22 @@ public interface JobManager {
/**
* Update the heartbeat of the specified job.
+ *
+ * @param id The identifier of the job.
+ * @param runtime The total runtime of the job.
+ * @return `true` if the job can continue, `false` if the job has been cancelled.
*/
- public fun heartbeat(id: Long)
+ public fun heartbeat(id: Long, runtime: Int): Boolean
/**
* Mark the job as failed.
*/
- public fun fail(id: Long)
+ public fun fail(id: Long, runtime: Int)
/**
* Persist the specified results for the specified job.
*/
- public fun finish(id: Long, results: Map<String, Any>)
+ public fun finish(id: Long, runtime: Int, results: Map<String, Any>)
public companion object {
/**
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 226bad47..1bc4e938 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -48,6 +48,8 @@ import org.opendc.web.proto.runner.Topology
import org.opendc.web.runner.internal.WebComputeMonitor
import java.io.File
import java.time.Duration
+import java.time.Instant
+import java.time.temporal.ChronoUnit
import java.util.Random
import java.util.UUID
import java.util.concurrent.Executors
@@ -72,7 +74,7 @@ public class OpenDCRunner(
private val manager: JobManager,
private val tracePath: File,
parallelism: Int = Runtime.getRuntime().availableProcessors(),
- private val jobTimeout: Duration = Duration.ofMillis(10),
+ private val jobTimeout: Duration = Duration.ofMinutes(10),
private val pollInterval: Duration = Duration.ofSeconds(30),
private val heartbeatInterval: Duration = Duration.ofMinutes(1)
) : Runnable {
@@ -144,9 +146,15 @@ public class OpenDCRunner(
override fun compute() {
val id = job.id
val scenario = job.scenario
+ val startTime = Instant.now()
+ val currentThread = Thread.currentThread()
val heartbeat = scheduler.scheduleWithFixedDelay(
- { manager.heartbeat(id) },
+ {
+ if (!manager.heartbeat(id, startTime.secondsSince())) {
+ currentThread.interrupt()
+ }
+ },
0,
heartbeatInterval.toMillis(),
TimeUnit.MILLISECONDS
@@ -163,12 +171,14 @@ public class OpenDCRunner(
}
val results = invokeAll(jobs).map { it.rawResult }
- logger.info { "Finished simulation for job $id" }
-
heartbeat.cancel(true)
+ val duration = startTime.secondsSince()
+ logger.info { "Finished simulation for job $id (in $duration seconds)" }
+
manager.finish(
id,
+ duration,
mapOf(
"total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
"total_granted_burst" to results.map { it.totalActiveTime },
@@ -190,19 +200,26 @@ public class OpenDCRunner(
} catch (e: Exception) {
// Check whether the job failed due to exceeding its time budget
if (Thread.interrupted()) {
- logger.info { "Simulation job $id exceeded time limit" }
+ logger.info { "Simulation job $id exceeded time limit (${startTime.secondsSince()} seconds)" }
} else {
logger.info(e) { "Simulation job $id failed" }
}
try {
heartbeat.cancel(true)
- manager.fail(id)
+ manager.fail(id, startTime.secondsSince())
} catch (e: Throwable) {
logger.error(e) { "Failed to update job" }
}
}
}
+
+ /**
+ * Calculate the seconds since the specified instant.
+ */
+ private fun Instant.secondsSince(): Int {
+ return ChronoUnit.SECONDS.between(this, Instant.now()).toInt()
+ }
}
/**
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
index 39a6851c..5b1b7132 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
@@ -37,22 +37,23 @@ internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManag
override fun claim(id: Long): Boolean {
return try {
- client.jobs.update(id, Job.Update(JobState.CLAIMED))
+ client.jobs.update(id, Job.Update(JobState.CLAIMED, 0))
true
} catch (e: IllegalStateException) {
false
}
}
- override fun heartbeat(id: Long) {
- client.jobs.update(id, Job.Update(JobState.RUNNING))
+ override fun heartbeat(id: Long, runtime: Int): Boolean {
+ val res = client.jobs.update(id, Job.Update(JobState.RUNNING, runtime))
+ return res?.state != JobState.FAILED
}
- override fun fail(id: Long) {
- client.jobs.update(id, Job.Update(JobState.FAILED))
+ override fun fail(id: Long, runtime: Int) {
+ client.jobs.update(id, Job.Update(JobState.FAILED, runtime))
}
- override fun finish(id: Long, results: Map<String, Any>) {
- client.jobs.update(id, Job.Update(JobState.FINISHED, results))
+ override fun finish(id: Long, runtime: Int, results: Map<String, Any>) {
+ client.jobs.update(id, Job.Update(JobState.FINISHED, runtime, results))
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
index 4db70d3d..d6722115 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
@@ -24,8 +24,9 @@ package org.opendc.web.runner.internal
import org.opendc.experiments.compute.telemetry.ComputeMonitor
import org.opendc.experiments.compute.telemetry.table.HostTableReader
+import org.opendc.experiments.compute.telemetry.table.ServiceData
import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
-import kotlin.math.max
+import org.opendc.experiments.compute.telemetry.table.toServiceData
import kotlin.math.roundToLong
/**
@@ -76,30 +77,20 @@ internal class WebComputeMonitor : ComputeMonitor {
val count: Long
)
- private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics()
+ private lateinit var serviceData: ServiceData
override fun record(reader: ServiceTableReader) {
- serviceMetrics = AggregateServiceMetrics(
- max(reader.attemptsSuccess, serviceMetrics.vmTotalCount),
- max(reader.serversPending, serviceMetrics.vmWaitingCount),
- max(reader.serversActive, serviceMetrics.vmActiveCount),
- max(0, serviceMetrics.vmInactiveCount),
- max(reader.attemptsFailure, serviceMetrics.vmFailedCount)
- )
+ serviceData = reader.toServiceData()
}
- private data class AggregateServiceMetrics(
- val vmTotalCount: Int = 0,
- val vmWaitingCount: Int = 0,
- val vmActiveCount: Int = 0,
- val vmInactiveCount: Int = 0,
- val vmFailedCount: Int = 0
- )
-
/**
* Collect the results of the simulation.
*/
fun collectResults(): Results {
+ val hostAggregateMetrics = hostAggregateMetrics
+ val hostMetrics = hostMetrics
+ val serviceData = serviceData
+
return Results(
hostAggregateMetrics.totalActiveTime,
hostAggregateMetrics.totalIdleTime,
@@ -112,10 +103,10 @@ internal class WebComputeMonitor : ComputeMonitor {
hostAggregateMetrics.totalPowerDraw,
hostAggregateMetrics.totalFailureSlices.roundToLong(),
hostAggregateMetrics.totalFailureVmSlices.roundToLong(),
- serviceMetrics.vmTotalCount,
- serviceMetrics.vmWaitingCount,
- serviceMetrics.vmInactiveCount,
- serviceMetrics.vmFailedCount
+ serviceData.serversTotal,
+ serviceData.serversPending,
+ serviceData.serversTotal - serviceData.serversPending - serviceData.serversActive,
+ serviceData.attemptsError + serviceData.attemptsFailure
)
}