summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-25 21:50:45 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-26 15:41:05 +0100
commit608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 (patch)
treef0130622f189815e41837993b6f66ba3fc11b899 /simulator/opendc-experiments
parent0d66ef47d6e1ec0861b4939800c5070f96600ca0 (diff)
compute: Integrate OpenTelemetry Metrics in OpenDC Compute
This change integrates the OpenTelemetry Metrics API in the OpenDC Compute Service implementation. This replaces the old infrastructure for gathering metrics.
Diffstat (limited to 'simulator/opendc-experiments')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt131
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt59
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt12
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt26
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt129
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt2
7 files changed, 205 insertions, 156 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 2d0da1bf..b2d7cc30 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -47,4 +47,6 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
+
+ implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 6f99a44e..4f48bba7 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,6 +22,11 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.launchIn
@@ -29,7 +34,6 @@ import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
@@ -45,8 +49,10 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Clock
+import kotlin.coroutines.coroutineContext
import kotlin.coroutines.resume
import kotlin.math.ln
import kotlin.math.max
@@ -130,12 +136,13 @@ public fun createTraceReader(
/**
* Construct the environment for a simulated compute service..
*/
-public fun createComputeService(
- coroutineScope: CoroutineScope,
+public suspend fun withComputeService(
clock: Clock,
+ meter: Meter,
environmentReader: EnvironmentReader,
- allocationPolicy: AllocationPolicy
-): ComputeService {
+ allocationPolicy: AllocationPolicy,
+ block: suspend CoroutineScope.(ComputeService) -> Unit
+): Unit = coroutineScope {
val hosts = environmentReader
.use { it.read() }
.map { def ->
@@ -144,7 +151,7 @@ public fun createComputeService(
def.name,
def.model,
def.meta,
- coroutineScope.coroutineContext,
+ coroutineContext,
clock,
SimFairShareHypervisorProvider(),
def.powerModel
@@ -152,26 +159,33 @@ public fun createComputeService(
}
val scheduler =
- ComputeService(coroutineScope.coroutineContext, clock, allocationPolicy)
+ ComputeService(coroutineContext, clock, meter, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
}
- return scheduler
+ try {
+ block(this, scheduler)
+ } finally {
+ scheduler.close()
+ hosts.forEach(SimHost::close)
+ }
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public fun attachMonitor(
- coroutineScope: CoroutineScope,
+public suspend fun withMonitor(
+ monitor: ExperimentMonitor,
clock: Clock,
+ metricProducer: MetricProducer,
scheduler: ComputeService,
- monitor: ExperimentMonitor
-): MonitorResults {
- val results = MonitorResults()
+ block: suspend CoroutineScope.() -> Unit
+): Unit = coroutineScope {
+ val monitorJobs = mutableSetOf<Job>()
+
// Monitor host events
for (host in scheduler.hosts) {
monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
@@ -181,7 +195,7 @@ public fun attachMonitor(
}
})
- host.events
+ monitorJobs += host.events
.onEach { event ->
when (event) {
is HostEvent.SliceFinished -> monitor.reportHostSlice(
@@ -197,37 +211,81 @@ public fun attachMonitor(
)
}
}
- .launchIn(coroutineScope)
+ .launchIn(this)
- (host as SimHost).machine.powerDraw
+ monitorJobs += (host as SimHost).machine.powerDraw
.onEach { monitor.reportPowerConsumption(host, it) }
- .launchIn(coroutineScope)
+ .launchIn(this)
}
- scheduler.events
- .onEach { event ->
- when (event) {
- is ComputeServiceEvent.MetricsAvailable -> {
- results.submittedVms = event.totalVmCount
- results.queuedVms = event.waitingVmCount
- results.runningVms = event.activeVmCount
- results.finishedVms = event.inactiveVmCount
- results.unscheduledVms = event.failedVmCount
- monitor.reportProvisionerMetrics(clock.millis(), event)
- }
+ val reader = CoroutineMetricReader(
+ this, listOf(metricProducer),
+ object : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+
+ val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+
+ monitor.reportProvisionerMetrics(
+ clock.millis(),
+ hosts,
+ availableHosts,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ return CompletableResultCode.ofSuccess()
}
- }
- .launchIn(coroutineScope)
- return results
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ },
+ exportInterval = 5 * 60 * 1000
+ )
+
+ try {
+ block(this)
+ } finally {
+ monitorJobs.forEach(Job::cancel)
+ reader.close()
+ monitor.close()
+ }
}
-public class MonitorResults {
+public class ComputeMetrics {
public var submittedVms: Int = 0
public var queuedVms: Int = 0
public var runningVms: Int = 0
- public var finishedVms: Int = 0
public var unscheduledVms: Int = 0
+ public var finishedVms: Int = 0
+}
+
+/**
+ * Collect the metrics of the compute service.
+ */
+public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
+ val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+ val res = ComputeMetrics()
+ try {
+ // Hack to extract metrics from OpenTelemetry SDK
+ res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Failed to collect metrics" }
+ }
+ return res
}
/**
@@ -242,12 +300,17 @@ public suspend fun processTrace(
) {
val client = scheduler.newClient()
val image = client.newImage("vm-image")
+ var offset = Long.MIN_VALUE
try {
coroutineScope {
while (reader.hasNext()) {
val entry = reader.next()
- delay(max(0, entry.start - clock.millis()))
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
+
+ delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
val server = client.newServer(
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 46e0bcb9..2921daba 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -22,11 +22,13 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.opendc.compute.service.scheduler.*
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -41,6 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
@@ -110,15 +113,21 @@ public abstract class Portfolio(name: String) : Experiment(name) {
* Perform a single trial for this portfolio.
*/
@OptIn(ExperimentalCoroutinesApi::class)
- override fun doRun(repeat: Int) {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
+ override fun doRun(repeat: Int): Unit = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seeder = Random(repeat)
val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createAllocationPolicy(seeder)
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val meter = meterProvider.get("opendc-compute")
+
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
@@ -144,14 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy
- )
-
+ withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
@@ -166,30 +168,21 @@ public abstract class Portfolio(name: String) : Experiment(name) {
null
}
- val monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
-
- logger.debug("SUBMIT=${monitorResults.submittedVms}")
- logger.debug("FAIL=${monitorResults.unscheduledVms}")
- logger.debug("QUEUED=${monitorResults.queuedVms}")
- logger.debug("RUNNING=${monitorResults.runningVms}")
- logger.debug("FINISHED=${monitorResults.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
}
- try {
- testScope.advanceUntilIdle()
- } finally {
- monitor.close()
- }
+ val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
}
/**
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 14cc06dc..a57c8d78 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.monitor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import java.io.Closeable
@@ -68,5 +67,14 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked for a provisioner event.
*/
- public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {}
+ public fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index c9d57a98..0e675d87 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.telemetry.HostEvent
@@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
}
- override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
provisionerWriter.write(
ProvisionerEvent(
time,
- event.totalHostCount,
- event.availableHostCount,
- event.totalVmCount,
- event.activeVmCount,
- event.inactiveVmCount,
- event.waitingVmCount,
- event.failedVmCount
+ totalHostCount,
+ availableHostCount,
+ totalVmCount,
+ activeVmCount,
+ inactiveVmCount,
+ waitingVmCount,
+ failedVmCount
)
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index a836b334..fd906f4d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,18 +22,18 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.AfterEach
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.capelin.model.Workload
@@ -45,8 +45,8 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
-import java.time.Clock
/**
* An integration test suite for the SC20 experiments.
@@ -54,16 +54,6 @@ import java.time.Clock
@OptIn(ExperimentalCoroutinesApi::class)
class CapelinIntegrationTest {
/**
- * The [TestCoroutineScope] to use.
- */
- private lateinit var testScope: TestCoroutineScope
-
- /**
- * The simulation clock to use.
- */
- private lateinit var clock: Clock
-
- /**
* The monitor used to keep track of the metrics.
*/
private lateinit var monitor: TestExperimentReporter
@@ -73,37 +63,28 @@ class CapelinIntegrationTest {
*/
@BeforeEach
fun setUp() {
- testScope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(testScope)
-
monitor = TestExperimentReporter()
}
- /**
- * Tear down the experimental environment.
- */
- @AfterEach
- fun tearDown() = testScope.cleanupTestCoroutines()
-
@Test
- fun testLarge() {
+ fun testLarge() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val failures = false
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: ComputeService
- lateinit var monitorResults: MonitorResults
+ lateinit var monitorResults: ComputeMetrics
- testScope.launch {
- scheduler = createComputeService(
- this,
- clock,
- environmentReader,
- allocationPolicy
- )
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+ val meter: Meter = meterProvider.get("opendc-compute")
+
+ withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler ->
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
@@ -118,28 +99,28 @@ class CapelinIntegrationTest {
null
}
- monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
-
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
- monitor.close()
}
- runSimulation()
+ monitorResults = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") },
+ { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
+ { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
+ { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
{ assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
{ assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
{ assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
@@ -148,38 +129,35 @@ class CapelinIntegrationTest {
}
@Test
- fun testSmall() {
+ fun testSmall() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environmentReader,
- allocationPolicy
- )
- val monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
-
- yield()
-
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}")
-
- scheduler.close()
- monitor.close()
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val meter: Meter = meterProvider.get("opendc-compute")
+
+ withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler ->
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
}
- runSimulation()
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
// Note that these values have been verified beforehand
assertAll(
@@ -191,11 +169,6 @@ class CapelinIntegrationTest {
}
/**
- * Run the simulation.
- */
- private fun runSimulation() = testScope.advanceUntilIdle()
-
- /**
* Obtain the trace reader for the test.
*/
private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
index 98e25be9..225200c9 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.sc18
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.test.TestCoroutineScope
import org.opendc.compute.service.ComputeService
@@ -100,6 +101,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
val compute = ComputeService(
testScope.coroutineContext,
clock,
+ MeterProvider.noop().get("opendc-compute"),
NumberOfActiveServersAllocationPolicy(),
)