summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt40
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt148
2 files changed, 93 insertions, 95 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 0bbf1443..6fd85e8c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,12 +23,14 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
+import kotlinx.coroutines.*
+import org.opendc.compute.api.Server
import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.createComputeScheduler
-import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
+import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor
import org.opendc.compute.workload.grid5000
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -37,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.compute.ComputeMetricReader
import java.io.File
import java.time.Duration
import java.util.*
@@ -97,7 +99,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
else
null
val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder)
- val telemetry = SdkTelemetryManager(clock)
+ val telemetry = NoopTelemetryManager()
val runner = ComputeServiceHelper(
coroutineContext,
clock,
@@ -107,23 +109,35 @@ abstract class Portfolio(name: String) : Experiment(name) {
interferenceModel?.withSeed(repeat.toLong())
)
- val exporter = ParquetComputeMetricExporter(
- File(config.getString("output-path")),
- "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
- 4096
- )
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
-
val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
+ val servers = mutableListOf<Server>()
+ val exporter = ComputeMetricReader(
+ this,
+ clock,
+ runner.service,
+ servers,
+ ParquetComputeMonitor(
+ File(config.getString("output-path")),
+ "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
+ bufferSize = 4096
+ ),
+ exportInterval = Duration.ofMinutes(5)
+ )
try {
// Instantiate the desired topology
runner.apply(topology)
- // Run the workload trace
- runner.run(vms, seeder.nextLong())
+ coroutineScope {
+ // Run the workload trace
+ runner.run(vms, seeder.nextLong(), servers)
+
+ // Stop the metric collection
+ exporter.close()
+ }
} finally {
runner.close()
+ exporter.close()
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 01b2a8fe..62cdf123 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -26,25 +26,23 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.compute.api.Server
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMetricReader
+import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.telemetry.compute.table.ServiceData
-import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
-import java.time.Instant
import java.util.*
/**
@@ -54,7 +52,7 @@ class CapelinIntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var exporter: TestComputeMetricExporter
+ private lateinit var monitor: TestComputeMonitor
/**
* The [FilterScheduler] to use for all experiments.
@@ -67,11 +65,11 @@ class CapelinIntegrationTest {
private lateinit var workloadLoader: ComputeWorkloadLoader
/**
- * Setup the experimental environment.
+ * Set up the experimental environment.
*/
@BeforeEach
fun setUp() {
- exporter = TestComputeMetricExporter()
+ monitor = TestComputeMonitor()
computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -85,22 +83,22 @@ class CapelinIntegrationTest {
@Test
fun testLarge() = runBlockingSimulation {
val (workload, _) = createTestWorkload(1.0)
- val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler
)
val topology = createTopology()
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
try {
runner.apply(topology)
- runner.run(workload, 0)
+ runner.run(workload, 0, servers)
- val serviceMetrics = exporter.serviceMetrics
+ val serviceMetrics = runner.service.getSchedulerStats()
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -116,15 +114,15 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
} finally {
runner.close()
- telemetry.close()
+ reader.close()
}
}
@@ -135,41 +133,41 @@ class CapelinIntegrationTest {
fun testSmall() = runBlockingSimulation {
val seed = 1
val (workload, _) = createTestWorkload(0.25, seed)
- val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler
)
val topology = createTopology("single")
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
try {
runner.apply(topology)
- runner.run(workload, seed.toLong())
+ runner.run(workload, seed.toLong(), servers)
+ val serviceMetrics = runner.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
runner.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
+ { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
@@ -181,41 +179,41 @@ class CapelinIntegrationTest {
val seed = 0
val (workload, interferenceModel) = createTestWorkload(1.0, seed)
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
interferenceModel = interferenceModel?.withSeed(seed.toLong())
)
val topology = createTopology("single")
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ simulator.run(workload, seed.toLong(), servers)
+ val serviceMetrics = simulator.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -225,43 +223,43 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
val (workload, _) = createTestWorkload(0.25, seed)
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ simulator.run(workload, seed.toLong(), servers)
+ val serviceMetrics = simulator.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } }
+ { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } }
)
}
@@ -281,8 +279,7 @@ class CapelinIntegrationTest {
return stream.use { clusterTopology(stream) }
}
- class TestComputeMetricExporter : ComputeMetricExporter() {
- var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0)
+ class TestComputeMonitor : ComputeMonitor {
var idleTime = 0L
var activeTime = 0L
var stealTime = 0L
@@ -290,19 +287,6 @@ class CapelinIntegrationTest {
var energyUsage = 0.0
var uptime = 0L
- override fun record(reader: ServiceTableReader) {
- serviceMetrics = ServiceData(
- reader.timestamp,
- reader.hostsUp,
- reader.hostsDown,
- reader.serversPending,
- reader.serversActive,
- reader.attemptsSuccess,
- reader.attemptsFailure,
- reader.attemptsError
- )
- }
-
override fun record(reader: HostTableReader) {
idleTime += reader.cpuIdleTime
activeTime += reader.cpuActiveTime