diff options
14 files changed, 368 insertions, 348 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index c3c39572..bed15dfd 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -22,7 +22,6 @@ package org.opendc.compute.service.driver -import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.Server import java.util.* @@ -56,11 +55,6 @@ public interface Host { public val meta: Map<String, Any> /** - * The events emitted by the driver. - */ - public val events: Flow<HostEvent> - - /** * Determine whether the specified [instance][server] can still fit on this host. */ public fun canFit(server: Server): Boolean diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt deleted file mode 100644 index 97350679..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.driver - -/** - * An event that is emitted by a [Host]. - */ -public sealed class HostEvent { - /** - * The driver that emitted the event. - */ - public abstract val driver: Host - - /** - * This event is emitted when the number of active servers on the server managed by this driver is updated. - * - * @property driver The driver that emitted the event. - * @property numberOfActiveServers The number of active servers. - * @property availableMemory The available memory, in MB. - */ - public data class VmsUpdated( - override val driver: Host, - public val numberOfActiveServers: Int, - public val availableMemory: Long - ) : HostEvent() - - /** - * This event is emitted when a slice is finished. - * - * @property driver The driver that emitted the event. - * @property requestedBurst The total requested CPU time (can be above capacity). - * @property grantedBurst The actual total granted capacity, which might be lower than the requested burst due to - * the hypervisor being interrupted during a slice. - * @property overcommissionedBurst The CPU time that the hypervisor could not grant to the virtual machine since - * it did not have the capacity. - * @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance - * interference. - * @property cpuUsage CPU use in megahertz. - * @property cpuDemand CPU demand in megahertz. - * @property numberOfDeployedImages The number of images deployed on this hypervisor. - */ - public data class SliceFinished( - override val driver: Host, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val numberOfDeployedImages: Int, - ) : HostEvent() -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts index 1ad3f1c6..3bf8a114 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -38,5 +38,6 @@ dependencies { implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk")) testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 89784803..6d81aa7d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,8 +22,9 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.common.Labels import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server @@ -36,7 +37,6 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.MachinePowerModel import org.opendc.simulator.failures.FailureDomain -import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext @@ -52,6 +52,7 @@ public class SimHost( override val meta: Map<String, Any>, context: CoroutineContext, clock: Clock, + meter: Meter, hypervisor: SimHypervisorProvider, powerModel: MachinePowerModel = ConstantPowerModel(0.0), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), @@ -66,10 +67,6 @@ public class SimHost( */ private val logger = KotlinLogging.logger {} - override val events: Flow<HostEvent> - get() = _events - internal val _events = EventFlow<HostEvent>() - /** * The event listeners registered with this host. */ @@ -99,18 +96,13 @@ public class SimHost( cpuUsage: Double, cpuDemand: Double ) { - _events.emit( - HostEvent.SliceFinished( - this@SimHost, - requestedWork, - grantedWork, - overcommittedWork, - interferedWork, - cpuUsage, - cpuDemand, - guests.size - ) - ) + _cpuWork.record(requestedWork.toDouble()) + _cpuWorkGranted.record(grantedWork.toDouble()) + _cpuWorkOvercommit.record(overcommittedWork.toDouble()) + _cpuWorkInterference.record(interferedWork.toDouble()) + _cpuUsage.record(cpuUsage) + _cpuDemand.record(cpuDemand) + _cpuPower.record(machine.powerDraw.value) } } ) @@ -132,6 +124,87 @@ public class SimHost( override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) + /** + * The number of guests on the host. + */ + private val _guests = meter.longUpDownCounterBuilder("guests.total") + .setDescription("Number of guests") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The number of active guests on the host. + */ + private val _activeGuests = meter.longUpDownCounterBuilder("guests.active") + .setDescription("Number of active guests") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The CPU usage on the host. + */ + private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage") + .setDescription("The amount of CPU resources used by the host") + .setUnit("MHz") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The CPU demand on the host. + */ + private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand") + .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The requested work for the CPU. + */ + private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage") + .setDescription("The amount of power used by the CPU") + .setUnit("W") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The requested work for the CPU. + */ + private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total") + .setDescription("The amount of work supplied to the CPU") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work actually performed by the CPU. + */ + private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted") + .setDescription("The amount of work performed by the CPU") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work that could not be performed by the CPU due to overcommitting resource. + */ + private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit") + .setDescription("The amount of work not performed by the CPU due to overcommitment") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work that could not be performed by the CPU due to interference. + */ + private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference") + .setDescription("The amount of work not performed by the CPU due to interference") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + init { // Launch hypervisor onto machine scope.launch { @@ -166,12 +239,11 @@ public class SimHost( require(canFit(server)) { "Server does not fit" } val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) guests[server] = guest + _guests.add(1) if (start) { guest.start() } - - _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override fun contains(server: Server): Boolean { @@ -191,6 +263,7 @@ public class SimHost( override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return guest.terminate() + _guests.add(-1) } override fun addListener(listener: HostListener) { @@ -228,6 +301,7 @@ public class SimHost( } } + _activeGuests.add(1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } @@ -238,9 +312,8 @@ public class SimHost( } } + _activeGuests.add(-1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - - _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override suspend fun fail() { diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index e311cd21..830fc868 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,12 +22,14 @@ package org.opendc.compute.simulator -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -37,7 +39,8 @@ import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher -import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -45,23 +48,20 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.time.Clock +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.sdk.toOtelClock import java.util.UUID +import kotlin.coroutines.resume /** * Basic test-suite for the hypervisor. */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @BeforeEach fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( @@ -74,72 +74,98 @@ internal class SimHostTest { * Test overcommitting of resources by the hypervisor. */ @Test - fun testOvercommitted() { + fun testOvercommitted() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) var requestedWork = 0L var grantedWork = 0L var overcommittedWork = 0L - scope.launch { - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider()) - val duration = 5 * 60L - val vmImageA = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) - ), - ) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val duration = 5 * 60L + val vmImageA = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) + ), ) ) - val vmImageB = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) - ) + ) + val vmImageB = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) ) ) ) + ) - delay(5) - - val flavor = MockFlavor(2, 0) - virtDriver.events - .onEach { event -> - when (event) { - is HostEvent.SliceFinished -> { - requestedWork += event.requestedBurst - grantedWork += event.grantedBurst - overcommittedWork += event.overcommissionedBurst - } - } + val flavor = MockFlavor(2, 0) + + // Setup metric reader + val reader = CoroutineMetricReader( + this, listOf(meterProvider as MetricProducer), + object : MetricExporter { + override fun export(metrics: Collection<MetricData>): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong() + grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong() + overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong() + return CompletableResultCode.ofSuccess() } - .launchIn(this) + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + }, + exportInterval = duration * 1000 + ) + + coroutineScope { launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } + + suspendCancellableCoroutine<Unit> { cont -> + virtDriver.addListener(object : HostListener { + private var finished = 0 + + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED && ++finished == 2) { + cont.resume(Unit) + } + } + }) + } } - scope.advanceUntilIdle() + // Ensure last cycle is collected + delay(1000 * duration) + virtDriver.close() + reader.close() assertAll( - { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, { assertEquals(4197600, requestedWork, "Requested work does not match") }, { assertEquals(2157600, grantedWork, "Granted work does not match") }, { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, - { assertEquals(1200006, scope.currentTime) } + { assertEquals(1500001, currentTime) } ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 4f48bba7..40f50235 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,24 +22,19 @@ package org.opendc.experiments.capelin -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader @@ -138,7 +133,7 @@ public fun createTraceReader( */ public suspend fun withComputeService( clock: Clock, - meter: Meter, + meterProvider: MeterProvider, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, block: suspend CoroutineScope.(ComputeService) -> Unit @@ -153,13 +148,15 @@ public suspend fun withComputeService( def.meta, coroutineContext, clock, + meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), def.powerModel ) } + val schedulerMeter = meterProvider.get("opendc-compute") val scheduler = - ComputeService(coroutineContext, clock, meter, allocationPolicy) + ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy) for (host in hosts) { scheduler.addHost(host) @@ -194,62 +191,13 @@ public suspend fun withMonitor( monitor.reportHostStateChange(clock.millis(), host, newState) } }) - - monitorJobs += host.events - .onEach { event -> - when (event) { - is HostEvent.SliceFinished -> monitor.reportHostSlice( - clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.driver - ) - } - } - .launchIn(this) - - monitorJobs += (host as SimHost).machine.powerDraw - .onEach { monitor.reportPowerConsumption(host, it) } - .launchIn(this) } val reader = CoroutineMetricReader( - this, listOf(metricProducer), - object : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - - val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - - monitor.reportProvisionerMetrics( - clock.millis(), - hosts, - availableHosts, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - return CompletableResultCode.ofSuccess() - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - }, - exportInterval = 5 * 60 * 1000 + this, + listOf(metricProducer), + ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }), + exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */ ) try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 2921daba..5fa77161 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -126,8 +126,6 @@ public abstract class Portfolio(name: String) : Experiment(name) { .setClock(clock.toOtelClock()) .build() - val meter = meterProvider.get("opendc-compute") - val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } @@ -153,7 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - withComputeService(clock, meter, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt new file mode 100644 index 00000000..799de60f --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.monitor + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import org.opendc.compute.service.driver.Host +import java.time.Clock + +/** + * A [MetricExporter] that exports the metrics to the [ExperimentMonitor]. + */ +public class ExperimentMetricExporter( + private val monitor: ExperimentMonitor, + private val clock: Clock, + private val hosts: Map<String, Host> +) : MetricExporter { + override fun export(metrics: Collection<MetricData>): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + reportHostMetrics(metricsByName) + reportProvisionerMetrics(metricsByName) + return CompletableResultCode.ofSuccess() + } + + private fun reportHostMetrics(metrics: Map<String, MetricData>) { + val hostMetrics = mutableMapOf<String, HostMetrics>() + hosts.mapValuesTo(hostMetrics) { HostMetrics() } + + mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> + m.cpuDemand = v + } + + mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> + m.cpuUsage = v + } + + mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v -> + m.powerDraw = v + } + + mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v -> + m.requestedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v -> + m.grantedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> + m.overcommissionedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v -> + m.interferedBurst = v.toLong() + } + + mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> + m.numberOfDeployedImages = v.toInt() + } + + for ((id, hostMetric) in hostMetrics) { + val host = hosts.getValue(id) + monitor.reportHostSlice( + clock.millis(), + hostMetric.requestedBurst, + hostMetric.grantedBurst, + hostMetric.overcommissionedBurst, + hostMetric.interferedBurst, + hostMetric.cpuUsage, + hostMetric.cpuDemand, + hostMetric.numberOfDeployedImages, + host + ) + + monitor.reportPowerConsumption(host, hostMetric.powerDraw) + } + } + + private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSummaryData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.sum) + } + } + } + + private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSumData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + + private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) { + val points = data?.longSumData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + + private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) { + val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + + monitor.reportProvisionerMetrics( + clock.millis(), + hosts, + availableHosts, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + } + + private class HostMetrics { + var requestedBurst: Long = 0 + var grantedBurst: Long = 0 + var overcommissionedBurst: Long = 0 + var interferedBurst: Long = 0 + var cpuUsage: Double = 0.0 + var cpuDemand: Double = 0.0 + var numberOfDeployedImages: Int = 0 + var powerDraw: Double = 0.0 + } + + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index a57c8d78..5e75c890 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -26,12 +26,11 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState -import java.io.Closeable /** * A monitor watches the events of an experiment. */ -public interface ExperimentMonitor : Closeable { +public interface ExperimentMonitor : AutoCloseable { /** * This method is invoked when the state of a VM changes. */ diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index fd906f4d..02cfdc06 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer @@ -82,9 +81,7 @@ class CapelinIntegrationTest { .setClock(clock.toOtelClock()) .build() - val meter: Meter = meterProvider.get("opendc-compute") - - withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( @@ -142,9 +139,7 @@ class CapelinIntegrationTest { .setClock(clock.toOtelClock()) .build() - val meter: Meter = meterProvider.get("opendc-compute") - - withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { processTrace( clock, diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 706efdc9..5b717ff7 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -34,7 +34,6 @@ import com.mongodb.client.MongoClients import com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters -import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer @@ -226,7 +225,6 @@ public class RunnerCli : CliktCommand(name = "runner") { .setClock(clock.toOtelClock()) .build() val metricProducer = meterProvider as MetricProducer - val meter: Meter = meterProvider.get("opendc-compute") val operational = scenario.get("operational", Document::class.java) val allocationPolicy = @@ -254,7 +252,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val environment = TopologyParser(topologies, topologyId) val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7 - withComputeService(clock, meter, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> val failureDomain = if (failureFrequency > 0) { logger.debug { "ENABLING failures" } createFailureDomain( diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index 937b6966..f2eea97c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -71,7 +71,7 @@ class SimResourceBenchmarks { fun benchmarkForwardOverhead(state: Workload) { return scope.runBlockingTest { val provider = SimResourceSource(4200.0, clock, scheduler) - val forwarder = SimResourceTransformer() + val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) return@runBlockingTest forwarder.consume(state.consumers[0]) } diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt deleted file mode 100644 index 10f29f4e..00000000 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.utils.flow - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.consumeAsFlow - -/** - * A [Flow] that can be used to emit events. - */ -public interface EventFlow<T> : Flow<T> { - /** - * Emit the specified [event]. - */ - public fun emit(event: T) - - /** - * Close the flow. - */ - public fun close() -} - -/** - * Creates a new [EventFlow]. - */ -@Suppress("FunctionName") -public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl() - -/** - * Internal implementation of the [EventFlow] class. - */ -@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -private class EventFlowImpl<T> : EventFlow<T> { - private var closed: Boolean = false - private val subscribers = mutableListOf<SendChannel<T>>() - - override fun emit(event: T) { - if (closed) { - return - } - - val it = subscribers.iterator() - synchronized(this) { - while (it.hasNext()) { - val chan = it.next() - if (chan.isClosedForSend) { - it.remove() - } else { - chan.offer(event) - } - } - } - } - - override fun close() { - synchronized(this) { - closed = true - - for (chan in subscribers) { - chan.close() - } - - subscribers.clear() - } - } - - @InternalCoroutinesApi - override suspend fun collect(collector: FlowCollector<T>) { - val channel: Channel<T> - synchronized(this) { - if (closed) { - return - } - - channel = Channel(Channel.UNLIMITED) - subscribers.add(channel) - } - try { - channel.consumeAsFlow().collect(collector) - } finally { - channel.close() - } - } - - override fun toString(): String = "EventFlow" -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index e06e5eb3..46c0d835 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -78,6 +78,7 @@ internal class WorkflowServiceIntegrationTest { def.meta, coroutineContext, clock, + MeterProvider.noop().get("opendc-compute-simulator"), SimSpaceSharedHypervisorProvider() ) } |
