From 5fa0cf915ecf643e94a0de972125e8f862308f80 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 22 Sep 2021 11:22:00 +0200 Subject: fix(telemetry): Ensure shutdown of exporter is called This change updates the CoroutineMetricReader to ensure that the exporter is shutdown when the metric reader fails or is shutdown. --- .../sdk/metrics/export/CoroutineMetricReader.kt | 28 +++++++++++----------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 07f0ff7f..1de235e7 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -22,7 +22,6 @@ package org.opendc.telemetry.sdk.metrics.export -import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* @@ -54,24 +53,25 @@ public class CoroutineMetricReader( private val job = scope.launch { val intervalMs = exportInterval.toMillis() - while (isActive) { - delay(intervalMs) + try { + while (isActive) { + delay(intervalMs) - val metrics = mutableListOf() - for (producer in producers) { - metrics.addAll(producer.collectAllMetrics()) - } + val metrics = producers.flatMap(MetricProducer::collectAllMetrics) - try { - val result = exporter.export(metrics) - result.whenComplete { - if (!result.isSuccess) { - logger.trace { "Exporter failed" } + try { + val result = exporter.export(metrics) + result.whenComplete { + if (!result.isSuccess) { + logger.warn { "Exporter failed" } + } } + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } } - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } } + } finally { + exporter.shutdown() } } -- cgit v1.2.3 From 30cd010f1f98262aa7f264bb3c3eb6028b8495c5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 22 Sep 2021 12:43:01 +0200 Subject: refactor(telemetry): Do not require clock for ComputeMetricExporter This change drops the requirement for a clock parameter when constructing a ComputeMetricExporter, since it will now derive the timestamp from the recorded metrics. 
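As a rough illustration of the approach described above (deriving the report timestamp from the exported data points rather than from an injected Clock), a minimal sketch could look as follows. The class and member names here are illustrative only and do not appear in the patch; the PointData.epochNanos accessor is the one the diff below relies on.

    import io.opentelemetry.sdk.metrics.data.PointData
    import java.time.Instant

    // Minimal sketch (not part of the patch): an aggregator that tracks the
    // timestamp of the points it has seen and reports it as an Instant.
    class TimestampTrackingAggregator {
        // Timestamp of the most recently recorded point, in ms since the epoch.
        private var timestampMs = Long.MIN_VALUE

        fun recordTimestamp(point: PointData) {
            // PointData reports its sample time in nanoseconds since the epoch.
            timestampMs = point.epochNanos / 1_000_000L
        }

        // Collected data carries this timestamp instead of Clock.instant().
        fun currentTimestamp(): Instant = Instant.ofEpochMilli(timestampMs)
    }

With the timestamp tracked this way, ComputeMetricExporter no longer needs a Clock argument, which is what the diff below removes.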
--- .../org/opendc/compute/simulator/SimHostTest.kt | 41 +++--- .../export/parquet/ParquetComputeMetricExporter.kt | 71 +++++++++++ .../export/parquet/ParquetExportMonitor.kt | 67 ---------- .../org/opendc/experiments/capelin/Portfolio.kt | 10 +- .../experiments/capelin/CapelinIntegrationTest.kt | 59 +++++---- .../telemetry/compute/ComputeMetricAggregator.kt | 52 ++++++-- .../telemetry/compute/ComputeMetricExporter.kt | 6 +- .../kotlin/org/opendc/telemetry/compute/Helpers.kt | 15 +-- .../src/main/kotlin/org/opendc/web/runner/Main.kt | 13 +- .../org/opendc/web/runner/ScenarioManager.kt | 2 +- .../opendc/web/runner/WebComputeMetricExporter.kt | 137 +++++++++++++++++++++ .../org/opendc/web/runner/WebComputeMonitor.kt | 136 -------------------- 12 files changed, 316 insertions(+), 293 deletions(-) create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt delete mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt create mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt delete mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 9c879e5e..e75c31a0 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -43,7 +43,6 @@ import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.HOST_ID import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServerData @@ -138,16 +137,13 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - ComputeMetricExporter( - clock, - object : ComputeMonitor { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - stealTime += data.cpuStealTime - } + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + stealTime += data.cpuStealTime } - ), + }, exportInterval = Duration.ofSeconds(duration) ) @@ -237,22 +233,19 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - ComputeMetricExporter( - clock, - object : ComputeMonitor { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - uptime += data.uptime - downtime += data.downtime - } + object : ComputeMetricExporter() { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + uptime += data.uptime + downtime += data.downtime + } - override fun record(data: ServerData) { - guestUptime += data.uptime - guestDowntime += data.downtime - } + override fun record(data: ServerData) 
{ + guestUptime += data.uptime + guestDowntime += data.downtime } - ), + }, exportInterval = Duration.ofSeconds(duration) ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt new file mode 100644 index 00000000..ad182d67 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import io.opentelemetry.sdk.common.CompletableResultCode +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServiceData +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. 
+ */ +public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(data: ServerData) { + serverWriter.write(data) + } + + override fun record(data: HostData) { + hostWriter.write(data) + } + + override fun record(data: ServiceData) { + serviceWriter.write(data) + } + + override fun shutdown(): CompletableResultCode { + hostWriter.close() + serviceWriter.close() + serverWriter.close() + + return CompletableResultCode.ofSuccess() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt deleted file mode 100644 index f41a2241..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.export.parquet - -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData -import java.io.File - -/** - * A [ComputeMonitor] that logs the events to a Parquet file. 
- */ -public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { - private val serverWriter = ParquetServerDataWriter( - File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val hostWriter = ParquetHostDataWriter( - File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val serviceWriter = ParquetServiceDataWriter( - File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun record(data: ServerData) { - serverWriter.write(data) - } - - override fun record(data: HostData) { - hostWriter.write(data) - } - - override fun record(data: ServiceData) { - serviceWriter.write(data) - } - - override fun close() { - hostWriter.close() - serviceWriter.close() - serverWriter.close() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 2201a6b4..21ff3ab0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -27,7 +27,7 @@ import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.ComputeWorkloadRunner import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetExportMonitor +import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.PerformanceInterferenceReader @@ -39,7 +39,6 @@ import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File @@ -120,12 +119,12 @@ abstract class Portfolio(name: String) : Experiment(name) { performanceInterferenceModel ) - val monitor = ParquetExportMonitor( + val exporter = ParquetComputeMetricExporter( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) - val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, exporter) val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) try { @@ -137,10 +136,9 @@ abstract class Portfolio(name: String) : Experiment(name) { } finally { runner.close() metricReader.close() - monitor.close() } - val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0]) + val monitorResults = collectServiceMetrics(runner.producers[0]) logger.debug { "Scheduler " + "Success=${monitorResults.attemptsSuccess} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt 
index ac2ea646..30cc1466 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -39,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader @@ -54,7 +53,7 @@ class CapelinIntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestExperimentReporter + private lateinit var exporter: TestComputeMetricExporter /** * The [FilterScheduler] to use for all experiments. @@ -71,7 +70,7 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - monitor = TestExperimentReporter() + exporter = TestComputeMetricExporter() computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -91,7 +90,7 @@ class CapelinIntegrationTest { computeScheduler ) val topology = createTopology() - val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, exporter) try { runner.apply(topology) @@ -101,7 +100,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0]) + val serviceMetrics = collectServiceMetrics(runner.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -117,11 +116,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223331032, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(67006568, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3159379, monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.841120890240688E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -139,7 +138,7 @@ class CapelinIntegrationTest { computeScheduler ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ 
-149,7 +148,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -161,10 +160,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10998110, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9740290, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(10998110, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740290, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -188,7 +187,7 @@ class CapelinIntegrationTest { interferenceModel = performanceInterferenceModel ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ -198,7 +197,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -210,10 +209,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6013899, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(14724501, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(12530742, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(473394, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(473394, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -231,7 +230,7 @@ class CapelinIntegrationTest { ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ -241,7 +240,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -253,11 +252,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(11134319, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9604081, monitor.activeTime) { "Active time incorrect" } }, - { 
assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559005056, monitor.uptime) { "Uptime incorrect" } } + { assertEquals(11134319, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9604081, exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, + { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } } ) } @@ -277,7 +276,7 @@ class CapelinIntegrationTest { return stream.use { clusterTopology(stream) } } - class TestExperimentReporter : ComputeMonitor { + class TestComputeMetricExporter : ComputeMetricExporter() { var idleTime = 0L var activeTime = 0L var stealTime = 0L diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index e9449634..679d5944 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -55,6 +55,9 @@ public class ComputeMetricAggregator { // ComputeService "scheduler.hosts" -> { for (point in metric.longSumData.points) { + // Record the timestamp for the service + service.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> service.hostsUp = point.value.toInt() "down" -> service.hostsDown = point.value.toInt() @@ -163,12 +166,16 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { + server.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> server.uptime = point.value "down" -> server.downtime = point.value } server.host = agg.host } else { + agg.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> agg.uptime = point.value "down" -> agg.downtime = point.value @@ -197,15 +204,15 @@ public class ComputeMetricAggregator { /** * Collect the data via the [monitor]. */ - public fun collect(now: Instant, monitor: ComputeMonitor) { - monitor.record(_service.collect(now)) + public fun collect(monitor: ComputeMonitor) { + monitor.record(_service.collect()) for (host in _hosts.values) { - monitor.record(host.collect(now)) + monitor.record(host.collect()) } for (server in _servers.values) { - monitor.record(server.collect(now)) + monitor.record(server.collect()) } } @@ -237,6 +244,8 @@ public class ComputeMetricAggregator { * An aggregator for service metrics before they are reported. */ internal class ServiceAggregator { + private var timestamp = Long.MIN_VALUE + @JvmField var hostsUp = 0 @JvmField var hostsDown = 0 @@ -250,7 +259,10 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): ServiceData = toServiceData(now) + fun collect(): ServiceData { + val now = Instant.ofEpochMilli(timestamp) + return toServiceData(now) + } /** * Convert the aggregator state to an immutable [ServiceData]. @@ -258,6 +270,13 @@ public class ComputeMetricAggregator { private fun toServiceData(now: Instant): ServiceData { return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) } + + /** + * Record the timestamp of a [point] for this aggregator. 
+ */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } /** @@ -275,6 +294,8 @@ public class ComputeMetricAggregator { resource.attributes[HOST_MEM_CAPACITY] ?: 0, ) + private var timestamp = Long.MIN_VALUE + @JvmField var guestsTerminated = 0 @JvmField var guestsRunning = 0 @JvmField var guestsError = 0 @@ -307,7 +328,8 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): HostData { + fun collect(): HostData { + val now = Instant.ofEpochMilli(timestamp) val data = toHostData(now) // Reset intermediate state for next aggregation @@ -360,6 +382,13 @@ public class ComputeMetricAggregator { if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null ) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } /** @@ -385,6 +414,7 @@ public class ComputeMetricAggregator { */ var host: HostInfo? = null + private var timestamp = Long.MIN_VALUE @JvmField var uptime: Long = 0 private var previousUptime = 0L @JvmField var downtime: Long = 0 @@ -404,7 +434,8 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): ServerData { + fun collect(): ServerData { + val now = Instant.ofEpochMilli(timestamp) val data = toServerData(now) previousUptime = uptime @@ -439,6 +470,13 @@ public class ComputeMetricAggregator { cpuLostTime - previousCpuLostTime ) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } private companion object { diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index ea96f721..580cc6fb 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -25,12 +25,11 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter -import java.time.Clock /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ -public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { +public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { /** * A [ComputeMetricAggregator] that actually performs the aggregation. 
*/ @@ -39,7 +38,8 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor override fun export(metrics: Collection): CompletableResultCode { return try { agg.process(metrics) - agg.collect(clock.instant(), monitor) + agg.collect(this) + CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index 25d346fb..ce89061b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -22,22 +22,13 @@ package org.opendc.telemetry.compute -import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData -import java.time.Instant /** * Collect the metrics of the compute service. */ -public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData { - return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) -} - -/** - * Extract a [ServiceData] object from the specified list of metric data. - */ -public fun extractServiceMetrics(timestamp: Instant, metrics: Collection): ServiceData { +public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { lateinit var serviceData: ServiceData val agg = ComputeMetricAggregator() val monitor = object : ComputeMonitor { @@ -46,7 +37,7 @@ public fun extractServiceMetrics(timestamp: Instant, metrics: Collection { + private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List { val id = scenario.id logger.info { "Constructing performance interference model" } @@ -167,8 +166,8 @@ class RunnerCli : CliktCommand(name = "runner") { topology: Topology, workloadLoader: ComputeWorkloadLoader, interferenceModel: VmInterferenceModel? 
- ): WebComputeMonitor.Result { - val monitor = WebComputeMonitor() + ): WebComputeMetricExporter.Result { + val exporter = WebComputeMetricExporter() try { runBlockingSimulation { @@ -195,7 +194,7 @@ class RunnerCli : CliktCommand(name = "runner") { interferenceModel.takeIf { operational.performanceInterferenceEnabled } ) - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter, exportInterval = Duration.ofHours(1)) try { // Instantiate the topology onto the simulator @@ -207,7 +206,7 @@ class RunnerCli : CliktCommand(name = "runner") { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) logger.debug { "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -221,7 +220,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.warn(cause) { "Experiment failed" } } - return monitor.getResult() + return exporter.getResult() } private val POLL_INTERVAL = 30000L // ms = 30 s diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt index a0c281e8..1ee835a6 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt @@ -61,7 +61,7 @@ public class ScenarioManager(private val client: ApiClient) { /** * Persist the specified results. */ - public suspend fun finish(id: String, results: List) { + public suspend fun finish(id: String, results: List) { client.updateJob( id, SimulationState.FINISHED, mapOf( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt new file mode 100644 index 00000000..7913660d --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.web.runner + +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServiceData +import kotlin.math.max +import kotlin.math.roundToLong + +/** + * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. + */ +class WebComputeMetricExporter : ComputeMetricExporter() { + override fun record(data: HostData) { + val slices = data.downtime / SLICE_LENGTH + + hostAggregateMetrics = AggregateHostMetrics( + hostAggregateMetrics.totalActiveTime + data.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + data.cpuIdleTime, + hostAggregateMetrics.totalStealTime + data.cpuStealTime, + hostAggregateMetrics.totalLostTime + data.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + data.powerTotal, + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices + ) + + hostMetrics.compute(data.host.id) { _, prev -> + HostMetrics( + data.cpuUsage + (prev?.cpuUsage ?: 0.0), + data.cpuDemand + (prev?.cpuDemand ?: 0.0), + data.guestsRunning + (prev?.instanceCount ?: 0), + 1 + (prev?.count ?: 0) + ) + } + } + + private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() + private val hostMetrics: MutableMap = mutableMapOf() + private val SLICE_LENGTH: Long = 5 * 60L + + data class AggregateHostMetrics( + val totalActiveTime: Long = 0L, + val totalIdleTime: Long = 0L, + val totalStealTime: Long = 0L, + val totalLostTime: Long = 0L, + val totalPowerDraw: Double = 0.0, + val totalFailureSlices: Double = 0.0, + val totalFailureVmSlices: Double = 0.0, + ) + + data class HostMetrics( + val cpuUsage: Double, + val cpuDemand: Double, + val instanceCount: Long, + val count: Long + ) + + private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() + + override fun record(data: ServiceData) { + serviceMetrics = AggregateServiceMetrics( + max(data.attemptsSuccess, serviceMetrics.vmTotalCount), + max(data.serversPending, serviceMetrics.vmWaitingCount), + max(data.serversActive, serviceMetrics.vmActiveCount), + max(0, serviceMetrics.vmInactiveCount), + max(data.attemptsFailure, serviceMetrics.vmFailedCount), + ) + } + + data class AggregateServiceMetrics( + val vmTotalCount: Int = 0, + val vmWaitingCount: Int = 0, + val vmActiveCount: Int = 0, + val vmInactiveCount: Int = 0, + val vmFailedCount: Int = 0 + ) + + fun getResult(): Result { + return Result( + hostAggregateMetrics.totalActiveTime, + hostAggregateMetrics.totalIdleTime, + hostAggregateMetrics.totalStealTime, + hostAggregateMetrics.totalLostTime, + hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), + hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, + hostAggregateMetrics.totalPowerDraw, + hostAggregateMetrics.totalFailureSlices.roundToLong(), + hostAggregateMetrics.totalFailureVmSlices.roundToLong(), + serviceMetrics.vmTotalCount, + serviceMetrics.vmWaitingCount, + serviceMetrics.vmInactiveCount, + serviceMetrics.vmFailedCount, + ) + } + + data class Result( + val totalActiveTime: Long, + val totalIdleTime: Long, + val totalStealTime: Long, + val totalLostTime: Long, + val meanCpuUsage: Double, + val meanCpuDemand: Double, + val meanNumDeployedImages: Double, + val 
maxNumDeployedImages: Double, + val totalPowerDraw: Double, + val totalFailureSlices: Long, + val totalFailureVmSlices: Long, + val totalVmsSubmitted: Int, + val totalVmsQueued: Int, + val totalVmsFinished: Int, + val totalVmsFailed: Int + ) +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt deleted file mode 100644 index bb412738..00000000 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.web.runner - -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServiceData -import kotlin.math.max -import kotlin.math.roundToLong - -/** - * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. 
- */ -class WebComputeMonitor : ComputeMonitor { - override fun record(data: HostData) { - val slices = data.downtime / SLICE_LENGTH - - hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalActiveTime + data.cpuActiveTime, - hostAggregateMetrics.totalIdleTime + data.cpuIdleTime, - hostAggregateMetrics.totalStealTime + data.cpuStealTime, - hostAggregateMetrics.totalLostTime + data.cpuLostTime, - hostAggregateMetrics.totalPowerDraw + data.powerTotal, - hostAggregateMetrics.totalFailureSlices + slices, - hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices - ) - - hostMetrics.compute(data.host.id) { _, prev -> - HostMetrics( - data.cpuUsage + (prev?.cpuUsage ?: 0.0), - data.cpuDemand + (prev?.cpuDemand ?: 0.0), - data.guestsRunning + (prev?.instanceCount ?: 0), - 1 + (prev?.count ?: 0) - ) - } - } - - private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap = mutableMapOf() - private val SLICE_LENGTH: Long = 5 * 60 - - data class AggregateHostMetrics( - val totalActiveTime: Long = 0L, - val totalIdleTime: Long = 0L, - val totalStealTime: Long = 0L, - val totalLostTime: Long = 0L, - val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Double = 0.0, - val totalFailureVmSlices: Double = 0.0, - ) - - data class HostMetrics( - val cpuUsage: Double, - val cpuDemand: Double, - val instanceCount: Long, - val count: Long - ) - - private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() - - override fun record(data: ServiceData) { - serviceMetrics = AggregateServiceMetrics( - max(data.attemptsSuccess, serviceMetrics.vmTotalCount), - max(data.serversPending, serviceMetrics.vmWaitingCount), - max(data.serversActive, serviceMetrics.vmActiveCount), - max(0, serviceMetrics.vmInactiveCount), - max(data.attemptsFailure, serviceMetrics.vmFailedCount), - ) - } - - data class AggregateServiceMetrics( - val vmTotalCount: Int = 0, - val vmWaitingCount: Int = 0, - val vmActiveCount: Int = 0, - val vmInactiveCount: Int = 0, - val vmFailedCount: Int = 0 - ) - - fun getResult(): Result { - return Result( - hostAggregateMetrics.totalActiveTime, - hostAggregateMetrics.totalIdleTime, - hostAggregateMetrics.totalStealTime, - hostAggregateMetrics.totalLostTime, - hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), - hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), - hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), - hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, - hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices.roundToLong(), - hostAggregateMetrics.totalFailureVmSlices.roundToLong(), - serviceMetrics.vmTotalCount, - serviceMetrics.vmWaitingCount, - serviceMetrics.vmInactiveCount, - serviceMetrics.vmFailedCount, - ) - } - - data class Result( - val totalActiveTime: Long, - val totalIdleTime: Long, - val totalStealTime: Long, - val totalLostTime: Long, - val meanCpuUsage: Double, - val meanCpuDemand: Double, - val meanNumDeployedImages: Double, - val maxNumDeployedImages: Double, - val totalPowerDraw: Double, - val totalFailureSlices: Long, - val totalFailureVmSlices: Long, - val totalVmsSubmitted: Int, - val totalVmsQueued: Int, - val totalVmsFinished: Int, - val totalVmsFailed: Int - ) -} -- cgit v1.2.3 From 3b928a66d992ec32ff28bfd4c11b0c7b2ac631ca Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 23 Sep 2021 14:44:46 +0200 Subject: fix(compute): 
Do not recover guests in non-error state --- .../main/kotlin/org/opendc/compute/simulator/SimHost.kt | 4 ++-- .../kotlin/org/opendc/compute/simulator/internal/Guest.kt | 15 +++++++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index ff55c585..fdb3f1dc 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -262,7 +262,7 @@ public class SimHost( } override suspend fun delete(server: Server) { - val guest = guests.remove(server) ?: return + val guest = guests[server] ?: return guest.terminate() } @@ -296,7 +296,7 @@ public class SimHost( _bootTime = clock.millis() for (guest in guests.values) { - guest.start() + guest.recover() } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 90562e2f..7f33154a 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -96,8 +96,8 @@ internal class Guest( } ServerState.RUNNING -> return ServerState.DELETED -> { - logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") + logger.warn { "User tried to start deleted server" } + throw IllegalArgumentException("Server is deleted") } else -> assert(false) { "Invalid state transition" } } @@ -143,6 +143,17 @@ internal class Guest( doStop(ServerState.ERROR) } + /** + * Recover the guest if it is in an error state. + */ + suspend fun recover() { + if (state != ServerState.ERROR) { + return + } + + doStart() + } + /** * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. 
*/ -- cgit v1.2.3 From 6e0ea49e4d88a79db576dd281a8d4f17b819e5ae Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 23 Sep 2021 14:45:18 +0200 Subject: fix(compute): Write null values explicitly in Parquet exporter --- .../opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt | 4 +--- .../opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 37066a0d..98a0739e 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -54,9 +54,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : builder["uptime"] = data.uptime builder["downtime"] = data.downtime val bootTime = data.bootTime - if (bootTime != null) { - builder["boot_time"] = bootTime.toEpochMilli() - } + builder["boot_time"] = bootTime?.toEpochMilli() builder["cpu_count"] = data.host.cpuCount builder["cpu_limit"] = data.cpuLimit diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index bea23d32..0d11ec23 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -56,9 +56,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : builder["uptime"] = data.uptime builder["downtime"] = data.downtime val bootTime = data.bootTime - if (bootTime != null) { - builder["boot_time"] = bootTime.toEpochMilli() - } + builder["boot_time"] = bootTime?.toEpochMilli() builder["scheduling_latency"] = data.schedulingLatency builder["cpu_count"] = data.server.cpuCount -- cgit v1.2.3 From 94d8ee69e52dcd375a662a08c198aa29670362fb Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 23 Sep 2021 14:46:57 +0200 Subject: fix(telemetry): Report cause of compute exporter failure --- .../kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt | 2 +- .../kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index 679d5944..738ec38b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -412,7 +412,7 @@ public class ComputeMetricAggregator { /** * The [HostInfo] of the host on which the server is hosted. */ - var host: HostInfo? = null + @JvmField var host: HostInfo? 
= null private var timestamp = Long.MIN_VALUE @JvmField var uptime: Long = 0 diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index 580cc6fb..3ab6c7b2 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -25,11 +25,17 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter +import mu.KotlinLogging /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { + /** + * The logging instance for this exporter. + */ + private val logger = KotlinLogging.logger {} + /** * A [ComputeMetricAggregator] that actually performs the aggregation. */ @@ -42,6 +48,7 @@ public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { CompletableResultCode.ofSuccess() } catch (e: Throwable) { + logger.warn(e) { "Failed to export results" } CompletableResultCode.ofFailure() } } -- cgit v1.2.3
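The final fix above amounts to logging the cause before the failure code is returned. A condensed, self-contained sketch of that pattern, using ExampleMetricExporter and processMetrics() as placeholder names rather than code taken from the patch, could look like this:

    import io.opentelemetry.sdk.common.CompletableResultCode
    import io.opentelemetry.sdk.metrics.data.MetricData
    import io.opentelemetry.sdk.metrics.export.MetricExporter
    import mu.KotlinLogging

    // Sketch only: an exporter that reports why an export failed instead of
    // returning a bare failure code. Placeholder names, not patched code.
    abstract class ExampleMetricExporter : MetricExporter {
        private val logger = KotlinLogging.logger {}

        override fun export(metrics: Collection<MetricData>): CompletableResultCode {
            return try {
                processMetrics(metrics) // aggregation/recording happens here
                CompletableResultCode.ofSuccess()
            } catch (e: Throwable) {
                // Surface the cause in the logs so failures are diagnosable.
                logger.warn(e) { "Failed to export results" }
                CompletableResultCode.ofFailure()
            }
        }

        /** Placeholder for whatever the concrete exporter does with the data. */
        protected abstract fun processMetrics(metrics: Collection<MetricData>)
    }

This complements the CoroutineMetricReader change at the top of the series, which wraps the export loop in try/finally so that exporter.shutdown() still runs when the reader fails or is cancelled.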