summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-capelin/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src/main')
-rw-r--r-- simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt 131
-rw-r--r-- simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt 59
-rw-r--r-- simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt 12
-rw-r--r-- simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt 26
4 files changed, 150 insertions, 78 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 6f99a44e..4f48bba7 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,6 +22,11 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.launchIn
@@ -29,7 +34,6 @@ import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
@@ -45,8 +49,10 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Clock
+import kotlin.coroutines.coroutineContext
import kotlin.coroutines.resume
import kotlin.math.ln
import kotlin.math.max
@@ -130,12 +136,13 @@ public fun createTraceReader(
/**
* Construct the environment for a simulated compute service.
*/
-public fun createComputeService(
- coroutineScope: CoroutineScope,
+public suspend fun withComputeService(
clock: Clock,
+ meter: Meter,
environmentReader: EnvironmentReader,
- allocationPolicy: AllocationPolicy
-): ComputeService {
+ allocationPolicy: AllocationPolicy,
+ block: suspend CoroutineScope.(ComputeService) -> Unit
+): Unit = coroutineScope {
val hosts = environmentReader
.use { it.read() }
.map { def ->
@@ -144,7 +151,7 @@ public fun createComputeService(
def.name,
def.model,
def.meta,
- coroutineScope.coroutineContext,
+ coroutineContext,
clock,
SimFairShareHypervisorProvider(),
def.powerModel
@@ -152,26 +159,33 @@ public fun createComputeService(
}
val scheduler =
- ComputeService(coroutineScope.coroutineContext, clock, allocationPolicy)
+ ComputeService(coroutineContext, clock, meter, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
}
- return scheduler
+ try {
+ block(this, scheduler)
+ } finally {
+ scheduler.close()
+ hosts.forEach(SimHost::close)
+ }
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public fun attachMonitor(
- coroutineScope: CoroutineScope,
+public suspend fun withMonitor(
+ monitor: ExperimentMonitor,
clock: Clock,
+ metricProducer: MetricProducer,
scheduler: ComputeService,
- monitor: ExperimentMonitor
-): MonitorResults {
- val results = MonitorResults()
+ block: suspend CoroutineScope.() -> Unit
+): Unit = coroutineScope {
+ val monitorJobs = mutableSetOf<Job>()
+
// Monitor host events
for (host in scheduler.hosts) {
monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
@@ -181,7 +195,7 @@ public fun attachMonitor(
}
})
- host.events
+ monitorJobs += host.events
.onEach { event ->
when (event) {
is HostEvent.SliceFinished -> monitor.reportHostSlice(
@@ -197,37 +211,81 @@ public fun attachMonitor(
)
}
}
- .launchIn(coroutineScope)
+ .launchIn(this)
- (host as SimHost).machine.powerDraw
+ monitorJobs += (host as SimHost).machine.powerDraw
.onEach { monitor.reportPowerConsumption(host, it) }
- .launchIn(coroutineScope)
+ .launchIn(this)
}
- scheduler.events
- .onEach { event ->
- when (event) {
- is ComputeServiceEvent.MetricsAvailable -> {
- results.submittedVms = event.totalVmCount
- results.queuedVms = event.waitingVmCount
- results.runningVms = event.activeVmCount
- results.finishedVms = event.inactiveVmCount
- results.unscheduledVms = event.failedVmCount
- monitor.reportProvisionerMetrics(clock.millis(), event)
- }
+ val reader = CoroutineMetricReader(
+ this, listOf(metricProducer),
+ object : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+
+ val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+
+ monitor.reportProvisionerMetrics(
+ clock.millis(),
+ hosts,
+ availableHosts,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ return CompletableResultCode.ofSuccess()
}
- }
- .launchIn(coroutineScope)
- return results
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ },
+ exportInterval = 5 * 60 * 1000
+ )
+
+ try {
+ block(this)
+ } finally {
+ monitorJobs.forEach(Job::cancel)
+ reader.close()
+ monitor.close()
+ }
}
-public class MonitorResults {
+public class ComputeMetrics { // Mutable holder for the VM counters that collectMetrics fills in; every counter starts at 0.
public var submittedVms: Int = 0 // collectMetrics reads this from the "servers.submitted" metric
public var queuedVms: Int = 0 // collectMetrics reads this from the "servers.waiting" metric
public var runningVms: Int = 0 // collectMetrics reads this from the "servers.active" metric
- public var finishedVms: Int = 0
public var unscheduledVms: Int = 0 // collectMetrics reads this from the "servers.unscheduled" metric
+ public var finishedVms: Int = 0 // collectMetrics reads this from the "servers.finished" metric
+}
+
+/**
+ * Collect the metrics of the compute service.
+ *
+ * Takes one snapshot via [MetricProducer.collectAllMetrics], indexes it by metric name and copies the
+ * most recent point of each `servers.*` long-sum metric into a fresh [ComputeMetrics].
+ *
+ * A metric that is absent from the snapshot, or present without any recorded point, is reported as 0.
+ * Any other failure while reading the snapshot is logged as a warning and the counters read so far
+ * are returned (best effort).
+ *
+ * @param metricProducer The producer to take the snapshot from.
+ * @return The extracted counters; counters that could not be read keep their default of 0.
+ */
+public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
+    val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+
+    // `lastOrNull()` instead of `last()`: a metric without recorded points must read as 0 rather than
+    // throw NoSuchElementException and leave every counter after it unread.
+    fun latest(name: String): Int =
+        metrics[name]?.longSumData?.points?.lastOrNull()?.value?.toInt() ?: 0
+
+    val res = ComputeMetrics()
+    try {
+        // Hack to extract metrics from OpenTelemetry SDK
+        res.submittedVms = latest("servers.submitted")
+        res.queuedVms = latest("servers.waiting")
+        res.unscheduledVms = latest("servers.unscheduled")
+        res.runningVms = latest("servers.active")
+        res.finishedVms = latest("servers.finished")
+    } catch (cause: Throwable) {
+        // Deliberately broad: metric extraction is diagnostic only and must not fail the experiment.
+        logger.warn(cause) { "Failed to collect metrics" }
+    }
+    return res
}
/**
@@ -242,12 +300,17 @@ public suspend fun processTrace(
) {
val client = scheduler.newClient()
val image = client.newImage("vm-image")
+ var offset = Long.MIN_VALUE
try {
coroutineScope {
while (reader.hasNext()) {
val entry = reader.next()
- delay(max(0, entry.start - clock.millis()))
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
+
+ delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
val server = client.newServer(
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 46e0bcb9..2921daba 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -22,11 +22,13 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.opendc.compute.service.scheduler.*
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -41,6 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
@@ -110,15 +113,21 @@ public abstract class Portfolio(name: String) : Experiment(name) {
* Perform a single trial for this portfolio.
*/
@OptIn(ExperimentalCoroutinesApi::class)
- override fun doRun(repeat: Int) {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
+ override fun doRun(repeat: Int): Unit = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seeder = Random(repeat)
val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createAllocationPolicy(seeder)
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val meter = meterProvider.get("opendc-compute")
+
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
@@ -144,14 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy
- )
-
+ withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
@@ -166,30 +168,21 @@ public abstract class Portfolio(name: String) : Experiment(name) {
null
}
- val monitorResults = attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
-
- logger.debug("SUBMIT=${monitorResults.submittedVms}")
- logger.debug("FAIL=${monitorResults.unscheduledVms}")
- logger.debug("QUEUED=${monitorResults.queuedVms}")
- logger.debug("RUNNING=${monitorResults.runningVms}")
- logger.debug("FINISHED=${monitorResults.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
}
- try {
- testScope.advanceUntilIdle()
- } finally {
- monitor.close()
- }
+ val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
}
/**
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 14cc06dc..a57c8d78 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.monitor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import java.io.Closeable
@@ -68,5 +67,14 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked for a provisioner event.
+ *
+ * The default implementation is empty, so a monitor only overrides it when it records these counters.
+ *
+ * @param time Timestamp of the report; `withMonitor` passes `clock.millis()`.
+ * @param totalHostCount Host total; `withMonitor` passes the latest `hosts.total` value.
+ * @param availableHostCount Available hosts; `withMonitor` passes the latest `hosts.available` value.
+ * @param totalVmCount Submitted VMs; `withMonitor` passes the latest `servers.submitted` value.
+ * @param activeVmCount Running VMs; `withMonitor` passes the latest `servers.active` value.
+ * @param inactiveVmCount Finished VMs; `withMonitor` passes the latest `servers.finished` value.
+ * @param waitingVmCount Queued VMs; `withMonitor` passes the latest `servers.waiting` value.
+ * @param failedVmCount Unscheduled VMs; `withMonitor` passes the latest `servers.unscheduled` value.
*/
- public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {}
+ public fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index c9d57a98..0e675d87 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.telemetry.HostEvent
@@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
}
-override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) { // Wraps the arguments, unchanged and in declaration order, in one ProvisionerEvent and hands it to provisionerWriter.
provisionerWriter.write(
ProvisionerEvent(
time,
- event.totalHostCount,
- event.availableHostCount,
- event.totalVmCount,
- event.activeVmCount,
- event.inactiveVmCount,
- event.waitingVmCount,
- event.failedVmCount
+ totalHostCount,
+ availableHostCount,
+ totalVmCount,
+ activeVmCount,
+ inactiveVmCount,
+ waitingVmCount,
+ failedVmCount
)
)
}