diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 17:47:44 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 17:47:44 +0200 |
| commit | a9657e4fa3b15e2c1c11884b5a250b0861bcc21d (patch) | |
| tree | 6b25de3d7a1def150ab4977a45723c52167e7211 | |
| parent | 48da4538707cd074969287724ca6f02823f2ff5a (diff) | |
| parent | 8e3905273c7a3f2df4df5d5840e4088d99b0dffb (diff) | |
merge: Expose metrics directly to user (#80)
This pull request adds the ability to access the metrics of resources modeled
by the OpenDC Compute, Workflow, FaaS, and TensorFlow services directly from
their corresponding interfaces. Previously, users would have to interact with
OpenTelemetry to obtain these values, which is complex and provides
significant overhead.
With this pull request, users can access the metrics of all cloud resources
modeled by OpenDC via methods such as `getSchedulerStats()`, etc.
** Breaking Changes **
- `ComputeService.hostCount` removed in favour of `ComputeService.hosts.size`
43 files changed, 1404 insertions, 327 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt index b508a9f8..64b73d0b 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt @@ -22,6 +22,8 @@ package org.opendc.compute.api +import java.time.Instant + /** * A stateful object representing a server instance that is running on some physical or virtual machine. */ @@ -42,6 +44,11 @@ public interface Server : Resource { public val state: ServerState /** + * The most recent moment in time when the server was launched. + */ + public val launchedAt: Instant? + + /** * Request the server to be started. * * This method is guaranteed to return after the request was acknowledged, but might return before the server was diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 2a1fbaa0..3a6baaa1 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -25,9 +25,11 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient +import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -37,16 +39,11 @@ import kotlin.coroutines.CoroutineContext */ public interface ComputeService : AutoCloseable { /** - * The hosts that are used by the compute service. + * The hosts that are registered with the "compute" service. */ public val hosts: Set<Host> /** - * The number of hosts available in the system. - */ - public val hostCount: Int - - /** * Create a new [ComputeClient] to control the compute service. */ public fun newClient(): ComputeClient @@ -66,6 +63,16 @@ public interface ComputeService : AutoCloseable { */ public override fun close() + /** + * Lookup the [Host] that currently hosts the specified [server]. + */ + public fun lookupHost(server: Server): Host? + + /** + * Collect the statistics about the scheduler component of this service. + */ + public fun getSchedulerStats(): SchedulerStats + public companion object { /** * Construct a new [ComputeService] implementation. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index bed15dfd..67b144d9 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -23,6 +23,10 @@ package org.opendc.compute.service.driver import org.opendc.compute.api.Server +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats +import org.opendc.compute.service.driver.telemetry.HostCpuStats +import org.opendc.compute.service.driver.telemetry.HostSystemStats import java.util.* /** @@ -55,6 +59,11 @@ public interface Host { public val meta: Map<String, Any> /** + * The [Server] instances known to the host. + */ + public val instances: Set<Server> + + /** * Determine whether the specified [instance][server] can still fit on this host. */ public fun canFit(server: Server): Boolean @@ -100,4 +109,30 @@ public interface Host { * Remove a [HostListener] from this host. */ public fun removeListener(listener: HostListener) + + /** + * Query the system statistics of the host. + */ + public fun getSystemStats(): HostSystemStats + + /** + * Query the system statistics of a [Server] that is located on this host. + * + * @param server The [Server] to obtain the system statistics of. + * @throws IllegalArgumentException if the server is not present on the host. + */ + public fun getSystemStats(server: Server): GuestSystemStats + + /** + * Query the CPU statistics of the host. + */ + public fun getCpuStats(): HostCpuStats + + /** + * Query the CPU statistics of a [Server] that is located on this host. + * + * @param server The [Server] to obtain the CPU statistics of. + * @throws IllegalArgumentException if the server is not present on the host. + */ + public fun getCpuStats(server: Server): GuestCpuStats } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt new file mode 100644 index 00000000..b5d63471 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +/** + * Statistics about the CPUs of a guest. + * + * @property activeTime The cumulative time (in seconds) that the CPUs of the guest were actively running. + * @property idleTime The cumulative time (in seconds) the CPUs of the guest were idle. + * @property stealTime The cumulative CPU time (in seconds) that the guest was ready to run, but not granted time by the host. + * @property lostTime The cumulative CPU time (in seconds) that was lost due to interference with other machines. + * @property capacity The available CPU capacity of the guest (in MHz). + * @property usage Amount of CPU resources (in MHz) actually used by the guest. + * @property utilization Utilization of the CPU resources (in %) relative to the total CPU capacity. + */ +public data class GuestCpuStats( + val activeTime: Long, + val idleTime: Long, + val stealTime: Long, + val lostTime: Long, + val capacity: Double, + val usage: Double, + val utilization: Double +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt new file mode 100644 index 00000000..b3958473 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +import java.time.Duration +import java.time.Instant + +/** + * System-level statistics of a guest. + * + * @property uptime The cumulative uptime of the guest since last boot (in ms). + * @property downtime The cumulative downtime of the guest since last boot (in ms). + * @property bootTime The time at which the guest booted. + */ +public data class GuestSystemStats( + val uptime: Duration, + val downtime: Duration, + val bootTime: Instant +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt new file mode 100644 index 00000000..55e23c0e --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +/** + * Statistics about the CPUs of a host. + * + * @property activeTime The cumulative time (in seconds) that the CPUs of the host were actively running. + * @property idleTime The cumulative time (in seconds) the CPUs of the host were idle. + * @property stealTime The cumulative CPU time (in seconds) that virtual machines were ready to run, but were not able to. + * @property lostTime The cumulative CPU time (in seconds) that was lost due to interference between virtual machines. + * @property capacity The available CPU capacity of the host (in MHz). + * @property demand Amount of CPU resources (in MHz) the guests would use if there were no CPU contention or CPU + * limits. + * @property usage Amount of CPU resources (in MHz) actually used by the host. + * @property utilization Utilization of the CPU resources (in %) relative to the total CPU capacity. + */ +public data class HostCpuStats( + val activeTime: Long, + val idleTime: Long, + val stealTime: Long, + val lostTime: Long, + val capacity: Double, + val demand: Double, + val usage: Double, + val utilization: Double +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt new file mode 100644 index 00000000..1c07023f --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +import java.time.Duration +import java.time.Instant + +/** + * System-level statistics of a host. + + * @property uptime The cumulative uptime of the host since last boot (in ms). + * @property downtime The cumulative downtime of the host since last boot (in ms). + * @property bootTime The time at which the server started. + * @property powerUsage Instantaneous power usage of the system (in W). + * @property energyUsage The cumulative energy usage of the system (in J). + * @property guestsTerminated The number of guests that are in a terminated state. + * @property guestsRunning The number of guests that are in a running state. + * @property guestsError The number of guests that are in an error state. + * @property guestsInvalid The number of guests that are in an unknown state. + */ +public data class HostSystemStats( + val uptime: Duration, + val downtime: Duration, + val bootTime: Instant, + val powerUsage: Double, + val energyUsage: Double, + val guestsTerminated: Int, + val guestsRunning: Int, + val guestsError: Int, + val guestsInvalid: Int, +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index f2929bf3..45775640 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -27,6 +27,7 @@ import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher +import java.time.Instant import java.util.* /** @@ -55,6 +56,9 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche override var state: ServerState = delegate.state private set + override var launchedAt: Instant? = null + private set + override suspend fun start() { delegate.start() refresh() @@ -95,6 +99,7 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche labels = delegate.labels meta = delegate.meta state = delegate.state + launchedAt = delegate.launchedAt } override fun onStateChanged(server: Server, newState: ServerState) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 144b6573..e8664e5c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -36,8 +36,10 @@ import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration +import java.time.Instant import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -126,6 +128,9 @@ internal class ComputeServiceImpl( private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success") private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure") private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error") + private var _attemptsSuccess = 0L + private var _attemptsFailure = 0L + private var _attemptsError = 0L /** * The response time of the service. @@ -145,6 +150,8 @@ internal class ComputeServiceImpl( .build() private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending") private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") + private var _serversPending = 0 + private var _serversActive = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. @@ -154,9 +161,6 @@ internal class ComputeServiceImpl( override val hosts: Set<Host> get() = hostToView.keys - override val hostCount: Int - get() = hostToView.size - init { val upState = Attributes.of(AttributeKey.stringKey("state"), "up") val downState = Attributes.of(AttributeKey.stringKey("state"), "down") @@ -165,7 +169,7 @@ internal class ComputeServiceImpl( .setDescription("Number of hosts registered with the scheduler") .setUnit("1") .buildWithCallback { result -> - val total = hostCount + val total = hosts.size val available = availableHosts.size.toLong() result.record(available, upState) @@ -322,17 +326,35 @@ internal class ComputeServiceImpl( } } + override fun lookupHost(server: Server): Host? { + val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" } + return internal.host + } + override fun close() { scope.cancel() } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats( + availableHosts.size, + hostToView.size - availableHosts.size, + _attemptsSuccess, + _attemptsFailure, + _attemptsError, + _serversPending, + _serversActive + ) + } + internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } val now = clock.millis() val request = SchedulingRequest(server, now) - server.lastProvisioningTimestamp = now + server.launchedAt = Instant.ofEpochMilli(now) queue.add(request) + _serversPending++ _servers.add(1, _serversPendingAttr) requestSchedulingCycle() return request @@ -371,6 +393,7 @@ internal class ComputeServiceImpl( if (request.isCancelled) { queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) continue } @@ -383,7 +406,9 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) + _attemptsFailure++ _schedulingAttempts.add(1, _schedulingAttemptsFailureAttr) logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } @@ -399,6 +424,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) _schedulingLatency.record(now - request.submitTime, server.attributes) @@ -417,6 +443,8 @@ internal class ComputeServiceImpl( activeServers[server] = host _servers.add(1, _serversActiveAttr) + _serversActive++ + _attemptsSuccess++ _schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr) } catch (e: Throwable) { logger.error(e) { "Failed to deploy VM" } @@ -425,6 +453,7 @@ internal class ComputeServiceImpl( hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + _attemptsError++ _schedulingAttempts.add(1, _schedulingAttemptsErrorAttr) } } @@ -481,6 +510,7 @@ internal class ComputeServiceImpl( logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } if (activeServers.remove(server) != null) { + _serversActive-- _servers.add(-1, _serversActiveAttr) } @@ -503,7 +533,8 @@ internal class ComputeServiceImpl( */ private fun collectProvisionTime(result: ObservableLongMeasurement) { for ((_, server) in servers) { - result.record(server.lastProvisioningTimestamp, server.attributes) + val launchedAt = server.launchedAt ?: continue + result.record(launchedAt.toEpochMilli(), server.attributes) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index f1b92c66..d2a2d896 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -28,6 +28,7 @@ import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host +import java.time.Instant import java.util.UUID /** @@ -75,7 +76,7 @@ internal class InternalServer( /** * The most recent timestamp when the server entered a provisioning state. */ - @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE + override var launchedAt: Instant? = null /** * The current scheduling request. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..4dc70286 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.telemetry + +import org.opendc.compute.service.ComputeService + +/** + * Statistics about the scheduling component of the [ComputeService]. + * + * @property hostsAvailable The number of hosts currently available for scheduling. + * @property hostsUnavailable The number of hosts unavailable for scheduling. + * @property attemptsSuccess Scheduling attempts that resulted into an allocation onto a host. + * @property attemptsFailure The number of failed scheduling attempt due to insufficient capacity at the moment. + * @property attemptsError The number of scheduling attempts that failed due to system error. + * @property serversPending The number of servers that are pending to be scheduled. + * @property serversActive The number of servers that are currently managed by the service and running. + */ +public data class SchedulerStats( + val hostsAvailable: Int, + val hostsUnavailable: Int, + val attemptsSuccess: Long, + val attemptsFailure: Long, + val attemptsError: Long, + val serversPending: Int, + val serversActive: Int +) diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 7b8d0fe2..eb106817 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.service import io.mockk.* import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNull @@ -48,10 +47,9 @@ import java.util.* /** * Test suite for the [ComputeService] interface. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class ComputeServiceTest { - lateinit var scope: SimulationCoroutineScope - lateinit var service: ComputeService + private lateinit var scope: SimulationCoroutineScope + private lateinit var service: ComputeService @BeforeEach fun setUp() { @@ -128,14 +126,12 @@ internal class ComputeServiceTest { every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP - assertEquals(0, service.hostCount) assertEquals(emptySet<Host>(), service.hosts) service.addHost(host) verify(exactly = 1) { host.addListener(any()) } - assertEquals(1, service.hostCount) assertEquals(1, service.hosts.size) service.removeHost(host) @@ -150,7 +146,6 @@ internal class ComputeServiceTest { every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN - assertEquals(0, service.hostCount) assertEquals(emptySet<Host>(), service.hosts) service.addHost(host) diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index dfd3bc67..4e5a37ae 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -23,7 +23,6 @@ package org.opendc.compute.service import io.mockk.* -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test @@ -41,7 +40,6 @@ import java.util.* /** * Test suite for the [InternalServer] implementation. */ -@OptIn(ExperimentalCoroutinesApi::class) class InternalServerTest { @Test fun testEquality() { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 4eb6392e..323ae4fe 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -29,11 +29,14 @@ import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.api.metrics.ObservableDoubleMeasurement import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* -import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats +import org.opendc.compute.service.driver.telemetry.HostCpuStats +import org.opendc.compute.service.driver.telemetry.HostSystemStats import org.opendc.compute.simulator.internal.Guest import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* @@ -49,6 +52,8 @@ import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine +import java.time.Duration +import java.time.Instant import java.util.* import kotlin.coroutines.CoroutineContext @@ -81,11 +86,6 @@ public class SimHost( private val clock = engine.clock /** - * The logger instance of this server. - */ - private val logger = KotlinLogging.logger {} - - /** * The [Meter] to track metrics of the simulated host. */ private val meter = meterProvider.get("org.opendc.compute.simulator") @@ -112,6 +112,9 @@ public class SimHost( private val guests = HashMap<Server, Guest>() private val _guests = mutableListOf<Guest>() + override val instances: Set<Server> + get() = guests.keys + override val state: HostState get() = _state private var _state: HostState = HostState.DOWN @@ -249,6 +252,68 @@ public class SimHost( machine.cancel() } + override fun getSystemStats(): HostSystemStats { + updateUptime() + + var terminated = 0 + var running = 0 + var error = 0 + var invalid = 0 + + val guests = _guests.listIterator() + for (guest in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + ServerState.DELETED -> { + // Remove guests that have been deleted + this.guests.remove(guest.server) + guests.remove() + } + else -> invalid++ + } + } + + return HostSystemStats( + Duration.ofMillis(_uptime), + Duration.ofMillis(_downtime), + Instant.ofEpochMilli(_bootTime), + machine.powerUsage, + machine.energyUsage, + terminated, + running, + error, + invalid + ) + } + + override fun getSystemStats(server: Server): GuestSystemStats { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + return guest.getSystemStats() + } + + override fun getCpuStats(): HostCpuStats { + val counters = hypervisor.counters + counters.flush() + + return HostCpuStats( + counters.cpuActiveTime / 1000L, + counters.cpuIdleTime / 1000L, + counters.cpuStealTime / 1000L, + counters.cpuLostTime / 1000L, + hypervisor.cpuCapacity, + hypervisor.cpuDemand, + hypervisor.cpuUsage, + hypervisor.cpuUsage / _cpuLimit + ) + } + + override fun getCpuStats(server: Server): GuestCpuStats { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + return guest.getCpuStats() + } + override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean { @@ -417,13 +482,12 @@ public class SimHost( * Helper function to track the CPU time of a machine. */ private fun collectCpuTime(result: ObservableLongMeasurement) { - val counters = hypervisor.counters - counters.flush() + val stats = getCpuStats() - result.record(counters.cpuActiveTime / 1000L, _activeState) - result.record(counters.cpuIdleTime / 1000L, _idleState) - result.record(counters.cpuStealTime / 1000L, _stealState) - result.record(counters.cpuLostTime / 1000L, _lostState) + result.record(stats.activeTime, _activeState) + result.record(stats.idleTime, _idleState) + result.record(stats.stealTime, _stealState) + result.record(stats.lostTime, _lostState) val guests = _guests for (i in guests.indices) { @@ -450,7 +514,7 @@ public class SimHost( val guests = _guests for (i in guests.indices) { - guests[i].updateUptime(duration) + guests[i].updateUptime() } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index bb378ee3..0d4c550d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -32,6 +32,8 @@ import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper import org.opendc.simulator.compute.kernel.SimHypervisor @@ -39,6 +41,8 @@ import org.opendc.simulator.compute.kernel.SimVirtualMachine import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock +import java.time.Duration +import java.time.Instant import kotlin.coroutines.CoroutineContext /** @@ -146,6 +150,37 @@ internal class Guest( } /** + * Obtain the system statistics of this guest. + */ + fun getSystemStats(): GuestSystemStats { + updateUptime() + + return GuestSystemStats( + Duration.ofMillis(_uptime), + Duration.ofMillis(_downtime), + Instant.ofEpochMilli(_bootTime) + ) + } + + /** + * Obtain the CPU statistics of this guest. + */ + fun getCpuStats(): GuestCpuStats { + val counters = machine.counters + counters.flush() + + return GuestCpuStats( + counters.cpuActiveTime / 1000L, + counters.cpuIdleTime / 1000L, + counters.cpuStealTime / 1000L, + counters.cpuLostTime / 1000L, + machine.cpuCapacity, + machine.cpuUsage, + machine.cpuUsage / _cpuLimit + ) + } + + /** * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. */ private var job: Job? = null @@ -209,6 +244,8 @@ internal class Guest( * This method is invoked when the guest stopped. */ private fun onStop(target: ServerState) { + updateUptime() + state = target listener.onStop(this) } @@ -224,10 +261,16 @@ internal class Guest( .put(STATE_KEY, "down") .build() + private var _lastReport = clock.millis() + /** * Helper function to track the uptime and downtime of the guest. */ - fun updateUptime(duration: Long) { + fun updateUptime() { + val now = clock.millis() + val duration = now - _lastReport + _lastReport = now + if (state == ServerState.RUNNING) { _uptime += duration } else if (state == ServerState.ERROR) { @@ -239,6 +282,8 @@ internal class Guest( * Helper function to track the uptime of the guest. */ fun collectUptime(result: ObservableLongMeasurement) { + updateUptime() + result.record(_uptime, _upState) result.record(_downtime, _downState) } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index f0325023..fd54ad1d 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,8 +22,7 @@ package org.opendc.compute.simulator -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -42,13 +41,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine -import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.HOST_ID -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Duration +import java.time.Instant import java.util.* import kotlin.coroutines.resume @@ -73,45 +66,16 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var idleTime = 0L - var activeTime = 0L - var stealTime = 0L - - val hostId = UUID.randomUUID() - val hostResource = Resource.builder() - .put(HOST_ID, hostId.toString()) - .build() - - // Setup metric reader val duration = 5 * 60L - val reader = CoroutineMetricReader( - this, - object : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - activeTime += reader.cpuActiveTime - idleTime += reader.cpuIdleTime - stealTime += reader.cpuStealTime - } - }, - exportInterval = Duration.ofSeconds(duration) - ) - - val meterProvider = SdkMeterProvider - .builder() - .setResource(hostResource) - .setClock(clock.toOtelClock()) - .registerMetricReader(reader) - .build() - val engine = FlowEngine(coroutineContext, clock) - val virtDriver = SimHost( - uid = hostId, + val host = SimHost( + uid = UUID.randomUUID(), name = "test", model = machineModel, meta = emptyMap(), coroutineContext, engine, - meterProvider, + MeterProvider.noop(), SimFairShareHypervisorProvider() ) val vmImageA = MockImage( @@ -150,11 +114,11 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) coroutineScope { - launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } - launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } + launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } + launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } suspendCancellableCoroutine<Unit> { cont -> - virtDriver.addListener(object : HostListener { + host.addListener(object : HostListener { private var finished = 0 override fun onStateChanged(host: Host, server: Server, newState: ServerState) { @@ -168,13 +132,14 @@ internal class SimHostTest { // Ensure last cycle is collected delay(1000L * duration) - virtDriver.close() - meterProvider.close() + host.close() + + val cpuStats = host.getCpuStats() assertAll( - { assertEquals(658, activeTime, "Active time does not match") }, - { assertEquals(2341, idleTime, "Idle time does not match") }, - { assertEquals(637, stealTime, "Steal time does not match") }, + { assertEquals(658, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(2341, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(637, cpuStats.stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } @@ -184,54 +149,16 @@ internal class SimHostTest { */ @Test fun testFailure() = runBlockingSimulation { - var activeTime = 0L - var idleTime = 0L - var uptime = 0L - var downtime = 0L - var guestUptime = 0L - var guestDowntime = 0L - - val hostId = UUID.randomUUID() - val hostResource = Resource.builder() - .put(HOST_ID, hostId.toString()) - .build() - - // Setup metric reader val duration = 5 * 60L - val reader = CoroutineMetricReader( - this, - object : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - activeTime += reader.cpuActiveTime - idleTime += reader.cpuIdleTime - uptime += reader.uptime - downtime += reader.downtime - } - - override fun record(reader: ServerTableReader) { - guestUptime += reader.uptime - guestDowntime += reader.downtime - } - }, - exportInterval = Duration.ofSeconds(duration) - ) - - val meterProvider = SdkMeterProvider - .builder() - .setResource(hostResource) - .setClock(clock.toOtelClock()) - .registerMetricReader(reader) - .build() - val engine = FlowEngine(coroutineContext, clock) val host = SimHost( - uid = hostId, + uid = UUID.randomUUID(), name = "test", model = machineModel, meta = emptyMap(), coroutineContext, engine, - meterProvider, + MeterProvider.noop(), SimFairShareHypervisorProvider() ) val image = MockImage( @@ -275,15 +202,17 @@ internal class SimHostTest { // Ensure last cycle is collected delay(1000L * duration) - meterProvider.close() + val cpuStats = host.getCpuStats() + val sysStats = host.getSystemStats() + val guestSysStats = host.getSystemStats(server) assertAll( - { assertEquals(1775, idleTime, "Idle time does not match") }, - { assertEquals(624, activeTime, "Active time does not match") }, - { assertEquals(900001, uptime, "Uptime does not match") }, - { assertEquals(300000, downtime, "Downtime does not match") }, - { assertEquals(900000, guestUptime, "Guest uptime does not match") }, - { assertEquals(300000, guestDowntime, "Guest downtime does not match") }, + { assertEquals(1775, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(624, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(900001, sysStats.uptime.toMillis(), "Uptime does not match") }, + { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") }, + { assertEquals(900001, guestSysStats.uptime.toMillis(), "Guest uptime does not match") }, + { assertEquals(300000, guestSysStats.downtime.toMillis(), "Guest downtime does not match") }, ) } @@ -332,6 +261,8 @@ internal class SimHostTest { override val state: ServerState = ServerState.TERMINATED + override val launchedAt: Instant? = null + override suspend fun start() {} override suspend fun stop() {} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 4b0b343f..21cfdad2 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield +import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost @@ -81,9 +82,19 @@ public class ComputeServiceHelper( } /** - * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * + * @param trace The trace to simulate. + * @param seed The seed for the simulation. + * @param servers A list to which the created servers is added. + * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). */ - public suspend fun run(trace: List<VirtualMachine>, seed: Long, submitImmediately: Boolean = false) { + public suspend fun run( + trace: List<VirtualMachine>, + seed: Long, + servers: MutableList<Server>? = null, + submitImmediately: Boolean = false + ) { val random = Random(seed) val injector = failureModel?.createInjector(context, clock, service, random) val client = service.newClient() @@ -129,12 +140,14 @@ public class ComputeServiceHelper( meta = mapOf("workload" to workload) ) + servers?.add(server) + // Wait for the server reach its end time val endTime = entry.stopTime.toEpochMilli() delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) - // Delete the server after reaching the end-time of the virtual machine - server.delete() + // Stop the server after reaching the end-time of the virtual machine + server.stop() } } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt index a46885f4..6c515118 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt @@ -22,8 +22,6 @@ package org.opendc.compute.workload.export.parquet -import io.opentelemetry.sdk.common.CompletableResultCode -import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.telemetry.compute.table.ServerTableReader @@ -33,7 +31,7 @@ import java.io.File /** * A [ComputeMonitor] that logs the events to a Parquet file. */ -public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() { +public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { private val serverWriter = ParquetServerDataWriter( File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize @@ -61,11 +59,9 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS serviceWriter.write(reader) } - override fun shutdown(): CompletableResultCode { + override fun close() { hostWriter.close() serviceWriter.close() serverWriter.close() - - return CompletableResultCode.ofSuccess() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 0bbf1443..6fd85e8c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,12 +23,14 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory +import kotlinx.coroutines.* +import org.opendc.compute.api.Server import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter +import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -37,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMetricReader import java.io.File import java.time.Duration import java.util.* @@ -97,7 +99,7 @@ abstract class Portfolio(name: String) : Experiment(name) { else null val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder) - val telemetry = SdkTelemetryManager(clock) + val telemetry = NoopTelemetryManager() val runner = ComputeServiceHelper( coroutineContext, clock, @@ -107,23 +109,35 @@ abstract class Portfolio(name: String) : Experiment(name) { interferenceModel?.withSeed(repeat.toLong()) ) - val exporter = ParquetComputeMetricExporter( - File(config.getString("output-path")), - "portfolio_id=$name/scenario_id=$id/run_id=$repeat", - 4096 - ) - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) - val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) + val servers = mutableListOf<Server>() + val exporter = ComputeMetricReader( + this, + clock, + runner.service, + servers, + ParquetComputeMonitor( + File(config.getString("output-path")), + "portfolio_id=$name/scenario_id=$id/run_id=$repeat", + bufferSize = 4096 + ), + exportInterval = Duration.ofMinutes(5) + ) try { // Instantiate the desired topology runner.apply(topology) - // Run the workload trace - runner.run(vms, seeder.nextLong()) + coroutineScope { + // Run the workload trace + runner.run(vms, seeder.nextLong(), servers) + + // Stop the metric collection + exporter.close() + } } finally { runner.close() + exporter.close() } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 01b2a8fe..62cdf123 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -26,25 +26,23 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.compute.api.Server import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMetricReader +import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration -import java.time.Instant import java.util.* /** @@ -54,7 +52,7 @@ class CapelinIntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var exporter: TestComputeMetricExporter + private lateinit var monitor: TestComputeMonitor /** * The [FilterScheduler] to use for all experiments. @@ -67,11 +65,11 @@ class CapelinIntegrationTest { private lateinit var workloadLoader: ComputeWorkloadLoader /** - * Setup the experimental environment. + * Set up the experimental environment. */ @BeforeEach fun setUp() { - exporter = TestComputeMetricExporter() + monitor = TestComputeMonitor() computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -85,22 +83,22 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val (workload, _) = createTestWorkload(1.0) - val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler ) val topology = createTopology() - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) try { runner.apply(topology) - runner.run(workload, 0) + runner.run(workload, 0, servers) - val serviceMetrics = exporter.serviceMetrics + val serviceMetrics = runner.service.getSchedulerStats() println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -116,15 +114,15 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } finally { runner.close() - telemetry.close() + reader.close() } } @@ -135,41 +133,41 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1 val (workload, _) = createTestWorkload(0.25, seed) - val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler ) val topology = createTopology("single") - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) try { runner.apply(topology) - runner.run(workload, seed.toLong()) + runner.run(workload, seed.toLong(), servers) + val serviceMetrics = runner.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { runner.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } + { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -181,41 +179,41 @@ class CapelinIntegrationTest { val seed = 0 val (workload, interferenceModel) = createTestWorkload(1.0, seed) - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, interferenceModel = interferenceModel?.withSeed(seed.toLong()) ) val topology = createTopology("single") - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { simulator.apply(topology) - simulator.run(workload, seed.toLong()) + simulator.run(workload, seed.toLong(), servers) + val serviceMetrics = simulator.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { simulator.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } ) } @@ -225,43 +223,43 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") val (workload, _) = createTestWorkload(0.25, seed) - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { simulator.apply(topology) - simulator.run(workload, seed.toLong()) + simulator.run(workload, seed.toLong(), servers) + val serviceMetrics = simulator.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { simulator.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } } + { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } } ) } @@ -281,8 +279,7 @@ class CapelinIntegrationTest { return stream.use { clusterTopology(stream) } } - class TestComputeMetricExporter : ComputeMetricExporter() { - var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0) + class TestComputeMonitor : ComputeMonitor { var idleTime = 0L var activeTime = 0L var stealTime = 0L @@ -290,19 +287,6 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L - override fun record(reader: ServiceTableReader) { - serviceMetrics = ServiceData( - reader.timestamp, - reader.hostsUp, - reader.hostsDown, - reader.serversPending, - reader.serversActive, - reader.attemptsSuccess, - reader.attemptsFailure, - reader.attemptsError - ) - } - override fun record(reader: HostTableReader) { idleTime += reader.cpuIdleTime activeTime += reader.cpuActiveTime diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 43093abf..5762ce64 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -36,8 +36,7 @@ dependencies { implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.jackson.module.kotlin) { - exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") - } - implementation("org.jetbrains.kotlin:kotlin-reflect:1.6.10") + implementation(libs.jackson.module.kotlin) + + testImplementation(libs.slf4j.simple) } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 5245261c..99948c8e 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.tf20.core -import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine @@ -52,7 +51,7 @@ public class SimTFDevice( context: CoroutineContext, clock: Clock, meter: Meter, - private val pu: ProcessingUnit, + pu: ProcessingUnit, private val memory: MemoryUnit, powerModel: PowerModel ) : TFDevice { @@ -70,17 +69,13 @@ public class SimTFDevice( ) /** - * The identifier of a device. - */ - private val deviceId = AttributeKey.stringKey("device.id") - - /** * The usage of the device. */ private val _usage = meter.histogramBuilder("device.usage") .setDescription("The amount of device resources used") .setUnit("MHz") .build() + private var _resourceUsage = 0.0 /** * The power draw of the device. @@ -89,6 +84,8 @@ public class SimTFDevice( .setDescription("The power draw of the device") .setUnit("W") .build() + private var _powerUsage = 0.0 + private var _energyUsage = 0.0 /** * The workload that will be run by the device. @@ -175,7 +172,10 @@ public class SimTFDevice( override fun onConverge(conn: FlowConnection, now: Long) { _usage.record(conn.rate) + _resourceUsage = conn.rate _power.record(machine.psu.powerDraw) + _powerUsage = machine.powerUsage + _energyUsage = machine.energyUsage } } @@ -197,6 +197,10 @@ public class SimTFDevice( } } + override fun getDeviceStats(): TFDeviceStats { + return TFDeviceStats(_resourceUsage, _powerUsage, _energyUsage) + } + override fun close() { machine.cancel() scope.cancel() diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt index bbc34ed9..839ed8a9 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt @@ -47,4 +47,9 @@ public interface TFDevice : AutoCloseable { * Perform [flops] amount of computation on the device. */ public suspend fun compute(flops: Double) + + /** + * Collect device statistics. + */ + public fun getDeviceStats(): TFDeviceStats } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt new file mode 100644 index 00000000..016d2a8b --- /dev/null +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.tf20.core + +/** + * Statistics about a TensorFlow [TFDevice]. + * + * @property resourceUsage The resource usage of the device (in MHz). + * @property powerUsage The instantaneous power draw of the device (in W). + * @property energyUsage Cumulative energy usage of the device since boot (in J). + */ +data class TFDeviceStats( + val resourceUsage: Double, + val powerUsage: Double, + val energyUsage: Double +) diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 28a2a319..0d5fbebb 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -25,6 +25,7 @@ package org.opendc.experiments.tf20.core import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.compute.model.MemoryUnit @@ -57,6 +58,12 @@ internal class SimTFDeviceTest { launch { device.compute(1e6) } launch { device.compute(2e6) } } - assertEquals(3681, clock.millis()) + + val stats = device.getDeviceStats() + + assertAll( + { assertEquals(3681, clock.millis()) }, + { assertEquals(325.75, stats.energyUsage) } + ) } } diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index c54595d3..1803ae69 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -30,6 +30,7 @@ plugins { dependencies { api(projects.opendcFaas.opendcFaasApi) api(projects.opendcTelemetry.opendcTelemetryApi) + api(libs.commons.math3) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.opentelemetry.semconv) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 1d5331cb..f7dc3c1f 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -25,10 +25,13 @@ package org.opendc.faas.service import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient +import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy import org.opendc.faas.service.deployer.FunctionDeployer import org.opendc.faas.service.internal.FaaSServiceImpl import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.faas.service.telemetry.FunctionStats +import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -42,6 +45,16 @@ public interface FaaSService : AutoCloseable { public fun newClient(): FaaSClient /** + * Collect statistics about the scheduler of the service. + */ + public fun getSchedulerStats(): SchedulerStats + + /** + * Collect statistics about the specified [function]. + */ + public fun getFunctionStats(function: FaaSFunction): FunctionStats + + /** * Terminate the lifecycle of the FaaS service, stopping all running function instances. */ public override fun close() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 836231c8..52fcffa1 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -29,7 +29,9 @@ import io.opentelemetry.api.metrics.LongHistogram import io.opentelemetry.api.metrics.LongUpDownCounter import io.opentelemetry.api.metrics.Meter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.opendc.faas.service.deployer.FunctionInstance +import org.opendc.faas.service.telemetry.FunctionStats import java.util.* /** @@ -46,7 +48,7 @@ public class FunctionObject( /** * The attributes of this function. */ - public val attributes: Attributes = Attributes.builder() + private val attributes: Attributes = Attributes.builder() .put(ResourceAttributes.FAAS_ID, uid.toString()) .put(ResourceAttributes.FAAS_NAME, name) .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) @@ -56,68 +58,78 @@ public class FunctionObject( /** * The total amount of function invocations received by the function. */ - public val invocations: LongCounter = meter.counterBuilder("function.invocations.total") + private val invocations: LongCounter = meter.counterBuilder("function.invocations.total") .setDescription("Number of function invocations") .setUnit("1") .build() + private var _invocations = 0L /** * The amount of function invocations that could be handled directly. */ - public val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") + private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") .setDescription("Number of function invocations handled directly") .setUnit("1") .build() + private var _timelyInvocations = 0L /** * The amount of function invocations that were delayed due to function deployment. */ - public val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") + private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() + private var _delayedInvocations = 0L /** * The amount of function invocations that failed. */ - public val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") + private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") .setDescription("Number of function invocations that failed") .setUnit("1") .build() + private var _failedInvocations = 0L /** * The amount of instances for this function. */ - public val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") + private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") .setDescription("Number of active function instances") .setUnit("1") .build() + private var _activeInstances = 0 /** * The amount of idle instances for this function. */ - public val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") + private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") .setDescription("Number of idle function instances") .setUnit("1") .build() + private var _idleInstances = 0 /** * The time that the function waited. */ - public val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") + private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") .ofLongs() .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() + private val _waitTime = DescriptiveStatistics() + .apply { windowSize = 100 } /** * The time that the function was running. */ - public val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") + private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") .ofLongs() .setDescription("Time the function was running") .setUnit("ms") .build() + private val _activeTime = DescriptiveStatistics() + .apply { windowSize = 100 } /** * The instances associated with this function. @@ -134,6 +146,80 @@ public class FunctionObject( public val meta: MutableMap<String, Any> = meta.toMutableMap() + /** + * Report a scheduled invocation. + */ + internal fun reportSubmission() { + invocations.add(1, attributes) + _invocations++ + } + + /** + * Report the deployment of an invocation. + */ + internal fun reportDeployment(isDelayed: Boolean) { + if (isDelayed) { + delayedInvocations.add(1, attributes) + _delayedInvocations++ + + idleInstances.add(1, attributes) + _idleInstances++ + } else { + timelyInvocations.add(1, attributes) + _timelyInvocations++ + } + } + + /** + * Report the start of a function invocation. + */ + internal fun reportStart(start: Long, submitTime: Long) { + val wait = start - submitTime + waitTime.record(wait, attributes) + _waitTime.addValue(wait.toDouble()) + + idleInstances.add(-1, attributes) + _idleInstances-- + activeInstances.add(1, attributes) + _activeInstances++ + } + + /** + * Report the failure of a function invocation. + */ + internal fun reportFailure() { + failedInvocations.add(1, attributes) + _failedInvocations++ + } + + /** + * Report the end of a function invocation. + */ + internal fun reportEnd(duration: Long) { + activeTime.record(duration, attributes) + _activeTime.addValue(duration.toDouble()) + idleInstances.add(1, attributes) + _idleInstances++ + activeInstances.add(-1, attributes) + _activeInstances-- + } + + /** + * Collect the statistics of this function. + */ + internal fun getStats(): FunctionStats { + return FunctionStats( + _invocations, + _timelyInvocations, + _delayedInvocations, + _failedInvocations, + _activeInstances, + _idleInstances, + _waitTime.copy(), + _activeTime.copy() + ) + } + override fun close() { instances.forEach(FunctionInstance::close) instances.clear() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index 1526be9d..ce3b2b98 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -38,6 +38,8 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceListener import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.faas.service.telemetry.FunctionStats +import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException import java.time.Clock import java.util.* @@ -103,6 +105,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations") .setUnit("1") .build() + private var totalInvocations = 0L /** * The amount of function invocations that could be handled directly. @@ -111,6 +114,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations handled directly") .setUnit("1") .build() + private var timelyInvocations = 0L /** * The amount of function invocations that were delayed due to function deployment. @@ -119,6 +123,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() + private var delayedInvocations = 0L override fun newClient(): FaaSClient { return object : FaaSClient { @@ -187,6 +192,15 @@ internal class FaaSServiceImpl( } } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats(totalInvocations, timelyInvocations, delayedInvocations) + } + + override fun getFunctionStats(function: FaaSFunction): FunctionStats { + val func = requireNotNull(functions[function.uid]) { "Unknown function" } + return func.getStats() + } + /** * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ @@ -219,7 +233,8 @@ internal class FaaSServiceImpl( val instance = if (activeInstance != null) { _timelyInvocations.add(1) - function.timelyInvocations.add(1, function.attributes) + timelyInvocations++ + function.reportDeployment(isDelayed = false) activeInstance } else { @@ -227,29 +242,24 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - function.idleInstances.add(1, function.attributes) - _delayedInvocations.add(1) - function.delayedInvocations.add(1, function.attributes) + delayedInvocations++ + function.reportDeployment(isDelayed = true) instance } suspend { val start = clock.millis() - function.waitTime.record(start - submitTime, function.attributes) - function.idleInstances.add(-1, function.attributes) - function.activeInstances.add(1, function.attributes) + function.reportStart(start, submitTime) try { instance.invoke() } catch (e: Throwable) { logger.debug(e) { "Function invocation failed" } - function.failedInvocations.add(1, function.attributes) + function.reportFailure() } finally { val end = clock.millis() - function.activeTime.record(end - start, function.attributes) - function.idleInstances.add(1, function.attributes) - function.activeInstances.add(-1, function.attributes) + function.reportEnd(end - start) } }.startCoroutineCancellable(cont) } @@ -262,7 +272,8 @@ internal class FaaSServiceImpl( check(function.uid in functions) { "Function does not exist (anymore)" } _invocations.add(1) - function.invocations.add(1, function.attributes) + totalInvocations++ + function.reportSubmission() return suspendCancellableCoroutine { cont -> if (!queue.add(InvocationRequest(clock.millis(), function, cont))) { diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt new file mode 100644 index 00000000..497ee423 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.telemetry + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics + +/** + * Statistics about function invocations. + * + * @property totalInvocations The number of function invocations. + * @property timelyInvocations The number of function invocations that could be handled directly. + * @property delayedInvocations The number of function invocations that are delayed (cold starts). + * @property failedInvocations The number of function invocations that failed. + * @property activeInstances The number of active function instances. + * @property idleInstances The number of idle function instances. + * @property waitTime Statistics about the wait time of a function invocation. + * @property activeTime Statistics about the runtime of a function invocation. + */ +public data class FunctionStats( + val totalInvocations: Long, + val timelyInvocations: Long, + val delayedInvocations: Long, + val failedInvocations: Long, + val activeInstances: Int, + val idleInstances: Int, + val waitTime: DescriptiveStatistics, + val activeTime: DescriptiveStatistics +) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..cabb1d56 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.telemetry + +/** + * Statistics reported by the FaaS scheduler. + * + * @property totalInvocations The total amount of function invocations received by the scheduler. + * @property timelyInvocations The amount of function invocations that could be handled directly. + * @property delayedInvocations The amount of function invocations that were delayed due to function deployment. + */ +public data class SchedulerStats( + val totalInvocations: Long, + val timelyInvocations: Long, + val delayedInvocations: Long +) diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 68233c1a..a3d0d34e 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -123,7 +123,6 @@ public class SimFunctionDeployer( /** * Start the function instance. */ - @OptIn(InternalCoroutinesApi::class) internal fun start() { check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" } job = scope.launch { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 0dc9ba87..792a8584 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -28,22 +28,25 @@ import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.faas.service.FaaSService import org.opendc.faas.service.autoscaler.FunctionTerminationPolicyFixed import org.opendc.faas.service.router.RandomRoutingPolicy -import org.opendc.faas.simulator.delay.ZeroDelayInjector +import org.opendc.faas.simulator.delay.ColdStartModel +import org.opendc.faas.simulator.delay.StochasticDelayInjector import org.opendc.faas.simulator.workload.SimFaaSWorkload import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation import java.time.Duration +import java.util.* /** * A test suite for the [FaaSService] implementation under simulated conditions. @@ -65,10 +68,15 @@ internal class SimFaaSServiceTest { @Test fun testSmoke() = runBlockingSimulation { - val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) { - override suspend fun invoke() {} + val random = Random(0) + val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimRuntimeWorkload(1000) { + override suspend fun invoke() { + delay(random.nextInt(1000).toLong()) + } }) - val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload } + + val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) + val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload } val service = FaaSService( coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) @@ -84,8 +92,15 @@ internal class SimFaaSServiceTest { yield() + val funcStats = service.getFunctionStats(function) + assertAll( { coVerify { workload.invoke() } }, + { assertEquals(1, funcStats.totalInvocations) }, + { assertEquals(1, funcStats.delayedInvocations) }, + { assertEquals(0, funcStats.failedInvocations) }, + { assertEquals(100.0, funcStats.waitTime.mean) }, + { assertEquals(1285.0, funcStats.activeTime.mean) }, ) } } diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index 47e30a14..b476a669 100644 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -30,6 +30,7 @@ plugins { dependencies { api(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcCompute.opendcComputeService) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt new file mode 100644 index 00000000..593203fc --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt @@ -0,0 +1,424 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.telemetry.compute.table.* +import java.time.Clock +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param clock The virtual clock. + * @param service The [ComputeService] to monitor. + * @param servers The [Server]s to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. + */ +public class ComputeMetricReader( + scope: CoroutineScope, + clock: Clock, + private val service: ComputeService, + private val servers: List<Server>, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5) +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + + /** + * Aggregator for service metrics. + */ + private val serviceTableReader = ServiceTableReaderImpl(service) + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>() + + /** + * Mapping from [Server] instances to [ServerTableReaderImpl] + */ + private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + + try { + while (isActive) { + delay(intervalMs) + + try { + val now = clock.instant() + + for (host in service.hosts) { + val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + for (server in servers) { + val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + serviceTableReader.record(now) + monitor.record(serviceTableReader) + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + } + } + } finally { + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. + */ + private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val serversPending: Int + get() = _serversPending + private var _serversPending = 0 + + override val serversActive: Int + get() = _serversActive + private var _serversActive = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + private var _attemptsError = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _serversPending = stats.serversPending + _serversActive = stats.serversActive + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + _attemptsError = stats.attemptsError.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + private class HostTableReaderImpl(host: Host) : HostTableReader { + private val _host = host + + override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + private var _powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerUsage = hostSysStats.powerUsage + _powerTotal = hostSysStats.energyUsage + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 + } + } + + /** + * An aggregator for server metrics before they are reported. + */ + private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + private val _server = server + + /** + * The static information about this server. + */ + override val server = ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + override var host: HostInfo? = null + private var _host: Host? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val newHost = service.lookupHost(_server) + if (newHost != null && newHost.uid != _host?.uid) { + _host = newHost + host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) + } + + val cpuStats = _host?.getCpuStats(_server) + val sysStats = _host?.getSystemStats(_server) + + _timestamp = now + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = _server.launchedAt + _bootTime = sysStats?.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } + } +} diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index a9290c47..ca5da079 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -31,7 +31,6 @@ import io.opentelemetry.sdk.metrics.export.MetricReaderFactory import kotlinx.coroutines.* import mu.KotlinLogging import java.time.Duration -import java.util.* /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index a150de4e..7c0c43ed 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -23,8 +23,9 @@ package org.opendc.web.runner import mu.KotlinLogging +import org.opendc.compute.api.Server import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply @@ -35,13 +36,12 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMetricReader import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario import org.opendc.web.runner.internal.JobManager -import org.opendc.web.runner.internal.WebComputeMetricExporter +import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration import java.util.* @@ -180,9 +180,9 @@ public class OpenDCRunner( private val scenario: Scenario, private val repeat: Int, private val topology: Topology, - ) : RecursiveTask<WebComputeMetricExporter.Results>() { - override fun compute(): WebComputeMetricExporter.Results { - val exporter = WebComputeMetricExporter() + ) : RecursiveTask<WebComputeMonitor.Results>() { + override fun compute(): WebComputeMonitor.Results { + val monitor = WebComputeMonitor() // Schedule task that interrupts the simulation if it runs for too long. val currentThread = Thread.currentThread() @@ -206,25 +206,24 @@ public class OpenDCRunner( else null - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, failureModel, interferenceModel.takeIf { phenomena.interference } ) - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1))) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, seeder.nextLong()) + simulator.run(vms, seeder.nextLong(), servers) - val serviceMetrics = collectServiceMetrics(telemetry.metricProducer) + val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -235,14 +234,14 @@ public class OpenDCRunner( } } finally { simulator.close() - telemetry.close() + reader.close() } } } finally { interruptTask.cancel(false) } - return exporter.collectResults() + return monitor.collectResults() } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt index 8de0cee4..99b8aaf1 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt @@ -62,7 +62,7 @@ internal class JobManager(private val client: OpenDCRunnerClient) { /** * Persist the specified results. */ - fun finish(id: Long, results: List<WebComputeMetricExporter.Results>) { + fun finish(id: Long, results: List<WebComputeMonitor.Results>) { client.jobs.update( id, Job.Update( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 04437a5f..69350d8c 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -22,7 +22,6 @@ package org.opendc.web.runner.internal -import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.telemetry.compute.table.ServiceTableReader @@ -32,7 +31,7 @@ import kotlin.math.roundToLong /** * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. */ -internal class WebComputeMetricExporter : ComputeMetricExporter() { +internal class WebComputeMonitor : ComputeMonitor { override fun record(reader: HostTableReader) { val slices = reader.downtime / SLICE_LENGTH diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index ebace07d..b8bc0e33 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -30,6 +30,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import org.opendc.workflow.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -46,6 +47,11 @@ public interface WorkflowService : AutoCloseable { public suspend fun invoke(job: Job) /** + * Collect statistics about the workflow scheduler. + */ + public fun getSchedulerStats(): SchedulerStats + + /** * Terminate the lifecycle of the workflow service, stopping all running workflows. */ public override fun close() diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index cdaec021..9c7f18a2 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -34,6 +34,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import org.opendc.workflow.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import java.util.* @@ -145,6 +146,7 @@ public class WorkflowServiceImpl( .setDescription("Number of submitted jobs") .setUnit("1") .build() + private var _workflowsSubmitted: Int = 0 /** * The number of jobs that are running. @@ -153,6 +155,7 @@ public class WorkflowServiceImpl( .setDescription("Number of jobs running") .setUnit("1") .build() + private var _workflowsRunning: Int = 0 /** * The number of jobs that have finished running. @@ -161,6 +164,7 @@ public class WorkflowServiceImpl( .setDescription("Number of jobs that finished running") .setUnit("1") .build() + private var _workflowsFinished: Int = 0 /** * The number of tasks that have been submitted to the service. @@ -169,6 +173,7 @@ public class WorkflowServiceImpl( .setDescription("Number of submitted tasks") .setUnit("1") .build() + private var _tasksSubmitted: Int = 0 /** * The number of jobs that are running. @@ -177,6 +182,7 @@ public class WorkflowServiceImpl( .setDescription("Number of tasks running") .setUnit("1") .build() + private var _tasksRunning: Int = 0 /** * The number of jobs that have finished running. @@ -185,6 +191,7 @@ public class WorkflowServiceImpl( .setDescription("Number of tasks that finished running") .setUnit("1") .build() + private var _tasksFinished: Int = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. @@ -223,16 +230,22 @@ public class WorkflowServiceImpl( } submittedTasks.add(1) + _tasksSubmitted++ } instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) submittedJobs.add(1) + _workflowsSubmitted++ pacer.enqueue() } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats(_workflowsSubmitted, _workflowsRunning, _workflowsFinished, _tasksSubmitted, _tasksRunning, _tasksFinished) + } + override fun close() { scope.cancel() } @@ -271,6 +284,7 @@ public class WorkflowServiceImpl( activeJobs += jobInstance runningJobs.add(1) + _workflowsRunning++ rootListener.jobStarted(jobInstance) } @@ -350,6 +364,7 @@ public class WorkflowServiceImpl( val task = taskByServer.getValue(server) task.startedAt = clock.millis() runningTasks.add(1) + _tasksRunning++ rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -368,6 +383,8 @@ public class WorkflowServiceImpl( runningTasks.add(-1) finishedTasks.add(1) + _tasksRunning-- + _tasksFinished++ rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -395,6 +412,8 @@ public class WorkflowServiceImpl( activeJobs -= job runningJobs.add(-1) finishedJobs.add(1) + _workflowsRunning-- + _workflowsFinished++ rootListener.jobFinished(job) job.cont.resume(Unit) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..7c7d7c4d --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.telemetry + +/** + * Statistics about the workflow scheduler. + * + * @property workflowsSubmitted The number of workflows submitted to the scheduler. + * @property workflowsRunning The number of workflows that are currently running. + * @property workflowsFinished The number of workflows that have completed since the scheduler started. + * @property tasksSubmitted The number of tasks submitted to the scheduler. + * @property tasksRunning The number of tasks that are currently running. + * @property tasksFinished The number of tasks that have completed. + */ +public data class SchedulerStats( + val workflowsSubmitted: Int, + val workflowsRunning: Int, + val workflowsFinished: Int, + val tasksSubmitted: Int, + val tasksRunning: Int, + val tasksFinished: Int +) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 1fd332b9..d5f06587 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,7 +22,6 @@ package org.opendc.workflow.service -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -66,7 +65,6 @@ internal class WorkflowServiceTest { @Test fun testTrace() = runBlockingSimulation { // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts - val HOST_COUNT = 4 val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) @@ -74,7 +72,8 @@ internal class WorkflowServiceTest { val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) - repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) } + val hostCount = 4 + repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines val workflowScheduler = WorkflowSchedulerSpec( @@ -98,13 +97,13 @@ internal class WorkflowServiceTest { computeHelper.close() } - val metrics = collectMetrics(workflowHelper.metricProducer) + val metrics = workflowHelper.service.getSchedulerStats() assertAll( - { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, - { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, - { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, + { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, + { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") }, + { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } ) @@ -130,28 +129,4 @@ internal class WorkflowServiceTest { SimSpaceSharedHypervisorProvider() ) } - - class WorkflowMetrics { - var jobsSubmitted = 0L - var jobsActive = 0L - var jobsFinished = 0L - var tasksSubmitted = 0L - var tasksActive = 0L - var tasksFinished = 0L - } - - /** - * Collect the metrics of the workflow service. - */ - private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics { - val metrics = metricProducer.collectAllMetrics().associateBy { it.name } - val res = WorkflowMetrics() - res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0 - res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0 - res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0 - return res - } } |
