summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-web/opendc-web-runner')
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt252
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt325
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt (renamed from opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt)10
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt (renamed from opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt)24
4 files changed, 356 insertions, 255 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 561dcd59..7bf7e220 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -25,41 +25,19 @@ package org.opendc.web.runner
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.long
-import kotlinx.coroutines.*
+import com.github.ajalt.clikt.parameters.types.int
import mu.KotlinLogging
-import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
-import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.compute.workload.topology.Topology
-import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.util.VmInterferenceModelReader
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.LinearPowerModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.web.client.auth.OpenIdAuthController
import org.opendc.web.client.runner.OpenDCRunnerClient
-import org.opendc.web.proto.runner.Job
-import org.opendc.web.proto.runner.Scenario
import java.io.File
import java.net.URI
-import java.time.Duration
-import java.util.*
-import org.opendc.web.proto.runner.Topology as ClientTopology
private val logger = KotlinLogging.logger {}
/**
* Represents the CLI command for starting the OpenDC web runner.
*/
-class RunnerCli : CliktCommand(name = "runner") {
+class RunnerCli : CliktCommand(name = "opendc-runner") {
/**
* The URL to the OpenDC API.
*/
@@ -123,231 +101,23 @@ class RunnerCli : CliktCommand(name = "runner") {
.defaultLazy { File("traces/") }
/**
- * The maximum duration of a single experiment run.
+ * The number of threads used for simulations..
*/
- private val runTimeout by option(
- "--run-timeout",
- help = "maximum duration of experiment in seconds",
- envvar = "OPENDC_RUN_TIMEOUT"
+ private val parallelism by option(
+ "--parallelism",
+ help = "maximum number of threads for simulations",
)
- .long()
- .default(60L * 3) // Experiment may run for a maximum of three minutes
+ .int()
+ .default(Runtime.getRuntime().availableProcessors() - 1)
- /**
- * Run a simulation job.
- */
- private suspend fun runJob(job: Job, topology: Topology): List<WebComputeMetricExporter.Result> {
- val id = job.id
- val scenario = job.scenario
-
- logger.info { "Constructing performance interference model" }
-
- val workloadLoader = ComputeWorkloadLoader(tracePath)
- val interferenceModel = let {
- val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json")
- val enabled = scenario.phenomena.interference
-
- if (!enabled || !path.exists()) {
- return@let null
- }
-
- VmInterferenceModelReader().read(path.inputStream())
- }
-
- val results = (0 until scenario.portfolio.targets.repeats).map { repeat ->
- logger.info { "Starting repeat $repeat" }
- withTimeout(runTimeout * 1000) {
- runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel?.withSeed(repeat.toLong()))
- }
- }
-
- logger.info { "Finished simulation for scenario $id" }
-
- return results
- }
-
- /**
- * Run a single repeat.
- */
- private suspend fun runRepeat(
- scenario: Scenario,
- repeat: Int,
- topology: Topology,
- workloadLoader: ComputeWorkloadLoader,
- interferenceModel: VmInterferenceModel?
- ): WebComputeMetricExporter.Result {
- val exporter = WebComputeMetricExporter()
-
- try {
- runBlockingSimulation {
- val workloadName = scenario.workload.trace.id
- val workloadFraction = scenario.workload.samplingFraction
-
- val seeder = Random(repeat.toLong())
-
- val phenomena = scenario.phenomena
- val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
- val workload = trace(workloadName).sampleByLoad(workloadFraction)
-
- val failureModel =
- if (phenomena.failures)
- grid5000(Duration.ofDays(7))
- else
- null
-
- val telemetry = SdkTelemetryManager(clock)
- val simulator = ComputeServiceHelper(
- coroutineContext,
- clock,
- telemetry,
- computeScheduler,
- failureModel,
- interferenceModel
- )
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
-
- try {
- // Instantiate the topology onto the simulator
- simulator.apply(topology)
- // Run workload trace
- simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong())
-
- val serviceMetrics = collectServiceMetrics(telemetry.metricProducer)
- logger.debug {
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- }
- } finally {
- simulator.close()
- telemetry.close()
- }
- }
- } catch (cause: Throwable) {
- logger.warn(cause) { "Experiment failed" }
- }
-
- return exporter.getResult()
- }
-
- private val POLL_INTERVAL = 30000L // ms = 30 s
- private val HEARTBEAT_INTERVAL = 60000L // ms = 1 min
-
- override fun run(): Unit = runBlocking(Dispatchers.Default) {
+ override fun run() {
logger.info { "Starting OpenDC web runner" }
val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience))
- val manager = ScenarioManager(client)
+ val runner = OpenDCRunner(client, tracePath, parallelism = parallelism)
logger.info { "Watching for queued scenarios" }
-
- while (true) {
- val job = manager.findNext()
- if (job == null) {
- delay(POLL_INTERVAL)
- continue
- }
-
- val id = job.id
-
- logger.info { "Found queued scenario $id: attempting to claim" }
-
- if (!manager.claim(id)) {
- logger.info { "Failed to claim scenario" }
- continue
- }
-
- coroutineScope {
- // Launch heartbeat process
- val heartbeat = launch {
- while (true) {
- manager.heartbeat(id)
- delay(HEARTBEAT_INTERVAL)
- }
- }
-
- try {
- val environment = convert(job.scenario.topology)
- val results = runJob(job, environment)
-
- logger.info { "Writing results to database" }
-
- manager.finish(id, results)
-
- logger.info { "Successfully finished scenario $id" }
- } catch (e: Exception) {
- logger.error(e) { "Scenario failed to finish" }
- manager.fail(id)
- } finally {
- heartbeat.cancel()
- }
- }
- }
- }
-
- /**
- * Convert the specified [topology] into an [Topology] understood by OpenDC.
- */
- private fun convert(topology: ClientTopology): Topology {
- return object : Topology {
-
- override fun resolve(): List<HostSpec> {
- val res = mutableListOf<HostSpec>()
- val random = Random(0)
-
- val machines = topology.rooms.asSequence()
- .flatMap { room ->
- room.tiles.flatMap { tile ->
- val rack = tile.rack
- rack?.machines?.map { machine -> rack to machine } ?: emptyList()
- }
- }
- for ((rack, machine) in machines) {
- val clusterId = rack.id
- val position = machine.position
-
- val processors = machine.cpus.flatMap { cpu ->
- val cores = cpu.numberOfCores
- val speed = cpu.clockRateMhz
- // TODO Remove hard coding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
- List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
- }
- }
- val memoryUnits = machine.memory.map { memory ->
- MemoryUnit(
- "Samsung",
- memory.name,
- memory.speedMbPerS,
- memory.sizeMb.toLong()
- )
- }
-
- val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
- val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
- val powerDriver = SimplePowerDriver(powerModel)
-
- val spec = HostSpec(
- UUID(random.nextLong(), random.nextLong()),
- "node-$clusterId-$position",
- mapOf("cluster" to clusterId),
- MachineModel(processors, memoryUnits),
- powerDriver
- )
-
- res += spec
- }
-
- return res
- }
-
- override fun toString(): String = "WebRunnerTopologyFactory"
- }
+ runner.run()
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
new file mode 100644
index 00000000..bd770574
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -0,0 +1,325 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.*
+import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.compute.workload.topology.Topology
+import org.opendc.compute.workload.topology.apply
+import org.opendc.compute.workload.util.VmInterferenceModelReader
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.web.client.runner.OpenDCRunnerClient
+import org.opendc.web.proto.runner.Job
+import org.opendc.web.proto.runner.Scenario
+import org.opendc.web.runner.internal.JobManager
+import org.opendc.web.runner.internal.WebComputeMetricExporter
+import java.io.File
+import java.time.Duration
+import java.util.*
+import java.util.concurrent.*
+
+/**
+ * Class to execute the pending jobs via the OpenDC web API.
+ *
+ * @param client The [OpenDCRunnerClient] to connect to the OpenDC web API.
+ * @param tracePath The directory where the traces are located.
+ * @param jobTimeout The maximum duration of a simulation job.
+ * @param pollInterval The interval to poll the API with.
+ * @param heartbeatInterval The interval to send a heartbeat to the API server.
+ */
+public class OpenDCRunner(
+ client: OpenDCRunnerClient,
+ private val tracePath: File,
+ parallelism: Int = Runtime.getRuntime().availableProcessors(),
+ private val jobTimeout: Duration = Duration.ofMillis(10),
+ private val pollInterval: Duration = Duration.ofSeconds(30),
+ private val heartbeatInterval: Duration = Duration.ofMinutes(1)
+) : Runnable {
+ /**
+ * Logging instance for this runner.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * Helper class to manage the available jobs.
+ */
+ private val manager = JobManager(client)
+
+ /**
+ * Helper class to load the workloads.
+ */
+ private val workloadLoader = ComputeWorkloadLoader(tracePath)
+
+ /**
+ * The [ForkJoinPool] that is used to execute the simulation jobs.
+ */
+ private val pool = ForkJoinPool(parallelism)
+
+ /**
+ * A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of
+ * individual simulations.
+ */
+ private val scheduler = Executors.newSingleThreadScheduledExecutor()
+
+ /**
+ * Start the runner process.
+ *
+ * This method will block until interrupted and poll the OpenDC API for new jobs to execute.
+ */
+ override fun run() {
+ try {
+ while (true) {
+ // Check if anyone has interrupted the thread
+ if (Thread.interrupted()) {
+ throw InterruptedException()
+ }
+
+ val job = manager.findNext()
+ if (job == null) {
+ Thread.sleep(pollInterval.toMillis())
+ continue
+ }
+
+ val id = job.id
+
+ logger.info { "Found queued job $id: attempting to claim" }
+
+ if (!manager.claim(id)) {
+ logger.info { "Failed to claim scenario" }
+ continue
+ }
+
+ pool.submit(JobAction(job))
+ }
+ } finally {
+ workloadLoader.reset()
+
+ pool.shutdown()
+ scheduler.shutdown()
+
+ pool.awaitTermination(5, TimeUnit.SECONDS)
+ }
+ }
+
+ /**
+ * A [RecursiveAction] that runs a simulation job (consisting of possible multiple simulations).
+ *
+ * @param job The job to simulate.
+ */
+ private inner class JobAction(private val job: Job) : RecursiveAction() {
+ override fun compute() {
+ val id = job.id
+ val scenario = job.scenario
+
+ val heartbeat = scheduler.scheduleWithFixedDelay({ manager.heartbeat(id) }, 0, heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS)
+
+ try {
+ logger.debug { "Constructing performance interference model" }
+
+ val interferenceModel = let {
+ val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json")
+ val enabled = scenario.phenomena.interference
+
+ if (!enabled || !path.exists()) {
+ return@let null
+ }
+
+ VmInterferenceModelReader().read(path.inputStream())
+ }
+
+ val topology = convertTopology(scenario.topology)
+ val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology, interferenceModel) }
+ val results = invokeAll(jobs)
+
+ logger.info { "Finished simulation for job $id" }
+
+ heartbeat.cancel(true)
+ manager.finish(id, results.map { it.rawResult })
+ } catch (e: Exception) {
+ // Check whether the job failed due to exceeding its time budget
+ if (Thread.interrupted()) {
+ logger.info { "Simulation job $id exceeded time limit" }
+ } else {
+ logger.info(e) { "Simulation job $id failed" }
+ }
+
+ try {
+ heartbeat.cancel(true)
+ manager.fail(id)
+ } catch (e: Throwable) {
+ logger.error(e) { "Failed to update job" }
+ }
+ }
+ }
+ }
+
+ /**
+ * A [RecursiveTask] that simulates a single scenario.
+ *
+ * @param scenario The scenario to simulate.
+ * @param repeat The repeat number used to seed the simulation.
+ * @param topology The topology to simulate.
+ * @param interferenceModel The [VmInterferenceModel] used in this scenario.
+ */
+ private inner class SimulationTask(
+ private val scenario: Scenario,
+ private val repeat: Int,
+ private val topology: Topology,
+ private val interferenceModel: VmInterferenceModel?
+ ) : RecursiveTask<WebComputeMetricExporter.Results>() {
+ override fun compute(): WebComputeMetricExporter.Results {
+ val exporter = WebComputeMetricExporter()
+
+ // Schedule task that interrupts the simulation if it runs for too long.
+ val currentThread = Thread.currentThread()
+ val interruptTask = scheduler.schedule({ currentThread.interrupt() }, jobTimeout.toMillis(), TimeUnit.MILLISECONDS)
+
+ try {
+ runBlockingSimulation {
+ val workloadName = scenario.workload.trace.id
+ val workloadFraction = scenario.workload.samplingFraction
+
+ val seeder = Random(repeat.toLong())
+
+ val phenomena = scenario.phenomena
+ val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
+ val workload = trace(workloadName).sampleByLoad(workloadFraction)
+
+ val failureModel =
+ if (phenomena.failures)
+ grid5000(Duration.ofDays(7))
+ else
+ null
+
+ val telemetry = SdkTelemetryManager(clock)
+ val simulator = ComputeServiceHelper(
+ coroutineContext,
+ clock,
+ telemetry,
+ computeScheduler,
+ failureModel,
+ interferenceModel
+ )
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
+
+ try {
+ // Instantiate the topology onto the simulator
+ simulator.apply(topology)
+ // Run workload trace
+ simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong())
+
+ val serviceMetrics = collectServiceMetrics(telemetry.metricProducer)
+ logger.debug {
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ }
+ } finally {
+ simulator.close()
+ telemetry.close()
+ }
+ }
+ } finally {
+ interruptTask.cancel(false)
+ }
+
+ return exporter.collectResults()
+ }
+ }
+
+ /**
+ * Convert the specified [topology] into an [Topology] understood by OpenDC.
+ */
+ private fun convertTopology(topology: org.opendc.web.proto.runner.Topology): Topology {
+ return object : Topology {
+
+ override fun resolve(): List<HostSpec> {
+ val res = mutableListOf<HostSpec>()
+ val random = Random(0)
+
+ val machines = topology.rooms.asSequence()
+ .flatMap { room ->
+ room.tiles.flatMap { tile ->
+ val rack = tile.rack
+ rack?.machines?.map { machine -> rack to machine } ?: emptyList()
+ }
+ }
+ for ((rack, machine) in machines) {
+ val clusterId = rack.id
+ val position = machine.position
+
+ val processors = machine.cpus.flatMap { cpu ->
+ val cores = cpu.numberOfCores
+ val speed = cpu.clockRateMhz
+ // TODO Remove hard coding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits = machine.memory.map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.name,
+ memory.speedMbPerS,
+ memory.sizeMb.toLong()
+ )
+ }
+
+ val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+ val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
+ val powerDriver = SimplePowerDriver(powerModel)
+
+ val spec = HostSpec(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf("cluster" to clusterId),
+ MachineModel(processors, memoryUnits),
+ powerDriver
+ )
+
+ res += spec
+ }
+
+ return res
+ }
+
+ override fun toString(): String = "WebRunnerTopologyFactory"
+ }
+ }
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
index 7374f0c9..8de0cee4 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.web.runner
+package org.opendc.web.runner.internal
import org.opendc.web.client.runner.OpenDCRunnerClient
import org.opendc.web.proto.JobState
import org.opendc.web.proto.runner.Job
/**
- * Manages the queue of scenarios that need to be processed.
+ * Helper class to manage the queue of jobs that need to be simulated.
*/
-class ScenarioManager(private val client: OpenDCRunnerClient) {
+internal class JobManager(private val client: OpenDCRunnerClient) {
/**
* Find the next job that the simulator needs to process.
*/
@@ -62,7 +62,7 @@ class ScenarioManager(private val client: OpenDCRunnerClient) {
/**
* Persist the specified results.
*/
- public fun finish(id: Long, results: List<WebComputeMetricExporter.Result>) {
+ fun finish(id: Long, results: List<WebComputeMetricExporter.Results>) {
client.jobs.update(
id,
Job.Update(
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt
index d39a0c74..04437a5f 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.web.runner
+package org.opendc.web.runner.internal
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
@@ -32,7 +32,7 @@ import kotlin.math.roundToLong
/**
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
-class WebComputeMetricExporter : ComputeMetricExporter() {
+internal class WebComputeMetricExporter : ComputeMetricExporter() {
override fun record(reader: HostTableReader) {
val slices = reader.downtime / SLICE_LENGTH
@@ -60,7 +60,7 @@ class WebComputeMetricExporter : ComputeMetricExporter() {
private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
private val SLICE_LENGTH: Long = 5 * 60L
- data class AggregateHostMetrics(
+ private data class AggregateHostMetrics(
val totalActiveTime: Long = 0L,
val totalIdleTime: Long = 0L,
val totalStealTime: Long = 0L,
@@ -70,7 +70,7 @@ class WebComputeMetricExporter : ComputeMetricExporter() {
val totalFailureVmSlices: Double = 0.0,
)
- data class HostMetrics(
+ private data class HostMetrics(
val cpuUsage: Double,
val cpuDemand: Double,
val instanceCount: Long,
@@ -89,7 +89,7 @@ class WebComputeMetricExporter : ComputeMetricExporter() {
)
}
- data class AggregateServiceMetrics(
+ private data class AggregateServiceMetrics(
val vmTotalCount: Int = 0,
val vmWaitingCount: Int = 0,
val vmActiveCount: Int = 0,
@@ -97,8 +97,11 @@ class WebComputeMetricExporter : ComputeMetricExporter() {
val vmFailedCount: Int = 0
)
- fun getResult(): Result {
- return Result(
+ /**
+ * Collect the results of the simulation.
+ */
+ fun collectResults(): Results {
+ return Results(
hostAggregateMetrics.totalActiveTime,
hostAggregateMetrics.totalIdleTime,
hostAggregateMetrics.totalStealTime,
@@ -117,7 +120,10 @@ class WebComputeMetricExporter : ComputeMetricExporter() {
)
}
- data class Result(
+ /**
+ * Structure of the results of a single simulation.
+ */
+ data class Results(
val totalActiveTime: Long,
val totalIdleTime: Long,
val totalStealTime: Long,