summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt21
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt)8
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt40
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt148
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/build.gradle.kts1
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt424
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt1
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt29
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt2
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt (renamed from opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt)3
10 files changed, 553 insertions, 124 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
index 4b0b343f..21cfdad2 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
@@ -26,6 +26,7 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
+import org.opendc.compute.api.Server
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
@@ -81,9 +82,19 @@ public class ComputeServiceHelper(
}
/**
- * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace].
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
+ *
+ * @param trace The trace to simulate.
+ * @param seed The seed for the simulation.
+ * @param servers A list to which the created servers is added.
+ * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
*/
- public suspend fun run(trace: List<VirtualMachine>, seed: Long, submitImmediately: Boolean = false) {
+ public suspend fun run(
+ trace: List<VirtualMachine>,
+ seed: Long,
+ servers: MutableList<Server>? = null,
+ submitImmediately: Boolean = false
+ ) {
val random = Random(seed)
val injector = failureModel?.createInjector(context, clock, service, random)
val client = service.newClient()
@@ -129,12 +140,14 @@ public class ComputeServiceHelper(
meta = mapOf("workload" to workload)
)
+ servers?.add(server)
+
// Wait for the server reach its end time
val endTime = entry.stopTime.toEpochMilli()
delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
- // Delete the server after reaching the end-time of the virtual machine
- server.delete()
+ // Stop the server after reaching the end-time of the virtual machine
+ server.stop()
}
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt
index a46885f4..6c515118 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt
@@ -22,8 +22,6 @@
package org.opendc.compute.workload.export.parquet
-import io.opentelemetry.sdk.common.CompletableResultCode
-import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.telemetry.compute.table.ServerTableReader
@@ -33,7 +31,7 @@ import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() {
+public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
private val serverWriter = ParquetServerDataWriter(
File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
@@ -61,11 +59,9 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS
serviceWriter.write(reader)
}
- override fun shutdown(): CompletableResultCode {
+ override fun close() {
hostWriter.close()
serviceWriter.close()
serverWriter.close()
-
- return CompletableResultCode.ofSuccess()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 0bbf1443..6fd85e8c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,12 +23,14 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
+import kotlinx.coroutines.*
+import org.opendc.compute.api.Server
import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.createComputeScheduler
-import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
+import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor
import org.opendc.compute.workload.grid5000
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -37,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.compute.ComputeMetricReader
import java.io.File
import java.time.Duration
import java.util.*
@@ -97,7 +99,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
else
null
val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder)
- val telemetry = SdkTelemetryManager(clock)
+ val telemetry = NoopTelemetryManager()
val runner = ComputeServiceHelper(
coroutineContext,
clock,
@@ -107,23 +109,35 @@ abstract class Portfolio(name: String) : Experiment(name) {
interferenceModel?.withSeed(repeat.toLong())
)
- val exporter = ParquetComputeMetricExporter(
- File(config.getString("output-path")),
- "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
- 4096
- )
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
-
val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
+ val servers = mutableListOf<Server>()
+ val exporter = ComputeMetricReader(
+ this,
+ clock,
+ runner.service,
+ servers,
+ ParquetComputeMonitor(
+ File(config.getString("output-path")),
+ "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
+ bufferSize = 4096
+ ),
+ exportInterval = Duration.ofMinutes(5)
+ )
try {
// Instantiate the desired topology
runner.apply(topology)
- // Run the workload trace
- runner.run(vms, seeder.nextLong())
+ coroutineScope {
+ // Run the workload trace
+ runner.run(vms, seeder.nextLong(), servers)
+
+ // Stop the metric collection
+ exporter.close()
+ }
} finally {
runner.close()
+ exporter.close()
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 01b2a8fe..62cdf123 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -26,25 +26,23 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.compute.api.Server
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMetricReader
+import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.telemetry.compute.table.ServiceData
-import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
-import java.time.Instant
import java.util.*
/**
@@ -54,7 +52,7 @@ class CapelinIntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var exporter: TestComputeMetricExporter
+ private lateinit var monitor: TestComputeMonitor
/**
* The [FilterScheduler] to use for all experiments.
@@ -67,11 +65,11 @@ class CapelinIntegrationTest {
private lateinit var workloadLoader: ComputeWorkloadLoader
/**
- * Setup the experimental environment.
+ * Set up the experimental environment.
*/
@BeforeEach
fun setUp() {
- exporter = TestComputeMetricExporter()
+ monitor = TestComputeMonitor()
computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -85,22 +83,22 @@ class CapelinIntegrationTest {
@Test
fun testLarge() = runBlockingSimulation {
val (workload, _) = createTestWorkload(1.0)
- val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler
)
val topology = createTopology()
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
try {
runner.apply(topology)
- runner.run(workload, 0)
+ runner.run(workload, 0, servers)
- val serviceMetrics = exporter.serviceMetrics
+ val serviceMetrics = runner.service.getSchedulerStats()
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -116,15 +114,15 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
} finally {
runner.close()
- telemetry.close()
+ reader.close()
}
}
@@ -135,41 +133,41 @@ class CapelinIntegrationTest {
fun testSmall() = runBlockingSimulation {
val seed = 1
val (workload, _) = createTestWorkload(0.25, seed)
- val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler
)
val topology = createTopology("single")
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
try {
runner.apply(topology)
- runner.run(workload, seed.toLong())
+ runner.run(workload, seed.toLong(), servers)
+ val serviceMetrics = runner.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
runner.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
+ { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
@@ -181,41 +179,41 @@ class CapelinIntegrationTest {
val seed = 0
val (workload, interferenceModel) = createTestWorkload(1.0, seed)
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
interferenceModel = interferenceModel?.withSeed(seed.toLong())
)
val topology = createTopology("single")
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ simulator.run(workload, seed.toLong(), servers)
+ val serviceMetrics = simulator.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -225,43 +223,43 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
val (workload, _) = createTestWorkload(0.25, seed)
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ simulator.run(workload, seed.toLong(), servers)
+ val serviceMetrics = simulator.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } }
+ { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } }
)
}
@@ -281,8 +279,7 @@ class CapelinIntegrationTest {
return stream.use { clusterTopology(stream) }
}
- class TestComputeMetricExporter : ComputeMetricExporter() {
- var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0)
+ class TestComputeMonitor : ComputeMonitor {
var idleTime = 0L
var activeTime = 0L
var stealTime = 0L
@@ -290,19 +287,6 @@ class CapelinIntegrationTest {
var energyUsage = 0.0
var uptime = 0L
- override fun record(reader: ServiceTableReader) {
- serviceMetrics = ServiceData(
- reader.timestamp,
- reader.hostsUp,
- reader.hostsDown,
- reader.serversPending,
- reader.serversActive,
- reader.attemptsSuccess,
- reader.attemptsFailure,
- reader.attemptsError
- )
- }
-
override fun record(reader: HostTableReader) {
idleTime += reader.cpuIdleTime
activeTime += reader.cpuActiveTime
diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
index 47e30a14..b476a669 100644
--- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
+++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
@@ -30,6 +30,7 @@ plugins {
dependencies {
api(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcCompute.opendcComputeService)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt
new file mode 100644
index 00000000..593203fc
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt
@@ -0,0 +1,424 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.driver.Host
+import org.opendc.telemetry.compute.table.*
+import java.time.Clock
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
+ * export interval.
+ *
+ * @param scope The [CoroutineScope] to run the reader in.
+ * @param clock The virtual clock.
+ * @param service The [ComputeService] to monitor.
+ * @param servers The [Server]s to monitor.
+ * @param monitor The monitor to export the metrics to.
+ * @param exportInterval The export interval.
+ */
+public class ComputeMetricReader(
+ scope: CoroutineScope,
+ clock: Clock,
+ private val service: ComputeService,
+ private val servers: List<Server>,
+ private val monitor: ComputeMonitor,
+ private val exportInterval: Duration = Duration.ofMinutes(5)
+) : AutoCloseable {
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * Aggregator for service metrics.
+ */
+ private val serviceTableReader = ServiceTableReaderImpl(service)
+
+ /**
+ * Mapping from [Host] instances to [HostTableReaderImpl]
+ */
+ private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>()
+
+ /**
+ * Mapping from [Server] instances to [ServerTableReaderImpl]
+ */
+ private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>()
+
+ /**
+ * The background job that is responsible for collecting the metrics every cycle.
+ */
+ private val job = scope.launch {
+ val intervalMs = exportInterval.toMillis()
+
+ try {
+ while (isActive) {
+ delay(intervalMs)
+
+ try {
+ val now = clock.instant()
+
+ for (host in service.hosts) {
+ val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
+ reader.record(now)
+ monitor.record(reader)
+ reader.reset()
+ }
+
+ for (server in servers) {
+ val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
+ reader.record(now)
+ monitor.record(reader)
+ reader.reset()
+ }
+
+ serviceTableReader.record(now)
+ monitor.record(serviceTableReader)
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Exporter threw an Exception" }
+ }
+ }
+ } finally {
+ if (monitor is AutoCloseable) {
+ monitor.close()
+ }
+ }
+ }
+
+ override fun close() {
+ job.cancel()
+ }
+
+ /**
+ * An aggregator for service metrics before they are reported.
+ */
+ private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader {
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val hostsUp: Int
+ get() = _hostsUp
+ private var _hostsUp = 0
+
+ override val hostsDown: Int
+ get() = _hostsDown
+ private var _hostsDown = 0
+
+ override val serversPending: Int
+ get() = _serversPending
+ private var _serversPending = 0
+
+ override val serversActive: Int
+ get() = _serversActive
+ private var _serversActive = 0
+
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ private var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ private var _attemptsFailure = 0
+
+ override val attemptsError: Int
+ get() = _attemptsError
+ private var _attemptsError = 0
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ _timestamp = now
+
+ val stats = service.getSchedulerStats()
+ _hostsUp = stats.hostsAvailable
+ _hostsDown = stats.hostsUnavailable
+ _serversPending = stats.serversPending
+ _serversActive = stats.serversActive
+ _attemptsSuccess = stats.attemptsSuccess.toInt()
+ _attemptsFailure = stats.attemptsFailure.toInt()
+ _attemptsError = stats.attemptsError.toInt()
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ private class HostTableReaderImpl(host: Host) : HostTableReader {
+ private val _host = host
+
+ override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity)
+
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ private var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ private var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ private var _guestsError = 0
+
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ private var _guestsInvalid = 0
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ private var _cpuUsage = 0.0
+
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ private var _cpuDemand = 0.0
+
+ override val cpuUtilization: Double
+ get() = _cpuUtilization
+ private var _cpuUtilization = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ override val powerUsage: Double
+ get() = _powerUsage
+ private var _powerUsage = 0.0
+
+ override val powerTotal: Double
+ get() = _powerTotal - previousPowerTotal
+ private var _powerTotal = 0.0
+ private var previousPowerTotal = 0.0
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime = 0L
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime = 0L
+ private var previousDowntime = 0L
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val hostCpuStats = _host.getCpuStats()
+ val hostSysStats = _host.getSystemStats()
+
+ _timestamp = now
+ _guestsTerminated = hostSysStats.guestsTerminated
+ _guestsRunning = hostSysStats.guestsRunning
+ _guestsError = hostSysStats.guestsError
+ _guestsInvalid = hostSysStats.guestsInvalid
+ _cpuLimit = hostCpuStats.capacity
+ _cpuDemand = hostCpuStats.demand
+ _cpuUsage = hostCpuStats.usage
+ _cpuUtilization = hostCpuStats.utilization
+ _cpuActiveTime = hostCpuStats.activeTime
+ _cpuIdleTime = hostCpuStats.idleTime
+ _cpuStealTime = hostCpuStats.stealTime
+ _cpuLostTime = hostCpuStats.lostTime
+ _powerUsage = hostSysStats.powerUsage
+ _powerTotal = hostSysStats.energyUsage
+ _uptime = hostSysStats.uptime.toMillis()
+ _downtime = hostSysStats.downtime.toMillis()
+ _bootTime = hostSysStats.bootTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousPowerTotal = _powerTotal
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ _cpuUtilization = 0.0
+
+ _powerUsage = 0.0
+ }
+ }
+
+ /**
+ * An aggregator for server metrics before they are reported.
+ */
+ private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader {
+ private val _server = server
+
+ /**
+ * The static information about this server.
+ */
+ override val server = ServerInfo(
+ server.uid.toString(),
+ server.name,
+ "vm",
+ "x86",
+ server.image.uid.toString(),
+ server.image.name,
+ server.flavor.cpuCount,
+ server.flavor.memorySize
+ )
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted.
+ */
+ override var host: HostInfo? = null
+ private var _host: Host? = null
+
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime: Long = 0
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime: Long = 0
+ private var previousDowntime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ private var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val newHost = service.lookupHost(_server)
+ if (newHost != null && newHost.uid != _host?.uid) {
+ _host = newHost
+ host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity)
+ }
+
+ val cpuStats = _host?.getCpuStats(_server)
+ val sysStats = _host?.getSystemStats(_server)
+
+ _timestamp = now
+ _cpuLimit = cpuStats?.capacity ?: 0.0
+ _cpuActiveTime = cpuStats?.activeTime ?: 0
+ _cpuIdleTime = cpuStats?.idleTime ?: 0
+ _cpuStealTime = cpuStats?.stealTime ?: 0
+ _cpuLostTime = cpuStats?.lostTime ?: 0
+ _uptime = sysStats?.uptime?.toMillis() ?: 0
+ _downtime = sysStats?.downtime?.toMillis() ?: 0
+ _provisionTime = _server.launchedAt
+ _bootTime = sysStats?.bootTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+
+ _host = null
+ _cpuLimit = 0.0
+ }
+ }
+}
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index a9290c47..ca5da079 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -31,7 +31,6 @@ import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
import kotlinx.coroutines.*
import mu.KotlinLogging
import java.time.Duration
-import java.util.*
/**
* A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index a150de4e..7c0c43ed 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -23,8 +23,9 @@
package org.opendc.web.runner
import mu.KotlinLogging
+import org.opendc.compute.api.Server
import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
@@ -35,13 +36,12 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.compute.ComputeMetricReader
import org.opendc.web.client.runner.OpenDCRunnerClient
import org.opendc.web.proto.runner.Job
import org.opendc.web.proto.runner.Scenario
import org.opendc.web.runner.internal.JobManager
-import org.opendc.web.runner.internal.WebComputeMetricExporter
+import org.opendc.web.runner.internal.WebComputeMonitor
import java.io.File
import java.time.Duration
import java.util.*
@@ -180,9 +180,9 @@ public class OpenDCRunner(
private val scenario: Scenario,
private val repeat: Int,
private val topology: Topology,
- ) : RecursiveTask<WebComputeMetricExporter.Results>() {
- override fun compute(): WebComputeMetricExporter.Results {
- val exporter = WebComputeMetricExporter()
+ ) : RecursiveTask<WebComputeMonitor.Results>() {
+ override fun compute(): WebComputeMonitor.Results {
+ val monitor = WebComputeMonitor()
// Schedule task that interrupts the simulation if it runs for too long.
val currentThread = Thread.currentThread()
@@ -206,25 +206,24 @@ public class OpenDCRunner(
else
null
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
failureModel,
interferenceModel.takeIf { phenomena.interference }
)
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
// Instantiate the topology onto the simulator
simulator.apply(topology)
// Run workload trace
- simulator.run(vms, seeder.nextLong())
+ simulator.run(vms, seeder.nextLong(), servers)
- val serviceMetrics = collectServiceMetrics(telemetry.metricProducer)
+ val serviceMetrics = simulator.service.getSchedulerStats()
logger.debug {
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -235,14 +234,14 @@ public class OpenDCRunner(
}
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
}
} finally {
interruptTask.cancel(false)
}
- return exporter.collectResults()
+ return monitor.collectResults()
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
index 8de0cee4..99b8aaf1 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
@@ -62,7 +62,7 @@ internal class JobManager(private val client: OpenDCRunnerClient) {
/**
* Persist the specified results.
*/
- fun finish(id: Long, results: List<WebComputeMetricExporter.Results>) {
+ fun finish(id: Long, results: List<WebComputeMonitor.Results>) {
client.jobs.update(
id,
Job.Update(
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
index 04437a5f..69350d8c 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
@@ -22,7 +22,6 @@
package org.opendc.web.runner.internal
-import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.telemetry.compute.table.ServiceTableReader
@@ -32,7 +31,7 @@ import kotlin.math.roundToLong
/**
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
-internal class WebComputeMetricExporter : ComputeMetricExporter() {
+internal class WebComputeMonitor : ComputeMonitor {
override fun record(reader: HostTableReader) {
val slices = reader.downtime / SLICE_LENGTH