summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-27 12:33:13 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-27 12:36:18 +0100
commit5b0eaf76ec00192c755b268b7655f6463c5bc62f (patch)
treecf7c04ce21d0da964172ef35deb45c843aea8b98
parentccd1f96f8568978c80aa0b9a100ca6158ade34ba (diff)
compute: Migrate compute service simulator to OpenTelemetry
This change updates the compute service simulator to use OpenTelemetry for reporting metrics of the (simulated) hosts as opposed to using custom event flows. This approach is more generic, flexible and possibly offers better performance as we can collect metrics of all services in a single sweep, as opposed to listening to several services and each invoking the handlers.
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt72
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt117
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt142
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt72
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt171
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt9
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt2
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt112
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt1
14 files changed, 368 insertions, 348 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index c3c39572..bed15dfd 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -22,7 +22,6 @@
package org.opendc.compute.service.driver
-import kotlinx.coroutines.flow.Flow
import org.opendc.compute.api.Server
import java.util.*
@@ -56,11 +55,6 @@ public interface Host {
public val meta: Map<String, Any>
/**
- * The events emitted by the driver.
- */
- public val events: Flow<HostEvent>
-
- /**
* Determine whether the specified [instance][server] can still fit on this host.
*/
public fun canFit(server: Server): Boolean
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt
deleted file mode 100644
index 97350679..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.driver
-
-/**
- * An event that is emitted by a [Host].
- */
-public sealed class HostEvent {
- /**
- * The driver that emitted the event.
- */
- public abstract val driver: Host
-
- /**
- * This event is emitted when the number of active servers on the server managed by this driver is updated.
- *
- * @property driver The driver that emitted the event.
- * @property numberOfActiveServers The number of active servers.
- * @property availableMemory The available memory, in MB.
- */
- public data class VmsUpdated(
- override val driver: Host,
- public val numberOfActiveServers: Int,
- public val availableMemory: Long
- ) : HostEvent()
-
- /**
- * This event is emitted when a slice is finished.
- *
- * @property driver The driver that emitted the event.
- * @property requestedBurst The total requested CPU time (can be above capacity).
- * @property grantedBurst The actual total granted capacity, which might be lower than the requested burst due to
- * the hypervisor being interrupted during a slice.
- * @property overcommissionedBurst The CPU time that the hypervisor could not grant to the virtual machine since
- * it did not have the capacity.
- * @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance
- * interference.
- * @property cpuUsage CPU use in megahertz.
- * @property cpuDemand CPU demand in megahertz.
- * @property numberOfDeployedImages The number of images deployed on this hypervisor.
- */
- public data class SliceFinished(
- override val driver: Host,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val numberOfDeployedImages: Int,
- ) : HostEvent()
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
index 1ad3f1c6..3bf8a114 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -38,5 +38,6 @@ dependencies {
implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 89784803..6d81aa7d 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,8 +22,9 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.common.Labels
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
@@ -36,7 +37,6 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.MachinePowerModel
import org.opendc.simulator.failures.FailureDomain
-import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -52,6 +52,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
clock: Clock,
+ meter: Meter,
hypervisor: SimHypervisorProvider,
powerModel: MachinePowerModel = ConstantPowerModel(0.0),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
@@ -66,10 +67,6 @@ public class SimHost(
*/
private val logger = KotlinLogging.logger {}
- override val events: Flow<HostEvent>
- get() = _events
- internal val _events = EventFlow<HostEvent>()
-
/**
* The event listeners registered with this host.
*/
@@ -99,18 +96,13 @@ public class SimHost(
cpuUsage: Double,
cpuDemand: Double
) {
- _events.emit(
- HostEvent.SliceFinished(
- this@SimHost,
- requestedWork,
- grantedWork,
- overcommittedWork,
- interferedWork,
- cpuUsage,
- cpuDemand,
- guests.size
- )
- )
+ _cpuWork.record(requestedWork.toDouble())
+ _cpuWorkGranted.record(grantedWork.toDouble())
+ _cpuWorkOvercommit.record(overcommittedWork.toDouble())
+ _cpuWorkInterference.record(interferedWork.toDouble())
+ _cpuUsage.record(cpuUsage)
+ _cpuDemand.record(cpuDemand)
+ _cpuPower.record(machine.powerDraw.value)
}
}
)
@@ -132,6 +124,87 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum())
+ /**
+ * The number of guests on the host.
+ */
+ private val _guests = meter.longUpDownCounterBuilder("guests.total")
+ .setDescription("Number of guests")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The number of active guests on the host.
+ */
+ private val _activeGuests = meter.longUpDownCounterBuilder("guests.active")
+ .setDescription("Number of active guests")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The CPU usage on the host.
+ */
+ private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage")
+ .setDescription("The amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The CPU demand on the host.
+ */
+ private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand")
+ .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The requested work for the CPU.
+ */
+ private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage")
+ .setDescription("The amount of power used by the CPU")
+ .setUnit("W")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The requested work for the CPU.
+ */
+ private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total")
+ .setDescription("The amount of work supplied to the CPU")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work actually performed by the CPU.
+ */
+ private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted")
+ .setDescription("The amount of work performed by the CPU")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work that could not be performed by the CPU due to overcommitting resource.
+ */
+ private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit")
+ .setDescription("The amount of work not performed by the CPU due to overcommitment")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work that could not be performed by the CPU due to interference.
+ */
+ private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference")
+ .setDescription("The amount of work not performed by the CPU due to interference")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
init {
// Launch hypervisor onto machine
scope.launch {
@@ -166,12 +239,11 @@ public class SimHost(
require(canFit(server)) { "Server does not fit" }
val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
guests[server] = guest
+ _guests.add(1)
if (start) {
guest.start()
}
-
- _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override fun contains(server: Server): Boolean {
@@ -191,6 +263,7 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
guest.terminate()
+ _guests.add(-1)
}
override fun addListener(listener: HostListener) {
@@ -228,6 +301,7 @@ public class SimHost(
}
}
+ _activeGuests.add(1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
}
@@ -238,9 +312,8 @@ public class SimHost(
}
}
+ _activeGuests.add(-1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
-
- _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override suspend fun fail() {
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e311cd21..830fc868 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -22,12 +22,14 @@
package org.opendc.compute.simulator
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -37,7 +39,8 @@ import org.opendc.compute.api.Image
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
-import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -45,23 +48,20 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import java.time.Clock
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.sdk.toOtelClock
import java.util.UUID
+import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
- private lateinit var scope: TestCoroutineScope
- private lateinit var clock: Clock
private lateinit var machineModel: SimMachineModel
@BeforeEach
fun setUp() {
- scope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(scope)
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
@@ -74,72 +74,98 @@ internal class SimHostTest {
* Test overcommitting of resources by the hypervisor.
*/
@Test
- fun testOvercommitted() {
+ fun testOvercommitted() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
var requestedWork = 0L
var grantedWork = 0L
var overcommittedWork = 0L
- scope.launch {
- val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider())
- val duration = 5 * 60L
- val vmImageA = MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
- ),
- )
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider())
+ val duration = 5 * 60L
+ val vmImageA = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
+ ),
)
)
- val vmImageB = MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
- )
+ )
+ val vmImageB = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
)
)
)
+ )
- delay(5)
-
- val flavor = MockFlavor(2, 0)
- virtDriver.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> {
- requestedWork += event.requestedBurst
- grantedWork += event.grantedBurst
- overcommittedWork += event.overcommissionedBurst
- }
- }
+ val flavor = MockFlavor(2, 0)
+
+ // Setup metric reader
+ val reader = CoroutineMetricReader(
+ this, listOf(meterProvider as MetricProducer),
+ object : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+ requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong()
+ grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong()
+ overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong()
+ return CompletableResultCode.ofSuccess()
}
- .launchIn(this)
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ },
+ exportInterval = duration * 1000
+ )
+
+ coroutineScope {
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
+
+ suspendCancellableCoroutine<Unit> { cont ->
+ virtDriver.addListener(object : HostListener {
+ private var finished = 0
+
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED && ++finished == 2) {
+ cont.resume(Unit)
+ }
+ }
+ })
+ }
}
- scope.advanceUntilIdle()
+ // Ensure last cycle is collected
+ delay(1000 * duration)
+ virtDriver.close()
+ reader.close()
assertAll(
- { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
{ assertEquals(4197600, requestedWork, "Requested work does not match") },
{ assertEquals(2157600, grantedWork, "Granted work does not match") },
{ assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
- { assertEquals(1200006, scope.currentTime) }
+ { assertEquals(1500001, currentTime) }
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 4f48bba7..40f50235 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,24 +22,19 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
@@ -138,7 +133,7 @@ public fun createTraceReader(
*/
public suspend fun withComputeService(
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
block: suspend CoroutineScope.(ComputeService) -> Unit
@@ -153,13 +148,15 @@ public suspend fun withComputeService(
def.meta,
coroutineContext,
clock,
+ meterProvider.get("opendc-compute-simulator"),
SimFairShareHypervisorProvider(),
def.powerModel
)
}
+ val schedulerMeter = meterProvider.get("opendc-compute")
val scheduler =
- ComputeService(coroutineContext, clock, meter, allocationPolicy)
+ ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
@@ -194,62 +191,13 @@ public suspend fun withMonitor(
monitor.reportHostStateChange(clock.millis(), host, newState)
}
})
-
- monitorJobs += host.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> monitor.reportHostSlice(
- clock.millis(),
- event.requestedBurst,
- event.grantedBurst,
- event.overcommissionedBurst,
- event.interferedBurst,
- event.cpuUsage,
- event.cpuDemand,
- event.numberOfDeployedImages,
- event.driver
- )
- }
- }
- .launchIn(this)
-
- monitorJobs += (host as SimHost).machine.powerDraw
- .onEach { monitor.reportPowerConsumption(host, it) }
- .launchIn(this)
}
val reader = CoroutineMetricReader(
- this, listOf(metricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
-
- val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
-
- monitor.reportProvisionerMetrics(
- clock.millis(),
- hosts,
- availableHosts,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- return CompletableResultCode.ofSuccess()
- }
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = 5 * 60 * 1000
+ this,
+ listOf(metricProducer),
+ ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
+ exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
)
try {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 2921daba..5fa77161 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -126,8 +126,6 @@ public abstract class Portfolio(name: String) : Experiment(name) {
.setClock(clock.toOtelClock())
.build()
- val meter = meterProvider.get("opendc-compute")
-
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
@@ -153,7 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
new file mode 100644
index 00000000..799de60f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import org.opendc.compute.service.driver.Host
+import java.time.Clock
+
+/**
+ * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
+ */
+public class ExperimentMetricExporter(
+ private val monitor: ExperimentMonitor,
+ private val clock: Clock,
+ private val hosts: Map<String, Host>
+) : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+ reportHostMetrics(metricsByName)
+ reportProvisionerMetrics(metricsByName)
+ return CompletableResultCode.ofSuccess()
+ }
+
+ private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+ val hostMetrics = mutableMapOf<String, HostMetrics>()
+ hosts.mapValuesTo(hostMetrics) { HostMetrics() }
+
+ mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
+ m.cpuDemand = v
+ }
+
+ mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
+ m.cpuUsage = v
+ }
+
+ mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v ->
+ m.powerDraw = v
+ }
+
+ mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
+ m.requestedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
+ m.grantedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
+ m.overcommissionedBurst = v.toLong()
+ }
+
+ mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
+ m.interferedBurst = v.toLong()
+ }
+
+ mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
+ m.numberOfDeployedImages = v.toInt()
+ }
+
+ for ((id, hostMetric) in hostMetrics) {
+ val host = hosts.getValue(id)
+ monitor.reportHostSlice(
+ clock.millis(),
+ hostMetric.requestedBurst,
+ hostMetric.grantedBurst,
+ hostMetric.overcommissionedBurst,
+ hostMetric.interferedBurst,
+ hostMetric.cpuUsage,
+ hostMetric.cpuDemand,
+ hostMetric.numberOfDeployedImages,
+ host
+ )
+
+ monitor.reportPowerConsumption(host, hostMetric.powerDraw)
+ }
+ }
+
+ private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSummaryData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.sum)
+ }
+ }
+ }
+
+ private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
+ private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
+ val points = data?.longSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.labels["host"]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
+ private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
+ val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+
+ monitor.reportProvisionerMetrics(
+ clock.millis(),
+ hosts,
+ availableHosts,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ }
+
+ private class HostMetrics {
+ var requestedBurst: Long = 0
+ var grantedBurst: Long = 0
+ var overcommissionedBurst: Long = 0
+ var interferedBurst: Long = 0
+ var cpuUsage: Double = 0.0
+ var cpuDemand: Double = 0.0
+ var numberOfDeployedImages: Int = 0
+ var powerDraw: Double = 0.0
+ }
+
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index a57c8d78..5e75c890 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -26,12 +26,11 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
-import java.io.Closeable
/**
* A monitor watches the events of an experiment.
*/
-public interface ExperimentMonitor : Closeable {
+public interface ExperimentMonitor : AutoCloseable {
/**
* This method is invoked when the state of a VM changes.
*/
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index fd906f4d..02cfdc06 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
@@ -82,9 +81,7 @@ class CapelinIntegrationTest {
.setClock(clock.toOtelClock())
.build()
- val meter: Meter = meterProvider.get("opendc-compute")
-
- withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
@@ -142,9 +139,7 @@ class CapelinIntegrationTest {
.setClock(clock.toOtelClock())
.build()
- val meter: Meter = meterProvider.get("opendc-compute")
-
- withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
processTrace(
clock,
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 706efdc9..5b717ff7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -34,7 +34,6 @@ import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
-import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
@@ -226,7 +225,6 @@ public class RunnerCli : CliktCommand(name = "runner") {
.setClock(clock.toOtelClock())
.build()
val metricProducer = meterProvider as MetricProducer
- val meter: Meter = meterProvider.get("opendc-compute")
val operational = scenario.get("operational", Document::class.java)
val allocationPolicy =
@@ -254,7 +252,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
val environment = TopologyParser(topologies, topologyId)
val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7
- withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
val failureDomain = if (failureFrequency > 0) {
logger.debug { "ENABLING failures" }
createFailureDomain(
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index 937b6966..f2eea97c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -71,7 +71,7 @@ class SimResourceBenchmarks {
fun benchmarkForwardOverhead(state: Workload) {
return scope.runBlockingTest {
val provider = SimResourceSource(4200.0, clock, scheduler)
- val forwarder = SimResourceTransformer()
+ val forwarder = SimResourceForwarder()
provider.startConsumer(forwarder)
return@runBlockingTest forwarder.consume(state.consumers[0])
}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
deleted file mode 100644
index 10f29f4e..00000000
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.utils.flow
-
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.channels.SendChannel
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.FlowCollector
-import kotlinx.coroutines.flow.consumeAsFlow
-
-/**
- * A [Flow] that can be used to emit events.
- */
-public interface EventFlow<T> : Flow<T> {
- /**
- * Emit the specified [event].
- */
- public fun emit(event: T)
-
- /**
- * Close the flow.
- */
- public fun close()
-}
-
-/**
- * Creates a new [EventFlow].
- */
-@Suppress("FunctionName")
-public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
-
-/**
- * Internal implementation of the [EventFlow] class.
- */
-@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
-private class EventFlowImpl<T> : EventFlow<T> {
- private var closed: Boolean = false
- private val subscribers = mutableListOf<SendChannel<T>>()
-
- override fun emit(event: T) {
- if (closed) {
- return
- }
-
- val it = subscribers.iterator()
- synchronized(this) {
- while (it.hasNext()) {
- val chan = it.next()
- if (chan.isClosedForSend) {
- it.remove()
- } else {
- chan.offer(event)
- }
- }
- }
- }
-
- override fun close() {
- synchronized(this) {
- closed = true
-
- for (chan in subscribers) {
- chan.close()
- }
-
- subscribers.clear()
- }
- }
-
- @InternalCoroutinesApi
- override suspend fun collect(collector: FlowCollector<T>) {
- val channel: Channel<T>
- synchronized(this) {
- if (closed) {
- return
- }
-
- channel = Channel(Channel.UNLIMITED)
- subscribers.add(channel)
- }
- try {
- channel.consumeAsFlow().collect(collector)
- } finally {
- channel.close()
- }
- }
-
- override fun toString(): String = "EventFlow"
-}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
index e06e5eb3..46c0d835 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
@@ -78,6 +78,7 @@ internal class WorkflowServiceIntegrationTest {
def.meta,
coroutineContext,
clock,
+ MeterProvider.noop().get("opendc-compute-simulator"),
SimSpaceSharedHypervisorProvider()
)
}