diff options
Diffstat (limited to 'opendc-web/opendc-web-runner')
4 files changed, 50 insertions, 37 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt index 50aa03d8..d6c06889 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt @@ -42,18 +42,22 @@ public interface JobManager { /** * Update the heartbeat of the specified job. + * + * @param id The identifier of the job. + * @param runtime The total runtime of the job. + * @return `true` if the job can continue, `false` if the job has been cancelled. */ - public fun heartbeat(id: Long) + public fun heartbeat(id: Long, runtime: Int): Boolean /** * Mark the job as failed. */ - public fun fail(id: Long) + public fun fail(id: Long, runtime: Int) /** * Persist the specified results for the specified job. */ - public fun finish(id: Long, results: Map<String, Any>) + public fun finish(id: Long, runtime: Int, results: Map<String, Any>) public companion object { /** diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 226bad47..1bc4e938 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -48,6 +48,8 @@ import org.opendc.web.proto.runner.Topology import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.Random import java.util.UUID import java.util.concurrent.Executors @@ -72,7 +74,7 @@ public class OpenDCRunner( private val manager: JobManager, private val tracePath: File, parallelism: Int = Runtime.getRuntime().availableProcessors(), - private val jobTimeout: Duration = 
Duration.ofMillis(10), + private val jobTimeout: Duration = Duration.ofMinutes(10), private val pollInterval: Duration = Duration.ofSeconds(30), private val heartbeatInterval: Duration = Duration.ofMinutes(1) ) : Runnable { @@ -144,9 +146,15 @@ public class OpenDCRunner( override fun compute() { val id = job.id val scenario = job.scenario + val startTime = Instant.now() + val currentThread = Thread.currentThread() val heartbeat = scheduler.scheduleWithFixedDelay( - { manager.heartbeat(id) }, + { + if (!manager.heartbeat(id, startTime.secondsSince())) { + currentThread.interrupt() + } + }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS @@ -163,12 +171,14 @@ public class OpenDCRunner( } val results = invokeAll(jobs).map { it.rawResult } - logger.info { "Finished simulation for job $id" } - heartbeat.cancel(true) + val duration = startTime.secondsSince() + logger.info { "Finished simulation for job $id (in $duration seconds)" } + manager.finish( id, + duration, mapOf( "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, "total_granted_burst" to results.map { it.totalActiveTime }, @@ -190,19 +200,26 @@ public class OpenDCRunner( } catch (e: Exception) { // Check whether the job failed due to exceeding its time budget if (Thread.interrupted()) { - logger.info { "Simulation job $id exceeded time limit" } + logger.info { "Simulation job $id exceeded time limit (${startTime.secondsSince()} seconds)" } } else { logger.info(e) { "Simulation job $id failed" } } try { heartbeat.cancel(true) - manager.fail(id) + manager.fail(id, startTime.secondsSince()) } catch (e: Throwable) { logger.error(e) { "Failed to update job" } } } } + + /** + * Calculate the seconds since the specified instant. 
+ */ + private fun Instant.secondsSince(): Int { + return ChronoUnit.SECONDS.between(this, Instant.now()).toInt() + } } /** diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt index 39a6851c..5b1b7132 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt @@ -37,22 +37,23 @@ internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManag override fun claim(id: Long): Boolean { return try { - client.jobs.update(id, Job.Update(JobState.CLAIMED)) + client.jobs.update(id, Job.Update(JobState.CLAIMED, 0)) true } catch (e: IllegalStateException) { false } } - override fun heartbeat(id: Long) { - client.jobs.update(id, Job.Update(JobState.RUNNING)) + override fun heartbeat(id: Long, runtime: Int): Boolean { + val res = client.jobs.update(id, Job.Update(JobState.RUNNING, runtime)) + return res?.state != JobState.FAILED } - override fun fail(id: Long) { - client.jobs.update(id, Job.Update(JobState.FAILED)) + override fun fail(id: Long, runtime: Int) { + client.jobs.update(id, Job.Update(JobState.FAILED, runtime)) } - override fun finish(id: Long, results: Map<String, Any>) { - client.jobs.update(id, Job.Update(JobState.FINISHED, results)) + override fun finish(id: Long, runtime: Int, results: Map<String, Any>) { + client.jobs.update(id, Job.Update(JobState.FINISHED, runtime, results)) } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 4db70d3d..d6722115 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt +++ 
b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -24,8 +24,9 @@ package org.opendc.web.runner.internal import org.opendc.experiments.compute.telemetry.ComputeMonitor import org.opendc.experiments.compute.telemetry.table.HostTableReader +import org.opendc.experiments.compute.telemetry.table.ServiceData import org.opendc.experiments.compute.telemetry.table.ServiceTableReader -import kotlin.math.max +import org.opendc.experiments.compute.telemetry.table.toServiceData import kotlin.math.roundToLong /** @@ -76,30 +77,20 @@ internal class WebComputeMonitor : ComputeMonitor { val count: Long ) - private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() + private lateinit var serviceData: ServiceData override fun record(reader: ServiceTableReader) { - serviceMetrics = AggregateServiceMetrics( - max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), - max(reader.serversPending, serviceMetrics.vmWaitingCount), - max(reader.serversActive, serviceMetrics.vmActiveCount), - max(0, serviceMetrics.vmInactiveCount), - max(reader.attemptsFailure, serviceMetrics.vmFailedCount) - ) + serviceData = reader.toServiceData() } - private data class AggregateServiceMetrics( - val vmTotalCount: Int = 0, - val vmWaitingCount: Int = 0, - val vmActiveCount: Int = 0, - val vmInactiveCount: Int = 0, - val vmFailedCount: Int = 0 - ) - /** * Collect the results of the simulation. 
*/ fun collectResults(): Results { + val hostAggregateMetrics = hostAggregateMetrics + val hostMetrics = hostMetrics + val serviceData = serviceData + return Results( hostAggregateMetrics.totalActiveTime, hostAggregateMetrics.totalIdleTime, @@ -112,10 +103,10 @@ internal class WebComputeMonitor : ComputeMonitor { hostAggregateMetrics.totalPowerDraw, hostAggregateMetrics.totalFailureSlices.roundToLong(), hostAggregateMetrics.totalFailureVmSlices.roundToLong(), - serviceMetrics.vmTotalCount, - serviceMetrics.vmWaitingCount, - serviceMetrics.vmInactiveCount, - serviceMetrics.vmFailedCount + serviceData.serversTotal, + serviceData.serversPending, + serviceData.serversTotal - serviceData.serversPending - serviceData.serversActive, + serviceData.attemptsError + serviceData.attemptsFailure ) } |
