From 67920d2e83658b92a39e25956999c2ed61738ade Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 3 May 2022 14:39:57 +0200 Subject: refactor(compute): Expose CPU and system stats via Host interface This change updates the `Host` interface to directly expose CPU and system stats to be used by components that interface with the `Host` interface. Previously, this would require the user to interact with the OpenTelemetry SDK. Although that is still possible for more advanced usage cases, users can use the following methods to easily access common host and guest statistics. --- .../org/opendc/compute/service/driver/Host.kt | 35 ++++++ .../service/driver/telemetry/GuestCpuStats.kt | 44 ++++++++ .../service/driver/telemetry/GuestSystemStats.kt | 39 +++++++ .../service/driver/telemetry/HostCpuStats.kt | 47 ++++++++ .../service/driver/telemetry/HostSystemStats.kt | 51 +++++++++ .../kotlin/org/opendc/compute/simulator/SimHost.kt | 90 +++++++++++++--- .../org/opendc/compute/simulator/internal/Guest.kt | 47 +++++++- .../org/opendc/compute/simulator/SimHostTest.kt | 120 +++++---------------- 8 files changed, 363 insertions(+), 110 deletions(-) create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index bed15dfd..67b144d9 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -23,6 +23,10 @@ package org.opendc.compute.service.driver import org.opendc.compute.api.Server +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats +import org.opendc.compute.service.driver.telemetry.HostCpuStats +import org.opendc.compute.service.driver.telemetry.HostSystemStats import java.util.* /** @@ -54,6 +58,11 @@ public interface Host { */ public val meta: Map + /** + * The [Server] instances known to the host. + */ + public val instances: Set + /** * Determine whether the specified [instance][server] can still fit on this host. */ @@ -100,4 +109,30 @@ public interface Host { * Remove a [HostListener] from this host. */ public fun removeListener(listener: HostListener) + + /** + * Query the system statistics of the host. + */ + public fun getSystemStats(): HostSystemStats + + /** + * Query the system statistics of a [Server] that is located on this host. + * + * @param server The [Server] to obtain the system statistics of. + * @throws IllegalArgumentException if the server is not present on the host. + */ + public fun getSystemStats(server: Server): GuestSystemStats + + /** + * Query the CPU statistics of the host. + */ + public fun getCpuStats(): HostCpuStats + + /** + * Query the CPU statistics of a [Server] that is located on this host. + * + * @param server The [Server] to obtain the CPU statistics of. + * @throws IllegalArgumentException if the server is not present on the host. + */ + public fun getCpuStats(server: Server): GuestCpuStats } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt new file mode 100644 index 00000000..b5d63471 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +/** + * Statistics about the CPUs of a guest. + * + * @property activeTime The cumulative time (in seconds) that the CPUs of the guest were actively running. + * @property idleTime The cumulative time (in seconds) the CPUs of the guest were idle. + * @property stealTime The cumulative CPU time (in seconds) that the guest was ready to run, but not granted time by the host. + * @property lostTime The cumulative CPU time (in seconds) that was lost due to interference with other machines. + * @property capacity The available CPU capacity of the guest (in MHz). + * @property usage Amount of CPU resources (in MHz) actually used by the guest. + * @property utilization Utilization of the CPU resources (in %) relative to the total CPU capacity. + */ +public data class GuestCpuStats( + val activeTime: Long, + val idleTime: Long, + val stealTime: Long, + val lostTime: Long, + val capacity: Double, + val usage: Double, + val utilization: Double +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt new file mode 100644 index 00000000..b3958473 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +import java.time.Duration +import java.time.Instant + +/** + * System-level statistics of a guest. + * + * @property uptime The cumulative uptime of the guest since last boot (in ms). + * @property downtime The cumulative downtime of the guest since last boot (in ms). + * @property bootTime The time at which the guest booted. + */ +public data class GuestSystemStats( + val uptime: Duration, + val downtime: Duration, + val bootTime: Instant +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt new file mode 100644 index 00000000..55e23c0e --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +/** + * Statistics about the CPUs of a host. + * + * @property activeTime The cumulative time (in seconds) that the CPUs of the host were actively running. + * @property idleTime The cumulative time (in seconds) the CPUs of the host were idle. + * @property stealTime The cumulative CPU time (in seconds) that virtual machines were ready to run, but were not able to. + * @property lostTime The cumulative CPU time (in seconds) that was lost due to interference between virtual machines. + * @property capacity The available CPU capacity of the host (in MHz). + * @property demand Amount of CPU resources (in MHz) the guests would use if there were no CPU contention or CPU + * limits. + * @property usage Amount of CPU resources (in MHz) actually used by the host. + * @property utilization Utilization of the CPU resources (in %) relative to the total CPU capacity. + */ +public data class HostCpuStats( + val activeTime: Long, + val idleTime: Long, + val stealTime: Long, + val lostTime: Long, + val capacity: Double, + val demand: Double, + val usage: Double, + val utilization: Double +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt new file mode 100644 index 00000000..1c07023f --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +import java.time.Duration +import java.time.Instant + +/** + * System-level statistics of a host. + + * @property uptime The cumulative uptime of the host since last boot (in ms). + * @property downtime The cumulative downtime of the host since last boot (in ms). + * @property bootTime The time at which the server started. + * @property powerUsage Instantaneous power usage of the system (in W). + * @property energyUsage The cumulative energy usage of the system (in J). + * @property guestsTerminated The number of guests that are in a terminated state. + * @property guestsRunning The number of guests that are in a running state. + * @property guestsError The number of guests that are in an error state. + * @property guestsInvalid The number of guests that are in an unknown state. + */ +public data class HostSystemStats( + val uptime: Duration, + val downtime: Duration, + val bootTime: Instant, + val powerUsage: Double, + val energyUsage: Double, + val guestsTerminated: Int, + val guestsRunning: Int, + val guestsError: Int, + val guestsInvalid: Int, +) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 4eb6392e..323ae4fe 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -29,11 +29,14 @@ import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.api.metrics.ObservableDoubleMeasurement import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* -import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats +import org.opendc.compute.service.driver.telemetry.HostCpuStats +import org.opendc.compute.service.driver.telemetry.HostSystemStats import org.opendc.compute.simulator.internal.Guest import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* @@ -49,6 +52,8 @@ import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine +import java.time.Duration +import java.time.Instant import java.util.* import kotlin.coroutines.CoroutineContext @@ -80,11 +85,6 @@ public class SimHost( */ private val clock = engine.clock - /** - * The logger instance of this server. - */ - private val logger = KotlinLogging.logger {} - /** * The [Meter] to track metrics of the simulated host. */ @@ -112,6 +112,9 @@ public class SimHost( private val guests = HashMap() private val _guests = mutableListOf() + override val instances: Set + get() = guests.keys + override val state: HostState get() = _state private var _state: HostState = HostState.DOWN @@ -249,6 +252,68 @@ public class SimHost( machine.cancel() } + override fun getSystemStats(): HostSystemStats { + updateUptime() + + var terminated = 0 + var running = 0 + var error = 0 + var invalid = 0 + + val guests = _guests.listIterator() + for (guest in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + ServerState.DELETED -> { + // Remove guests that have been deleted + this.guests.remove(guest.server) + guests.remove() + } + else -> invalid++ + } + } + + return HostSystemStats( + Duration.ofMillis(_uptime), + Duration.ofMillis(_downtime), + Instant.ofEpochMilli(_bootTime), + machine.powerUsage, + machine.energyUsage, + terminated, + running, + error, + invalid + ) + } + + override fun getSystemStats(server: Server): GuestSystemStats { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + return guest.getSystemStats() + } + + override fun getCpuStats(): HostCpuStats { + val counters = hypervisor.counters + counters.flush() + + return HostCpuStats( + counters.cpuActiveTime / 1000L, + counters.cpuIdleTime / 1000L, + counters.cpuStealTime / 1000L, + counters.cpuLostTime / 1000L, + hypervisor.cpuCapacity, + hypervisor.cpuDemand, + hypervisor.cpuUsage, + hypervisor.cpuUsage / _cpuLimit + ) + } + + override fun getCpuStats(server: Server): GuestCpuStats { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + return guest.getCpuStats() + } + override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean { @@ -417,13 +482,12 @@ public class SimHost( * Helper function to track the CPU time of a machine. */ private fun collectCpuTime(result: ObservableLongMeasurement) { - val counters = hypervisor.counters - counters.flush() + val stats = getCpuStats() - result.record(counters.cpuActiveTime / 1000L, _activeState) - result.record(counters.cpuIdleTime / 1000L, _idleState) - result.record(counters.cpuStealTime / 1000L, _stealState) - result.record(counters.cpuLostTime / 1000L, _lostState) + result.record(stats.activeTime, _activeState) + result.record(stats.idleTime, _idleState) + result.record(stats.stealTime, _stealState) + result.record(stats.lostTime, _lostState) val guests = _guests for (i in guests.indices) { @@ -450,7 +514,7 @@ public class SimHost( val guests = _guests for (i in guests.indices) { - guests[i].updateUptime(duration) + guests[i].updateUptime() } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index bb378ee3..0d4c550d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -32,6 +32,8 @@ import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper import org.opendc.simulator.compute.kernel.SimHypervisor @@ -39,6 +41,8 @@ import org.opendc.simulator.compute.kernel.SimVirtualMachine import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock +import java.time.Duration +import java.time.Instant import kotlin.coroutines.CoroutineContext /** @@ -145,6 +149,37 @@ internal class Guest( doStart() } + /** + * Obtain the system statistics of this guest. + */ + fun getSystemStats(): GuestSystemStats { + updateUptime() + + return GuestSystemStats( + Duration.ofMillis(_uptime), + Duration.ofMillis(_downtime), + Instant.ofEpochMilli(_bootTime) + ) + } + + /** + * Obtain the CPU statistics of this guest. + */ + fun getCpuStats(): GuestCpuStats { + val counters = machine.counters + counters.flush() + + return GuestCpuStats( + counters.cpuActiveTime / 1000L, + counters.cpuIdleTime / 1000L, + counters.cpuStealTime / 1000L, + counters.cpuLostTime / 1000L, + machine.cpuCapacity, + machine.cpuUsage, + machine.cpuUsage / _cpuLimit + ) + } + /** * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. */ @@ -209,6 +244,8 @@ internal class Guest( * This method is invoked when the guest stopped. */ private fun onStop(target: ServerState) { + updateUptime() + state = target listener.onStop(this) } @@ -224,10 +261,16 @@ internal class Guest( .put(STATE_KEY, "down") .build() + private var _lastReport = clock.millis() + /** * Helper function to track the uptime and downtime of the guest. */ - fun updateUptime(duration: Long) { + fun updateUptime() { + val now = clock.millis() + val duration = now - _lastReport + _lastReport = now + if (state == ServerState.RUNNING) { _uptime += duration } else if (state == ServerState.ERROR) { @@ -239,6 +282,8 @@ internal class Guest( * Helper function to track the uptime of the guest. */ fun collectUptime(result: ObservableLongMeasurement) { + updateUptime() + result.record(_uptime, _upState) result.record(_downtime, _downState) } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index f0325023..67689dc8 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,8 +22,7 @@ package org.opendc.compute.simulator -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -42,13 +41,6 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine -import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.HOST_ID -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Duration import java.util.* import kotlin.coroutines.resume @@ -73,45 +65,16 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var idleTime = 0L - var activeTime = 0L - var stealTime = 0L - - val hostId = UUID.randomUUID() - val hostResource = Resource.builder() - .put(HOST_ID, hostId.toString()) - .build() - - // Setup metric reader val duration = 5 * 60L - val reader = CoroutineMetricReader( - this, - object : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - activeTime += reader.cpuActiveTime - idleTime += reader.cpuIdleTime - stealTime += reader.cpuStealTime - } - }, - exportInterval = Duration.ofSeconds(duration) - ) - - val meterProvider = SdkMeterProvider - .builder() - .setResource(hostResource) - .setClock(clock.toOtelClock()) - .registerMetricReader(reader) - .build() - val engine = FlowEngine(coroutineContext, clock) - val virtDriver = SimHost( - uid = hostId, + val host = SimHost( + uid = UUID.randomUUID(), name = "test", model = machineModel, meta = emptyMap(), coroutineContext, engine, - meterProvider, + MeterProvider.noop(), SimFairShareHypervisorProvider() ) val vmImageA = MockImage( @@ -150,11 +113,11 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) coroutineScope { - launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } - launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } + launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } + launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } suspendCancellableCoroutine { cont -> - virtDriver.addListener(object : HostListener { + host.addListener(object : HostListener { private var finished = 0 override fun onStateChanged(host: Host, server: Server, newState: ServerState) { @@ -168,13 +131,14 @@ internal class SimHostTest { // Ensure last cycle is collected delay(1000L * duration) - virtDriver.close() - meterProvider.close() + host.close() + + val cpuStats = host.getCpuStats() assertAll( - { assertEquals(658, activeTime, "Active time does not match") }, - { assertEquals(2341, idleTime, "Idle time does not match") }, - { assertEquals(637, stealTime, "Steal time does not match") }, + { assertEquals(658, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(2341, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(637, cpuStats.stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } @@ -184,54 +148,16 @@ internal class SimHostTest { */ @Test fun testFailure() = runBlockingSimulation { - var activeTime = 0L - var idleTime = 0L - var uptime = 0L - var downtime = 0L - var guestUptime = 0L - var guestDowntime = 0L - - val hostId = UUID.randomUUID() - val hostResource = Resource.builder() - .put(HOST_ID, hostId.toString()) - .build() - - // Setup metric reader val duration = 5 * 60L - val reader = CoroutineMetricReader( - this, - object : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - activeTime += reader.cpuActiveTime - idleTime += reader.cpuIdleTime - uptime += reader.uptime - downtime += reader.downtime - } - - override fun record(reader: ServerTableReader) { - guestUptime += reader.uptime - guestDowntime += reader.downtime - } - }, - exportInterval = Duration.ofSeconds(duration) - ) - - val meterProvider = SdkMeterProvider - .builder() - .setResource(hostResource) - .setClock(clock.toOtelClock()) - .registerMetricReader(reader) - .build() - val engine = FlowEngine(coroutineContext, clock) val host = SimHost( - uid = hostId, + uid = UUID.randomUUID(), name = "test", model = machineModel, meta = emptyMap(), coroutineContext, engine, - meterProvider, + MeterProvider.noop(), SimFairShareHypervisorProvider() ) val image = MockImage( @@ -275,15 +201,17 @@ internal class SimHostTest { // Ensure last cycle is collected delay(1000L * duration) - meterProvider.close() + val cpuStats = host.getCpuStats() + val sysStats = host.getSystemStats() + val guestSysStats = host.getSystemStats(server) assertAll( - { assertEquals(1775, idleTime, "Idle time does not match") }, - { assertEquals(624, activeTime, "Active time does not match") }, - { assertEquals(900001, uptime, "Uptime does not match") }, - { assertEquals(300000, downtime, "Downtime does not match") }, - { assertEquals(900000, guestUptime, "Guest uptime does not match") }, - { assertEquals(300000, guestDowntime, "Guest downtime does not match") }, + { assertEquals(1775, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(624, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(900001, sysStats.uptime.toMillis(), "Uptime does not match") }, + { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") }, + { assertEquals(900001, guestSysStats.uptime.toMillis(), "Guest uptime does not match") }, + { assertEquals(300000, guestSysStats.downtime.toMillis(), "Guest downtime does not match") }, ) } -- cgit v1.2.3 From 4883d8eb0c6cae82153aaf5f12561014d08cdc41 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 4 May 2022 14:41:39 +0200 Subject: feat(compute): Add support for looking up hosts This change adds the ability for users to lookup the `Host` on which a `Server` is hosted (if any). This allows the user to potentially interact with the `Host` directly, e.g., in order to obtain advanced metrics. --- .../kotlin/org/opendc/compute/service/ComputeService.kt | 13 +++++++------ .../opendc/compute/service/internal/ComputeServiceImpl.kt | 10 ++++++---- .../kotlin/org/opendc/compute/service/ComputeServiceTest.kt | 9 ++------- 3 files changed, 15 insertions(+), 17 deletions(-) (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 2a1fbaa0..e8cd9e54 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -25,6 +25,7 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient +import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler @@ -37,15 +38,10 @@ import kotlin.coroutines.CoroutineContext */ public interface ComputeService : AutoCloseable { /** - * The hosts that are used by the compute service. + * The hosts that are registered with the "compute" service. */ public val hosts: Set - /** - * The number of hosts available in the system. - */ - public val hostCount: Int - /** * Create a new [ComputeClient] to control the compute service. */ @@ -66,6 +62,11 @@ public interface ComputeService : AutoCloseable { */ public override fun close() + /** + * Lookup the [Host] that currently hosts the specified [server]. + */ + public fun lookupHost(server: Server): Host? + public companion object { /** * Construct a new [ComputeService] implementation. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 144b6573..b7a47a06 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -154,9 +154,6 @@ internal class ComputeServiceImpl( override val hosts: Set get() = hostToView.keys - override val hostCount: Int - get() = hostToView.size - init { val upState = Attributes.of(AttributeKey.stringKey("state"), "up") val downState = Attributes.of(AttributeKey.stringKey("state"), "down") @@ -165,7 +162,7 @@ internal class ComputeServiceImpl( .setDescription("Number of hosts registered with the scheduler") .setUnit("1") .buildWithCallback { result -> - val total = hostCount + val total = hosts.size val available = availableHosts.size.toLong() result.record(available, upState) @@ -322,6 +319,11 @@ internal class ComputeServiceImpl( } } + override fun lookupHost(server: Server): Host? { + val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" } + return internal.host + } + override fun close() { scope.cancel() } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 7b8d0fe2..eb106817 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.service import io.mockk.* import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNull @@ -48,10 +47,9 @@ import java.util.* /** * Test suite for the [ComputeService] interface. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class ComputeServiceTest { - lateinit var scope: SimulationCoroutineScope - lateinit var service: ComputeService + private lateinit var scope: SimulationCoroutineScope + private lateinit var service: ComputeService @BeforeEach fun setUp() { @@ -128,14 +126,12 @@ internal class ComputeServiceTest { every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP - assertEquals(0, service.hostCount) assertEquals(emptySet(), service.hosts) service.addHost(host) verify(exactly = 1) { host.addListener(any()) } - assertEquals(1, service.hostCount) assertEquals(1, service.hosts.size) service.removeHost(host) @@ -150,7 +146,6 @@ internal class ComputeServiceTest { every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN - assertEquals(0, service.hostCount) assertEquals(emptySet(), service.hosts) service.addHost(host) -- cgit v1.2.3 From 564911a2458b3c54834d5cbfed91f502e9856566 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 4 May 2022 14:43:17 +0200 Subject: refactor(compute): Directly expose scheduler stats to user This change updates the `ComputeService` interface to directly expose statistics about the scheduler to the user, such that they do not necessarily have to interact with OpenTelemetry to obtain these values. --- .../main/kotlin/org/opendc/compute/api/Server.kt | 7 ++++ .../org/opendc/compute/service/ComputeService.kt | 6 +++ .../compute/service/internal/ClientServer.kt | 5 +++ .../compute/service/internal/ComputeServiceImpl.kt | 33 +++++++++++++++- .../compute/service/internal/InternalServer.kt | 3 +- .../compute/service/telemetry/SchedulerStats.kt | 46 ++++++++++++++++++++++ .../opendc/compute/service/InternalServerTest.kt | 2 - .../org/opendc/compute/simulator/SimHostTest.kt | 3 ++ 8 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt index b508a9f8..64b73d0b 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt @@ -22,6 +22,8 @@ package org.opendc.compute.api +import java.time.Instant + /** * A stateful object representing a server instance that is running on some physical or virtual machine. */ @@ -41,6 +43,11 @@ public interface Server : Resource { */ public val state: ServerState + /** + * The most recent moment in time when the server was launched. + */ + public val launchedAt: Instant? + /** * Request the server to be started. * diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index e8cd9e54..3a6baaa1 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -29,6 +29,7 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -67,6 +68,11 @@ public interface ComputeService : AutoCloseable { */ public fun lookupHost(server: Server): Host? + /** + * Collect the statistics about the scheduler component of this service. + */ + public fun getSchedulerStats(): SchedulerStats + public companion object { /** * Construct a new [ComputeService] implementation. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index f2929bf3..45775640 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -27,6 +27,7 @@ import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher +import java.time.Instant import java.util.* /** @@ -55,6 +56,9 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche override var state: ServerState = delegate.state private set + override var launchedAt: Instant? = null + private set + override suspend fun start() { delegate.start() refresh() @@ -95,6 +99,7 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche labels = delegate.labels meta = delegate.meta state = delegate.state + launchedAt = delegate.launchedAt } override fun onStateChanged(server: Server, newState: ServerState) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index b7a47a06..e8664e5c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -36,8 +36,10 @@ import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration +import java.time.Instant import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -126,6 +128,9 @@ internal class ComputeServiceImpl( private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success") private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure") private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error") + private var _attemptsSuccess = 0L + private var _attemptsFailure = 0L + private var _attemptsError = 0L /** * The response time of the service. @@ -145,6 +150,8 @@ internal class ComputeServiceImpl( .build() private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending") private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") + private var _serversPending = 0 + private var _serversActive = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. @@ -328,13 +335,26 @@ internal class ComputeServiceImpl( scope.cancel() } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats( + availableHosts.size, + hostToView.size - availableHosts.size, + _attemptsSuccess, + _attemptsFailure, + _attemptsError, + _serversPending, + _serversActive + ) + } + internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } val now = clock.millis() val request = SchedulingRequest(server, now) - server.lastProvisioningTimestamp = now + server.launchedAt = Instant.ofEpochMilli(now) queue.add(request) + _serversPending++ _servers.add(1, _serversPendingAttr) requestSchedulingCycle() return request @@ -373,6 +393,7 @@ internal class ComputeServiceImpl( if (request.isCancelled) { queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) continue } @@ -385,7 +406,9 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) + _attemptsFailure++ _schedulingAttempts.add(1, _schedulingAttemptsFailureAttr) logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } @@ -401,6 +424,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) _schedulingLatency.record(now - request.submitTime, server.attributes) @@ -419,6 +443,8 @@ internal class ComputeServiceImpl( activeServers[server] = host _servers.add(1, _serversActiveAttr) + _serversActive++ + _attemptsSuccess++ _schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr) } catch (e: Throwable) { logger.error(e) { "Failed to deploy VM" } @@ -427,6 +453,7 @@ internal class ComputeServiceImpl( hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + _attemptsError++ _schedulingAttempts.add(1, _schedulingAttemptsErrorAttr) } } @@ -483,6 +510,7 @@ internal class ComputeServiceImpl( logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } if (activeServers.remove(server) != null) { + _serversActive-- _servers.add(-1, _serversActiveAttr) } @@ -505,7 +533,8 @@ internal class ComputeServiceImpl( */ private fun collectProvisionTime(result: ObservableLongMeasurement) { for ((_, server) in servers) { - result.record(server.lastProvisioningTimestamp, server.attributes) + val launchedAt = server.launchedAt ?: continue + result.record(launchedAt.toEpochMilli(), server.attributes) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index f1b92c66..d2a2d896 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -28,6 +28,7 @@ import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host +import java.time.Instant import java.util.UUID /** @@ -75,7 +76,7 @@ internal class InternalServer( /** * The most recent timestamp when the server entered a provisioning state. */ - @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE + override var launchedAt: Instant? = null /** * The current scheduling request. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..4dc70286 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.telemetry + +import org.opendc.compute.service.ComputeService + +/** + * Statistics about the scheduling component of the [ComputeService]. + * + * @property hostsAvailable The number of hosts currently available for scheduling. + * @property hostsUnavailable The number of hosts unavailable for scheduling. + * @property attemptsSuccess Scheduling attempts that resulted into an allocation onto a host. + * @property attemptsFailure The number of failed scheduling attempt due to insufficient capacity at the moment. + * @property attemptsError The number of scheduling attempts that failed due to system error. + * @property serversPending The number of servers that are pending to be scheduled. + * @property serversActive The number of servers that are currently managed by the service and running. + */ +public data class SchedulerStats( + val hostsAvailable: Int, + val hostsUnavailable: Int, + val attemptsSuccess: Long, + val attemptsFailure: Long, + val attemptsError: Long, + val serversPending: Int, + val serversActive: Int +) diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index dfd3bc67..4e5a37ae 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -23,7 +23,6 @@ package org.opendc.compute.service import io.mockk.* -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test @@ -41,7 +40,6 @@ import java.util.* /** * Test suite for the [InternalServer] implementation. */ -@OptIn(ExperimentalCoroutinesApi::class) class InternalServerTest { @Test fun testEquality() { diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 67689dc8..fd54ad1d 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -41,6 +41,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import java.time.Instant import java.util.* import kotlin.coroutines.resume @@ -260,6 +261,8 @@ internal class SimHostTest { override val state: ServerState = ServerState.TERMINATED + override val launchedAt: Instant? = null + override suspend fun start() {} override suspend fun stop() {} -- cgit v1.2.3 From 7981e9aa3e6854ad593a5af85f8eb56874299d7e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 6 May 2022 17:45:23 +0200 Subject: refactor(telemetry/compute): Support direct metric access This change introduces a `ComputeMetricReader` class that can be used as a replacement for the `CoroutineMetricReader` class when reading metrics from the Compute service. This implementation operates directly on a `ComputeService` instance, providing better performance. --- .../compute/workload/ComputeServiceHelper.kt | 21 +++++-- .../export/parquet/ParquetComputeMetricExporter.kt | 71 ---------------------- .../export/parquet/ParquetComputeMonitor.kt | 67 ++++++++++++++++++++ 3 files changed, 84 insertions(+), 75 deletions(-) delete mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt (limited to 'opendc-compute') diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 4b0b343f..21cfdad2 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield +import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost @@ -81,9 +82,19 @@ public class ComputeServiceHelper( } /** - * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * + * @param trace The trace to simulate. + * @param seed The seed for the simulation. + * @param servers A list to which the created servers is added. + * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). */ - public suspend fun run(trace: List, seed: Long, submitImmediately: Boolean = false) { + public suspend fun run( + trace: List, + seed: Long, + servers: MutableList? = null, + submitImmediately: Boolean = false + ) { val random = Random(seed) val injector = failureModel?.createInjector(context, clock, service, random) val client = service.newClient() @@ -129,12 +140,14 @@ public class ComputeServiceHelper( meta = mapOf("workload" to workload) ) + servers?.add(server) + // Wait for the server reach its end time val endTime = entry.stopTime.toEpochMilli() delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) - // Delete the server after reaching the end-time of the virtual machine - server.delete() + // Stop the server after reaching the end-time of the virtual machine + server.stop() } } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt deleted file mode 100644 index a46885f4..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.export.parquet - -import io.opentelemetry.sdk.common.CompletableResultCode -import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader -import java.io.File - -/** - * A [ComputeMonitor] that logs the events to a Parquet file. - */ -public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() { - private val serverWriter = ParquetServerDataWriter( - File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val hostWriter = ParquetHostDataWriter( - File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val serviceWriter = ParquetServiceDataWriter( - File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun record(reader: ServerTableReader) { - serverWriter.write(reader) - } - - override fun record(reader: HostTableReader) { - hostWriter.write(reader) - } - - override fun record(reader: ServiceTableReader) { - serviceWriter.write(reader) - } - - override fun shutdown(): CompletableResultCode { - hostWriter.close() - serviceWriter.close() - serverWriter.close() - - return CompletableResultCode.ofSuccess() - } -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt new file mode 100644 index 00000000..6c515118 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(reader: ServerTableReader) { + serverWriter.write(reader) + } + + override fun record(reader: HostTableReader) { + hostWriter.write(reader) + } + + override fun record(reader: ServiceTableReader) { + serviceWriter.write(reader) + } + + override fun close() { + hostWriter.close() + serviceWriter.close() + serverWriter.close() + } +} -- cgit v1.2.3