diff options
10 files changed, 117 insertions, 94 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 9c879e5e..e75c31a0 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -43,7 +43,6 @@ import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.HOST_ID import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServerData @@ -138,16 +137,13 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - ComputeMetricExporter( - clock, - object : ComputeMonitor { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - stealTime += data.cpuStealTime - } + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + stealTime += data.cpuStealTime } - ), + }, exportInterval = Duration.ofSeconds(duration) ) @@ -237,22 +233,19 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - ComputeMetricExporter( - clock, - object : ComputeMonitor { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - uptime += data.uptime - downtime += data.downtime - } + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + uptime += data.uptime + downtime += data.downtime + } - override fun record(data: ServerData) { - guestUptime += data.uptime - guestDowntime += data.downtime - } + override fun record(data: ServerData) { + guestUptime += data.uptime + guestDowntime += data.downtime } - ), + }, exportInterval = Duration.ofSeconds(duration) ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt index f41a2241..ad182d67 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt @@ -22,6 +22,8 @@ package org.opendc.compute.workload.export.parquet +import io.opentelemetry.sdk.common.CompletableResultCode +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServerData @@ -31,7 +33,7 @@ import java.io.File /** * A [ComputeMonitor] that logs the events to a Parquet file. */ -public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { +public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() { private val serverWriter = ParquetServerDataWriter( File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize @@ -59,9 +61,11 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int serviceWriter.write(data) } - override fun close() { + override fun shutdown(): CompletableResultCode { hostWriter.close() serviceWriter.close() serverWriter.close() + + return CompletableResultCode.ofSuccess() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 2201a6b4..21ff3ab0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -27,7 +27,7 @@ import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.ComputeWorkloadRunner import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetExportMonitor +import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.PerformanceInterferenceReader @@ -39,7 +39,6 @@ import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File @@ -120,12 +119,12 @@ abstract class Portfolio(name: String) : Experiment(name) { performanceInterferenceModel ) - val monitor = ParquetExportMonitor( + val exporter = ParquetComputeMetricExporter( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) - val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, exporter) val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) try { @@ -137,10 +136,9 @@ abstract class Portfolio(name: String) : Experiment(name) { } finally { runner.close() metricReader.close() - monitor.close() } - val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0]) + val monitorResults = collectServiceMetrics(runner.producers[0]) logger.debug { "Scheduler " + "Success=${monitorResults.attemptsSuccess} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index ac2ea646..30cc1466 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -39,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader @@ -54,7 +53,7 @@ class CapelinIntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestExperimentReporter + private lateinit var exporter: TestComputeMetricExporter /** * The [FilterScheduler] to use for all experiments. @@ -71,7 +70,7 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - monitor = TestExperimentReporter() + exporter = TestComputeMetricExporter() computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -91,7 +90,7 @@ class CapelinIntegrationTest { computeScheduler ) val topology = createTopology() - val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, exporter) try { runner.apply(topology) @@ -101,7 +100,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0]) + val serviceMetrics = collectServiceMetrics(runner.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -117,11 +116,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223331032, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(67006568, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3159379, monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.841120890240688E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -139,7 +138,7 @@ class CapelinIntegrationTest { computeScheduler ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ -149,7 +148,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -161,10 +160,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10998110, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9740290, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(10998110, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740290, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -188,7 +187,7 @@ class CapelinIntegrationTest { interferenceModel = performanceInterferenceModel ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ -198,7 +197,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -210,10 +209,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6013899, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(14724501, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(12530742, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(473394, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(473394, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -231,7 +230,7 @@ class CapelinIntegrationTest { ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ -241,7 +240,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -253,11 +252,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(11134319, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9604081, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559005056, monitor.uptime) { "Uptime incorrect" } } + { assertEquals(11134319, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9604081, exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, + { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } } ) } @@ -277,7 +276,7 @@ class CapelinIntegrationTest { return stream.use { clusterTopology(stream) } } - class TestExperimentReporter : ComputeMonitor { + class TestComputeMetricExporter : ComputeMetricExporter() { var idleTime = 0L var activeTime = 0L var stealTime = 0L diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index e9449634..679d5944 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -55,6 +55,9 @@ public class ComputeMetricAggregator { // ComputeService "scheduler.hosts" -> { for (point in metric.longSumData.points) { + // Record the timestamp for the service + service.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> service.hostsUp = point.value.toInt() "down" -> service.hostsDown = point.value.toInt() @@ -163,12 +166,16 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { + server.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> server.uptime = point.value "down" -> server.downtime = point.value } server.host = agg.host } else { + agg.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> agg.uptime = point.value "down" -> agg.downtime = point.value @@ -197,15 +204,15 @@ public class ComputeMetricAggregator { /** * Collect the data via the [monitor]. */ - public fun collect(now: Instant, monitor: ComputeMonitor) { - monitor.record(_service.collect(now)) + public fun collect(monitor: ComputeMonitor) { + monitor.record(_service.collect()) for (host in _hosts.values) { - monitor.record(host.collect(now)) + monitor.record(host.collect()) } for (server in _servers.values) { - monitor.record(server.collect(now)) + monitor.record(server.collect()) } } @@ -237,6 +244,8 @@ public class ComputeMetricAggregator { * An aggregator for service metrics before they are reported. */ internal class ServiceAggregator { + private var timestamp = Long.MIN_VALUE + @JvmField var hostsUp = 0 @JvmField var hostsDown = 0 @@ -250,7 +259,10 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): ServiceData = toServiceData(now) + fun collect(): ServiceData { + val now = Instant.ofEpochMilli(timestamp) + return toServiceData(now) + } /** * Convert the aggregator state to an immutable [ServiceData]. @@ -258,6 +270,13 @@ public class ComputeMetricAggregator { private fun toServiceData(now: Instant): ServiceData { return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } /** @@ -275,6 +294,8 @@ public class ComputeMetricAggregator { resource.attributes[HOST_MEM_CAPACITY] ?: 0, ) + private var timestamp = Long.MIN_VALUE + @JvmField var guestsTerminated = 0 @JvmField var guestsRunning = 0 @JvmField var guestsError = 0 @@ -307,7 +328,8 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): HostData { + fun collect(): HostData { + val now = Instant.ofEpochMilli(timestamp) val data = toHostData(now) // Reset intermediate state for next aggregation @@ -360,6 +382,13 @@ public class ComputeMetricAggregator { if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null ) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } /** @@ -385,6 +414,7 @@ public class ComputeMetricAggregator { */ var host: HostInfo? = null + private var timestamp = Long.MIN_VALUE @JvmField var uptime: Long = 0 private var previousUptime = 0L @JvmField var downtime: Long = 0 @@ -404,7 +434,8 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): ServerData { + fun collect(): ServerData { + val now = Instant.ofEpochMilli(timestamp) val data = toServerData(now) previousUptime = uptime @@ -439,6 +470,13 @@ public class ComputeMetricAggregator { cpuLostTime - previousCpuLostTime ) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } private companion object { diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index ea96f721..580cc6fb 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -25,12 +25,11 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter -import java.time.Clock /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ -public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { +public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { /** * A [ComputeMetricAggregator] that actually performs the aggregation. */ @@ -39,7 +38,8 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor override fun export(metrics: Collection<MetricData>): CompletableResultCode { return try { agg.process(metrics) - agg.collect(clock.instant(), monitor) + agg.collect(this) + CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index 25d346fb..ce89061b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -22,22 +22,13 @@ package org.opendc.telemetry.compute -import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData -import java.time.Instant /** * Collect the metrics of the compute service. */ -public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData { - return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) -} - -/** - * Extract a [ServiceData] object from the specified list of metric data. - */ -public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData { +public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { lateinit var serviceData: ServiceData val agg = ComputeMetricAggregator() val monitor = object : ComputeMonitor { @@ -46,7 +37,7 @@ public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricD } } - agg.process(metrics) - agg.collect(timestamp, monitor) + agg.process(metricProducer.collectAllMetrics()) + agg.collect(monitor) return serviceData } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 40a7ea62..96b300d7 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -41,7 +41,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.web.client.ApiClient @@ -126,7 +125,7 @@ class RunnerCli : CliktCommand(name = "runner") { /** * Run a single scenario. */ - private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMonitor.Result> { + private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMetricExporter.Result> { val id = scenario.id logger.info { "Constructing performance interference model" } @@ -167,8 +166,8 @@ class RunnerCli : CliktCommand(name = "runner") { topology: Topology, workloadLoader: ComputeWorkloadLoader, interferenceModel: VmInterferenceModel? - ): WebComputeMonitor.Result { - val monitor = WebComputeMonitor() + ): WebComputeMetricExporter.Result { + val exporter = WebComputeMetricExporter() try { runBlockingSimulation { @@ -195,7 +194,7 @@ class RunnerCli : CliktCommand(name = "runner") { interferenceModel.takeIf { operational.performanceInterferenceEnabled } ) - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter, exportInterval = Duration.ofHours(1)) try { // Instantiate the topology onto the simulator @@ -207,7 +206,7 @@ class RunnerCli : CliktCommand(name = "runner") { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) logger.debug { "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -221,7 +220,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.warn(cause) { "Experiment failed" } } - return monitor.getResult() + return exporter.getResult() } private val POLL_INTERVAL = 30000L // ms = 30 s diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt index a0c281e8..1ee835a6 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt @@ -61,7 +61,7 @@ public class ScenarioManager(private val client: ApiClient) { /** * Persist the specified results. */ - public suspend fun finish(id: String, results: List<WebComputeMonitor.Result>) { + public suspend fun finish(id: String, results: List<WebComputeMetricExporter.Result>) { client.updateJob( id, SimulationState.FINISHED, mapOf( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt index bb412738..7913660d 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt @@ -22,6 +22,7 @@ package org.opendc.web.runner +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServiceData @@ -31,7 +32,7 @@ import kotlin.math.roundToLong /** * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. */ -class WebComputeMonitor : ComputeMonitor { +class WebComputeMetricExporter : ComputeMetricExporter() { override fun record(data: HostData) { val slices = data.downtime / SLICE_LENGTH @@ -57,7 +58,7 @@ class WebComputeMonitor : ComputeMonitor { private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf() - private val SLICE_LENGTH: Long = 5 * 60 + private val SLICE_LENGTH: Long = 5 * 60L data class AggregateHostMetrics( val totalActiveTime: Long = 0L, |
