summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt72
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt117
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt142
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt72
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt171
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt9
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt2
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt112
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt1
14 files changed, 368 insertions, 348 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index c3c39572..bed15dfd 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -22,7 +22,6 @@
package org.opendc.compute.service.driver
-import kotlinx.coroutines.flow.Flow
import org.opendc.compute.api.Server
import java.util.*
@@ -56,11 +55,6 @@ public interface Host {
public val meta: Map<String, Any>
/**
- * The events emitted by the driver.
- */
- public val events: Flow<HostEvent>
-
- /**
* Determine whether the specified [instance][server] can still fit on this host.
*/
public fun canFit(server: Server): Boolean
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt
deleted file mode 100644
index 97350679..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.driver
-
-/**
- * An event that is emitted by a [Host].
- */
-public sealed class HostEvent {
- /**
- * The driver that emitted the event.
- */
- public abstract val driver: Host
-
- /**
- * This event is emitted when the number of active servers on the server managed by this driver is updated.
- *
- * @property driver The driver that emitted the event.
- * @property numberOfActiveServers The number of active servers.
- * @property availableMemory The available memory, in MB.
- */
- public data class VmsUpdated(
- override val driver: Host,
- public val numberOfActiveServers: Int,
- public val availableMemory: Long
- ) : HostEvent()
-
- /**
- * This event is emitted when a slice is finished.
- *
- * @property driver The driver that emitted the event.
- * @property requestedBurst The total requested CPU time (can be above capacity).
- * @property grantedBurst The actual total granted capacity, which might be lower than the requested burst due to
- * the hypervisor being interrupted during a slice.
- * @property overcommissionedBurst The CPU time that the hypervisor could not grant to the virtual machine since
- * it did not have the capacity.
- * @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance
- * interference.
- * @property cpuUsage CPU use in megahertz.
- * @property cpuDemand CPU demand in megahertz.
- * @property numberOfDeployedImages The number of images deployed on this hypervisor.
- */
- public data class SliceFinished(
- override val driver: Host,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val numberOfDeployedImages: Int,
- ) : HostEvent()
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
index 1ad3f1c6..3bf8a114 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -38,5 +38,6 @@ dependencies {
implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 89784803..6d81aa7d 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,8 +22,9 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.common.Labels
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
@@ -36,7 +37,6 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.MachinePowerModel
import org.opendc.simulator.failures.FailureDomain
-import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -52,6 +52,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
clock: Clock,
+ meter: Meter,
hypervisor: SimHypervisorProvider,
powerModel: MachinePowerModel = ConstantPowerModel(0.0),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
@@ -66,10 +67,6 @@ public class SimHost(
*/
private val logger = KotlinLogging.logger {}
- override val events: Flow<HostEvent>
- get() = _events
- internal val _events = EventFlow<HostEvent>()
-
/**
* The event listeners registered with this host.
*/
@@ -99,18 +96,13 @@ public class SimHost(
cpuUsage: Double,
cpuDemand: Double
) {
- _events.emit(
- HostEvent.SliceFinished(
- this@SimHost,
- requestedWork,
- grantedWork,
- overcommittedWork,
- interferedWork,
- cpuUsage,
- cpuDemand,
- guests.size
- )
- )
+ _cpuWork.record(requestedWork.toDouble())
+ _cpuWorkGranted.record(grantedWork.toDouble())
+ _cpuWorkOvercommit.record(overcommittedWork.toDouble())
+ _cpuWorkInterference.record(interferedWork.toDouble())
+ _cpuUsage.record(cpuUsage)
+ _cpuDemand.record(cpuDemand)
+ _cpuPower.record(machine.powerDraw.value)
}
}
)
@@ -132,6 +124,87 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum())
+ /**
+ * The number of guests on the host.
+ */
+ private val _guests = meter.longUpDownCounterBuilder("guests.total")
+ .setDescription("Number of guests")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The number of active guests on the host.
+ */
+ private val _activeGuests = meter.longUpDownCounterBuilder("guests.active")
+ .setDescription("Number of active guests")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The CPU usage on the host.
+ */
+ private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage")
+ .setDescription("The amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The CPU demand on the host.
+ */
+ private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand")
+ .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The power draw of the machine backing this host, in watts.
+ */
+ private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage")
+ .setDescription("The amount of power used by the CPU")
+ .setUnit("W")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The requested work for the CPU.
+ */
+ private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total")
+ .setDescription("The amount of work supplied to the CPU")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work actually performed by the CPU.
+ */
+ private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted")
+ .setDescription("The amount of work performed by the CPU")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work that could not be performed by the CPU due to overcommitting resource.
+ */
+ private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit")
+ .setDescription("The amount of work not performed by the CPU due to overcommitment")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work that could not be performed by the CPU due to interference.
+ */
+ private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference")
+ .setDescription("The amount of work not performed by the CPU due to interference")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
init {
// Launch hypervisor onto machine
scope.launch {
@@ -166,12 +239,11 @@ public class SimHost(
require(canFit(server)) { "Server does not fit" }
val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
guests[server] = guest
+ _guests.add(1)
if (start) {
guest.start()
}
-
- _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override fun contains(server: Server): Boolean {
@@ -191,6 +263,7 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
guest.terminate()
+ _guests.add(-1)
}
override fun addListener(listener: HostListener) {
@@ -228,6 +301,7 @@ public class SimHost(
}
}
+ _activeGuests.add(1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
}
@@ -238,9 +312,8 @@ public class SimHost(
}
}
+ _activeGuests.add(-1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
-
- _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override suspend fun fail() {
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e311cd21..830fc868 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -22,12 +22,14 @@
package org.opendc.compute.simulator
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -37,7 +39,8 @@ import org.opendc.compute.api.Image
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
-import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -45,23 +48,20 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import java.time.Clock
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.sdk.toOtelClock
import java.util.UUID
+import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
- private lateinit var scope: TestCoroutineScope
- private lateinit var clock: Clock
private lateinit var machineModel: SimMachineModel
@BeforeEach
fun setUp() {
- scope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(scope)
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
@@ -74,72 +74,98 @@ internal class SimHostTest {
* Test overcommitting of resources by the hypervisor.
*/
@Test
- fun testOvercommitted() {
+ fun testOvercommitted() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
var requestedWork = 0L
var grantedWork = 0L
var overcommittedWork = 0L
- scope.launch {
- val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider())
- val duration = 5 * 60L
- val vmImageA = MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
- ),
- )
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider())
+ val duration = 5 * 60L
+ val vmImageA = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
+ ),
)
)
- val vmImageB = MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
- )
+ )
+ val vmImageB = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
)
)
)
+ )
- delay(5)
-
- val flavor = MockFlavor(2, 0)
- virtDriver.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> {
- requestedWork += event.requestedBurst
- grantedWork += event.grantedBurst
- overcommittedWork += event.overcommissionedBurst
- }
- }
+ val flavor = MockFlavor(2, 0)
+
+ // Setup metric reader
+ val reader = CoroutineMetricReader(
+ this, listOf(meterProvider as MetricProducer),
+ object : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+ requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong()
+ grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong()
+ overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong()
+ return CompletableResultCode.ofSuccess()
}
- .launchIn(this)
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ },
+ exportInterval = duration * 1000
+ )
+
+ coroutineScope {
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
+
+ suspendCancellableCoroutine<Unit> { cont ->
+ virtDriver.addListener(object : HostListener {
+ private var finished = 0
+
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED && ++finished == 2) {
+ cont.resume(Unit)
+ }
+ }
+ })
+ }
}
- scope.advanceUntilIdle()
+ // Ensure last cycle is collected
+ delay(1000 * duration)
+ virtDriver.close()
+ reader.close()
assertAll(
- { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
{ assertEquals(4197600, requestedWork, "Requested work does not match") },
{ assertEquals(2157600, grantedWork, "Granted work does not match") },
{ assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
- { assertEquals(1200006, scope.currentTime) }
+ { assertEquals(1500001, currentTime) }
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 4f48bba7..40f50235 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,24 +22,19 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
@@ -138,7 +133,7 @@ public fun createTraceReader(
*/
public suspend fun withComputeService(
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
block: suspend CoroutineScope.(ComputeService) -> Unit
@@ -153,13 +148,15 @@ public suspend fun withComputeService(
def.meta,
coroutineContext,
clock,
+ meterProvider.get("opendc-compute-simulator"),
SimFairShareHypervisorProvider(),
def.powerModel
)
}
+ val schedulerMeter = meterProvider.get("opendc-compute")
val scheduler =
- ComputeService(coroutineContext, clock, meter, allocationPolicy)
+ ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
@@ -194,62 +191,13 @@ public suspend fun withMonitor(
monitor.reportHostStateChange(clock.millis(), host, newState)
}
})
-
- monitorJobs += host.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> monitor.reportHostSlice(
- clock.millis(),
- event.requestedBurst,
- event.grantedBurst,
- event.overcommissionedBurst,
- event.interferedBurst,
- event.cpuUsage,
- event.cpuDemand,
- event.numberOfDeployedImages,
- event.driver
- )
- }
- }
- .launchIn(this)
-
- monitorJobs += (host as SimHost).machine.powerDraw
- .onEach { monitor.reportPowerConsumption(host, it) }
- .launchIn(this)
}
val reader = CoroutineMetricReader(
- this, listOf(metricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
-
- val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
-
- monitor.reportProvisionerMetrics(
- clock.millis(),
- hosts,
- availableHosts,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- return CompletableResultCode.ofSuccess()
- }
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = 5 * 60 * 1000
+ this,
+ listOf(metricProducer),
+ ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
+ exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
)
try {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 2921daba..5fa77161 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -126,8 +126,6 @@ public abstract class Portfolio(name: String) : Experiment(name) {
.setClock(clock.toOtelClock())
.build()
- val meter = meterProvider.get("opendc-compute")
-
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
@@ -153,7 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
new file mode 100644
index 00000000..799de60f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import org.opendc.compute.service.driver.Host
+import java.time.Clock
+
+/**
+ * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
+ *
+ * On every export, the collected metrics are indexed by name. Per-host metrics (matched through the
+ * `host` label of each data point) are forwarded via [ExperimentMonitor.reportHostSlice] and
+ * [ExperimentMonitor.reportPowerConsumption], and the provisioner-level counters are forwarded via
+ * [ExperimentMonitor.reportProvisionerMetrics].
+ *
+ * @param monitor The monitor to report the metrics to.
+ * @param clock The clock used to timestamp the reports.
+ * @param hosts The hosts to report on, keyed by the value of the `host` label on the data points.
+ */
+public class ExperimentMetricExporter(
+    private val monitor: ExperimentMonitor,
+    private val clock: Clock,
+    private val hosts: Map<String, Host>
+) : MetricExporter {
+    /**
+     * Export the specified [metrics] to the [monitor]. This implementation always reports success.
+     */
+    override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+        val metricsByName = metrics.associateBy { it.name }
+        reportHostMetrics(metricsByName)
+        reportProvisionerMetrics(metricsByName)
+        return CompletableResultCode.ofSuccess()
+    }
+
+    /**
+     * Aggregate the per-host data points found in [metrics] and report one slice (and one power
+     * consumption value) per known host. Hosts without data points are reported with zero values.
+     */
+    private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+        val hostMetrics = mutableMapOf<String, HostMetrics>()
+        hosts.mapValuesTo(hostMetrics) { HostMetrics() }
+
+        mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
+            m.cpuDemand = v
+        }
+
+        mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
+            m.cpuUsage = v
+        }
+
+        mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v ->
+            m.powerDraw = v
+        }
+
+        mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
+            m.requestedBurst = v.toLong()
+        }
+
+        mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
+            m.grantedBurst = v.toLong()
+        }
+
+        mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
+            m.overcommissionedBurst = v.toLong()
+        }
+
+        // The instrument is registered as "cpu.work.interference"; looking it up under any other
+        // name silently leaves the interfered burst at zero.
+        mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v ->
+            m.interferedBurst = v.toLong()
+        }
+
+        // NOTE(review): this uses the number of *active* guests ("guests.active") rather than the
+        // total number of guests ("guests.total") - confirm this is the intended meaning.
+        mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
+            m.numberOfDeployedImages = v.toInt()
+        }
+
+        val now = clock.millis()
+        for ((id, hostMetric) in hostMetrics) {
+            val host = hosts.getValue(id)
+            monitor.reportHostSlice(
+                now,
+                hostMetric.requestedBurst,
+                hostMetric.grantedBurst,
+                hostMetric.overcommissionedBurst,
+                hostMetric.interferedBurst,
+                hostMetric.cpuUsage,
+                hostMetric.cpuDemand,
+                hostMetric.numberOfDeployedImages,
+                host
+            )
+
+            monitor.reportPowerConsumption(host, hostMetric.powerDraw)
+        }
+    }
+
+    /**
+     * Invoke [block] with the sum of every double summary point in [data] whose `host` label refers
+     * to an entry in [hostMetrics]. Points for unknown hosts and absent metrics are ignored.
+     */
+    private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+        val points = data?.doubleSummaryData?.points ?: emptyList()
+        for (point in points) {
+            val uid = point.labels["host"]
+            val hostMetric = hostMetrics[uid]
+
+            if (hostMetric != null) {
+                block(hostMetric, point.sum)
+            }
+        }
+    }
+
+    /**
+     * Invoke [block] with the value of every double sum point in [data] whose `host` label refers to
+     * an entry in [hostMetrics]. Not called within this class at the moment.
+     */
+    private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+        val points = data?.doubleSumData?.points ?: emptyList()
+        for (point in points) {
+            val uid = point.labels["host"]
+            val hostMetric = hostMetrics[uid]
+
+            if (hostMetric != null) {
+                block(hostMetric, point.value)
+            }
+        }
+    }
+
+    /**
+     * Invoke [block] with the value of every long sum point in [data] whose `host` label refers to
+     * an entry in [hostMetrics]. Points for unknown hosts and absent metrics are ignored.
+     */
+    private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
+        val points = data?.longSumData?.points ?: emptyList()
+        for (point in points) {
+            val uid = point.labels["host"]
+            val hostMetric = hostMetrics[uid]
+
+            if (hostMetric != null) {
+                block(hostMetric, point.value)
+            }
+        }
+    }
+
+    /**
+     * Report the provisioner-level counters found in [metrics] to the [monitor]. Counters that are
+     * absent or have no data points are reported as zero.
+     */
+    private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
+        val submittedVms = lastLongSum(metrics, "servers.submitted")
+        val queuedVms = lastLongSum(metrics, "servers.waiting")
+        val unscheduledVms = lastLongSum(metrics, "servers.unscheduled")
+        val runningVms = lastLongSum(metrics, "servers.active")
+        val finishedVms = lastLongSum(metrics, "servers.finished")
+        val hosts = lastLongSum(metrics, "hosts.total")
+        val availableHosts = lastLongSum(metrics, "hosts.available")
+
+        monitor.reportProvisionerMetrics(
+            clock.millis(),
+            hosts,
+            availableHosts,
+            submittedVms,
+            runningVms,
+            finishedVms,
+            queuedVms,
+            unscheduledVms
+        )
+    }
+
+    /**
+     * Obtain the value of the last long sum point of the metric called [name], or zero if the metric
+     * is absent or has no points (`lastOrNull` avoids the exception `last` raises on an empty collection).
+     */
+    private fun lastLongSum(metrics: Map<String, MetricData>, name: String): Int =
+        metrics[name]?.longSumData?.points?.lastOrNull()?.value?.toInt() ?: 0
+
+    /**
+     * Mutable accumulator for the metrics of a single host during one export.
+     */
+    private class HostMetrics {
+        var requestedBurst: Long = 0
+        var grantedBurst: Long = 0
+        var overcommissionedBurst: Long = 0
+        var interferedBurst: Long = 0
+        var cpuUsage: Double = 0.0
+        var cpuDemand: Double = 0.0
+        var numberOfDeployedImages: Int = 0
+        var powerDraw: Double = 0.0
+    }
+
+    /** Nothing is buffered by this exporter, so flushing trivially succeeds. */
+    override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+    /** This exporter holds no resources of its own, so shutdown trivially succeeds. */
+    override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index a57c8d78..5e75c890 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -26,12 +26,11 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
-import java.io.Closeable
/**
* A monitor watches the events of an experiment.
*/
-public interface ExperimentMonitor : Closeable {
+public interface ExperimentMonitor : AutoCloseable {
/**
* This method is invoked when the state of a VM changes.
*/
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index fd906f4d..02cfdc06 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
@@ -82,9 +81,7 @@ class CapelinIntegrationTest {
.setClock(clock.toOtelClock())
.build()
- val meter: Meter = meterProvider.get("opendc-compute")
-
- withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
@@ -142,9 +139,7 @@ class CapelinIntegrationTest {
.setClock(clock.toOtelClock())
.build()
- val meter: Meter = meterProvider.get("opendc-compute")
-
- withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
processTrace(
clock,
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 706efdc9..5b717ff7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -34,7 +34,6 @@ import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
-import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
@@ -226,7 +225,6 @@ public class RunnerCli : CliktCommand(name = "runner") {
.setClock(clock.toOtelClock())
.build()
val metricProducer = meterProvider as MetricProducer
- val meter: Meter = meterProvider.get("opendc-compute")
val operational = scenario.get("operational", Document::class.java)
val allocationPolicy =
@@ -254,7 +252,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
val environment = TopologyParser(topologies, topologyId)
val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7
- withComputeService(clock, meter, environment, allocationPolicy) { scheduler ->
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
val failureDomain = if (failureFrequency > 0) {
logger.debug { "ENABLING failures" }
createFailureDomain(
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index 937b6966..f2eea97c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -71,7 +71,7 @@ class SimResourceBenchmarks {
fun benchmarkForwardOverhead(state: Workload) {
return scope.runBlockingTest {
val provider = SimResourceSource(4200.0, clock, scheduler)
- val forwarder = SimResourceTransformer()
+ val forwarder = SimResourceForwarder()
provider.startConsumer(forwarder)
return@runBlockingTest forwarder.consume(state.consumers[0])
}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
deleted file mode 100644
index 10f29f4e..00000000
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.utils.flow
-
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.channels.SendChannel
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.FlowCollector
-import kotlinx.coroutines.flow.consumeAsFlow
-
-/**
- * A [Flow] that can be used to emit events.
- */
-public interface EventFlow<T> : Flow<T> {
- /**
- * Emit the specified [event].
- */
- public fun emit(event: T)
-
- /**
- * Close the flow.
- */
- public fun close()
-}
-
-/**
- * Creates a new [EventFlow].
- */
-@Suppress("FunctionName")
-public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
-
-/**
- * Internal implementation of the [EventFlow] class.
- */
-@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
-private class EventFlowImpl<T> : EventFlow<T> {
- private var closed: Boolean = false
- private val subscribers = mutableListOf<SendChannel<T>>()
-
- override fun emit(event: T) {
- if (closed) {
- return
- }
-
- val it = subscribers.iterator()
- synchronized(this) {
- while (it.hasNext()) {
- val chan = it.next()
- if (chan.isClosedForSend) {
- it.remove()
- } else {
- chan.offer(event)
- }
- }
- }
- }
-
- override fun close() {
- synchronized(this) {
- closed = true
-
- for (chan in subscribers) {
- chan.close()
- }
-
- subscribers.clear()
- }
- }
-
- @InternalCoroutinesApi
- override suspend fun collect(collector: FlowCollector<T>) {
- val channel: Channel<T>
- synchronized(this) {
- if (closed) {
- return
- }
-
- channel = Channel(Channel.UNLIMITED)
- subscribers.add(channel)
- }
- try {
- channel.consumeAsFlow().collect(collector)
- } finally {
- channel.close()
- }
- }
-
- override fun toString(): String = "EventFlow"
-}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
index e06e5eb3..46c0d835 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
@@ -78,6 +78,7 @@ internal class WorkflowServiceIntegrationTest {
def.meta,
coroutineContext,
clock,
+ MeterProvider.noop().get("opendc-compute-simulator"),
SimSpaceSharedHypervisorProvider()
)
}