diff options
43 files changed, 1404 insertions, 327 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt index b508a9f8..64b73d0b 100644 --- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt +++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt @@ -22,6 +22,8 @@ package org.opendc.compute.api +import java.time.Instant + /** * A stateful object representing a server instance that is running on some physical or virtual machine. */ @@ -42,6 +44,11 @@ public interface Server : Resource { public val state: ServerState /** + * The most recent moment in time when the server was launched. + */ + public val launchedAt: Instant? + + /** * Request the server to be started. * * This method is guaranteed to return after the request was acknowledged, but might return before the server was diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 2a1fbaa0..3a6baaa1 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -25,9 +25,11 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient +import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -37,16 +39,11 @@ import kotlin.coroutines.CoroutineContext */ 
public interface ComputeService : AutoCloseable { /** - * The hosts that are used by the compute service. + * The hosts that are registered with the "compute" service. */ public val hosts: Set<Host> /** - * The number of hosts available in the system. - */ - public val hostCount: Int - - /** * Create a new [ComputeClient] to control the compute service. */ public fun newClient(): ComputeClient @@ -66,6 +63,16 @@ public interface ComputeService : AutoCloseable { */ public override fun close() + /** + * Lookup the [Host] that currently hosts the specified [server]. + */ + public fun lookupHost(server: Server): Host? + + /** + * Collect the statistics about the scheduler component of this service. + */ + public fun getSchedulerStats(): SchedulerStats + public companion object { /** * Construct a new [ComputeService] implementation. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index bed15dfd..67b144d9 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -23,6 +23,10 @@ package org.opendc.compute.service.driver import org.opendc.compute.api.Server +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats +import org.opendc.compute.service.driver.telemetry.HostCpuStats +import org.opendc.compute.service.driver.telemetry.HostSystemStats import java.util.* /** @@ -55,6 +59,11 @@ public interface Host { public val meta: Map<String, Any> /** + * The [Server] instances known to the host. + */ + public val instances: Set<Server> + + /** * Determine whether the specified [instance][server] can still fit on this host. 
*/ public fun canFit(server: Server): Boolean @@ -100,4 +109,30 @@ public interface Host { * Remove a [HostListener] from this host. */ public fun removeListener(listener: HostListener) + + /** + * Query the system statistics of the host. + */ + public fun getSystemStats(): HostSystemStats + + /** + * Query the system statistics of a [Server] that is located on this host. + * + * @param server The [Server] to obtain the system statistics of. + * @throws IllegalArgumentException if the server is not present on the host. + */ + public fun getSystemStats(server: Server): GuestSystemStats + + /** + * Query the CPU statistics of the host. + */ + public fun getCpuStats(): HostCpuStats + + /** + * Query the CPU statistics of a [Server] that is located on this host. + * + * @param server The [Server] to obtain the CPU statistics of. + * @throws IllegalArgumentException if the server is not present on the host. + */ + public fun getCpuStats(server: Server): GuestCpuStats } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt new file mode 100644 index 00000000..b5d63471 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be 
included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +/** + * Statistics about the CPUs of a guest. + * + * @property activeTime The cumulative time (in seconds) that the CPUs of the guest were actively running. + * @property idleTime The cumulative time (in seconds) the CPUs of the guest were idle. + * @property stealTime The cumulative CPU time (in seconds) that the guest was ready to run, but not granted time by the host. + * @property lostTime The cumulative CPU time (in seconds) that was lost due to interference with other machines. + * @property capacity The available CPU capacity of the guest (in MHz). + * @property usage Amount of CPU resources (in MHz) actually used by the guest. + * @property utilization Utilization of the CPU resources (in %) relative to the total CPU capacity. 
+ */ +public data class GuestCpuStats( + val activeTime: Long, + val idleTime: Long, + val stealTime: Long, + val lostTime: Long, + val capacity: Double, + val usage: Double, + val utilization: Double +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt new file mode 100644 index 00000000..b3958473 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +import java.time.Duration +import java.time.Instant + +/** + * System-level statistics of a guest. 
+ * + * @property uptime The cumulative uptime of the guest since last boot. + * @property downtime The cumulative downtime of the guest since last boot. + * @property bootTime The time at which the guest booted. + */ +public data class GuestSystemStats( + val uptime: Duration, + val downtime: Duration, + val bootTime: Instant +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt new file mode 100644 index 00000000..55e23c0e --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.service.driver.telemetry + +/** + * Statistics about the CPUs of a host. + * + * @property activeTime The cumulative time (in seconds) that the CPUs of the host were actively running. + * @property idleTime The cumulative time (in seconds) the CPUs of the host were idle. + * @property stealTime The cumulative CPU time (in seconds) that virtual machines were ready to run, but were not able to. + * @property lostTime The cumulative CPU time (in seconds) that was lost due to interference between virtual machines. + * @property capacity The available CPU capacity of the host (in MHz). + * @property demand Amount of CPU resources (in MHz) the guests would use if there were no CPU contention or CPU + * limits. + * @property usage Amount of CPU resources (in MHz) actually used by the host. + * @property utilization Utilization of the CPU resources (in %) relative to the total CPU capacity. + */ +public data class HostCpuStats( + val activeTime: Long, + val idleTime: Long, + val stealTime: Long, + val lostTime: Long, + val capacity: Double, + val demand: Double, + val usage: Double, + val utilization: Double +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt new file mode 100644 index 00000000..1c07023f --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the 
Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.driver.telemetry + +import java.time.Duration +import java.time.Instant + +/** + * System-level statistics of a host. + * + * @property uptime The cumulative uptime of the host since last boot. + * @property downtime The cumulative downtime of the host since last boot. + * @property bootTime The time at which the host booted. + * @property powerUsage Instantaneous power usage of the system (in W). + * @property energyUsage The cumulative energy usage of the system (in J). + * @property guestsTerminated The number of guests that are in a terminated state. + * @property guestsRunning The number of guests that are in a running state. + * @property guestsError The number of guests that are in an error state. + * @property guestsInvalid The number of guests that are in an unknown state. 
+ */ +public data class HostSystemStats( + val uptime: Duration, + val downtime: Duration, + val bootTime: Instant, + val powerUsage: Double, + val energyUsage: Double, + val guestsTerminated: Int, + val guestsRunning: Int, + val guestsError: Int, + val guestsInvalid: Int, +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index f2929bf3..45775640 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -27,6 +27,7 @@ import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher +import java.time.Instant import java.util.* /** @@ -55,6 +56,9 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche override var state: ServerState = delegate.state private set + override var launchedAt: Instant? 
= null + private set + override suspend fun start() { delegate.start() refresh() @@ -95,6 +99,7 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche labels = delegate.labels meta = delegate.meta state = delegate.state + launchedAt = delegate.launchedAt } override fun onStateChanged(server: Server, newState: ServerState) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 144b6573..e8664e5c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -36,8 +36,10 @@ import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration +import java.time.Instant import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -126,6 +128,9 @@ internal class ComputeServiceImpl( private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success") private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure") private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error") + private var _attemptsSuccess = 0L + private var _attemptsFailure = 0L + private var _attemptsError = 0L /** * The response time of the service. 
@@ -145,6 +150,8 @@ internal class ComputeServiceImpl( .build() private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending") private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") + private var _serversPending = 0 + private var _serversActive = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. @@ -154,9 +161,6 @@ internal class ComputeServiceImpl( override val hosts: Set<Host> get() = hostToView.keys - override val hostCount: Int - get() = hostToView.size - init { val upState = Attributes.of(AttributeKey.stringKey("state"), "up") val downState = Attributes.of(AttributeKey.stringKey("state"), "down") @@ -165,7 +169,7 @@ internal class ComputeServiceImpl( .setDescription("Number of hosts registered with the scheduler") .setUnit("1") .buildWithCallback { result -> - val total = hostCount + val total = hosts.size val available = availableHosts.size.toLong() result.record(available, upState) @@ -322,17 +326,35 @@ internal class ComputeServiceImpl( } } + override fun lookupHost(server: Server): Host? { + val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" } + return internal.host + } + override fun close() { scope.cancel() } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats( + availableHosts.size, + hostToView.size - availableHosts.size, + _attemptsSuccess, + _attemptsFailure, + _attemptsError, + _serversPending, + _serversActive + ) + } + internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." 
} val now = clock.millis() val request = SchedulingRequest(server, now) - server.lastProvisioningTimestamp = now + server.launchedAt = Instant.ofEpochMilli(now) queue.add(request) + _serversPending++ _servers.add(1, _serversPendingAttr) requestSchedulingCycle() return request @@ -371,6 +393,7 @@ internal class ComputeServiceImpl( if (request.isCancelled) { queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) continue } @@ -383,7 +406,9 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) + _attemptsFailure++ _schedulingAttempts.add(1, _schedulingAttemptsFailureAttr) logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } @@ -399,6 +424,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() + _serversPending-- _servers.add(-1, _serversPendingAttr) _schedulingLatency.record(now - request.submitTime, server.attributes) @@ -417,6 +443,8 @@ internal class ComputeServiceImpl( activeServers[server] = host _servers.add(1, _serversActiveAttr) + _serversActive++ + _attemptsSuccess++ _schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr) } catch (e: Throwable) { logger.error(e) { "Failed to deploy VM" } @@ -425,6 +453,7 @@ internal class ComputeServiceImpl( hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + _attemptsError++ _schedulingAttempts.add(1, _schedulingAttemptsErrorAttr) } } @@ -481,6 +510,7 @@ internal class ComputeServiceImpl( logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." 
} if (activeServers.remove(server) != null) { + _serversActive-- _servers.add(-1, _serversActiveAttr) } @@ -503,7 +533,8 @@ internal class ComputeServiceImpl( */ private fun collectProvisionTime(result: ObservableLongMeasurement) { for ((_, server) in servers) { - result.record(server.lastProvisioningTimestamp, server.attributes) + val launchedAt = server.launchedAt ?: continue + result.record(launchedAt.toEpochMilli(), server.attributes) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index f1b92c66..d2a2d896 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -28,6 +28,7 @@ import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host +import java.time.Instant import java.util.UUID /** @@ -75,7 +76,7 @@ internal class InternalServer( /** * The most recent timestamp when the server entered a provisioning state. */ - @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE + override var launchedAt: Instant? = null /** * The current scheduling request. 
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..4dc70286 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.telemetry + +import org.opendc.compute.service.ComputeService + +/** + * Statistics about the scheduling component of the [ComputeService]. + * + * @property hostsAvailable The number of hosts currently available for scheduling. + * @property hostsUnavailable The number of hosts unavailable for scheduling. 
+ * @property attemptsSuccess The number of scheduling attempts that resulted in an allocation onto a host. + * @property attemptsFailure The number of scheduling attempts that failed due to insufficient capacity at the moment. + * @property attemptsError The number of scheduling attempts that failed due to system error. + * @property serversPending The number of servers that are pending to be scheduled. + * @property serversActive The number of servers that are currently managed by the service and running. + */ +public data class SchedulerStats( + val hostsAvailable: Int, + val hostsUnavailable: Int, + val attemptsSuccess: Long, + val attemptsFailure: Long, + val attemptsError: Long, + val serversPending: Int, + val serversActive: Int +) diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 7b8d0fe2..eb106817 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.service import io.mockk.* import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNull @@ -48,10 +47,9 @@ import java.util.* /** * Test suite for the [ComputeService] interface. 
*/ -@OptIn(ExperimentalCoroutinesApi::class) internal class ComputeServiceTest { - lateinit var scope: SimulationCoroutineScope - lateinit var service: ComputeService + private lateinit var scope: SimulationCoroutineScope + private lateinit var service: ComputeService @BeforeEach fun setUp() { @@ -128,14 +126,12 @@ internal class ComputeServiceTest { every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP - assertEquals(0, service.hostCount) assertEquals(emptySet<Host>(), service.hosts) service.addHost(host) verify(exactly = 1) { host.addListener(any()) } - assertEquals(1, service.hostCount) assertEquals(1, service.hosts.size) service.removeHost(host) @@ -150,7 +146,6 @@ internal class ComputeServiceTest { every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN - assertEquals(0, service.hostCount) assertEquals(emptySet<Host>(), service.hosts) service.addHost(host) diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index dfd3bc67..4e5a37ae 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -23,7 +23,6 @@ package org.opendc.compute.service import io.mockk.* -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test @@ -41,7 +40,6 @@ import java.util.* /** * Test suite for the [InternalServer] implementation. 
*/ -@OptIn(ExperimentalCoroutinesApi::class) class InternalServerTest { @Test fun testEquality() { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 4eb6392e..323ae4fe 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -29,11 +29,14 @@ import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.api.metrics.ObservableDoubleMeasurement import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* -import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats +import org.opendc.compute.service.driver.telemetry.HostCpuStats +import org.opendc.compute.service.driver.telemetry.HostSystemStats import org.opendc.compute.simulator.internal.Guest import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* @@ -49,6 +52,8 @@ import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine +import java.time.Duration +import java.time.Instant import java.util.* import kotlin.coroutines.CoroutineContext @@ -81,11 +86,6 @@ public class SimHost( private val clock = engine.clock /** - * The logger instance of this server. - */ - private val logger = KotlinLogging.logger {} - - /** * The [Meter] to track metrics of the simulated host. 
*/ private val meter = meterProvider.get("org.opendc.compute.simulator") @@ -112,6 +112,9 @@ public class SimHost( private val guests = HashMap<Server, Guest>() private val _guests = mutableListOf<Guest>() + override val instances: Set<Server> + get() = guests.keys + override val state: HostState get() = _state private var _state: HostState = HostState.DOWN @@ -249,6 +252,68 @@ public class SimHost( machine.cancel() } + override fun getSystemStats(): HostSystemStats { + updateUptime() + + var terminated = 0 + var running = 0 + var error = 0 + var invalid = 0 + + val guests = _guests.listIterator() + for (guest in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + ServerState.DELETED -> { + // Remove guests that have been deleted + this.guests.remove(guest.server) + guests.remove() + } + else -> invalid++ + } + } + + return HostSystemStats( + Duration.ofMillis(_uptime), + Duration.ofMillis(_downtime), + Instant.ofEpochMilli(_bootTime), + machine.powerUsage, + machine.energyUsage, + terminated, + running, + error, + invalid + ) + } + + override fun getSystemStats(server: Server): GuestSystemStats { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + return guest.getSystemStats() + } + + override fun getCpuStats(): HostCpuStats { + val counters = hypervisor.counters + counters.flush() + + return HostCpuStats( + counters.cpuActiveTime / 1000L, + counters.cpuIdleTime / 1000L, + counters.cpuStealTime / 1000L, + counters.cpuLostTime / 1000L, + hypervisor.cpuCapacity, + hypervisor.cpuDemand, + hypervisor.cpuUsage, + hypervisor.cpuUsage / _cpuLimit + ) + } + + override fun getCpuStats(server: Server): GuestCpuStats { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + return guest.getCpuStats() + } + override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean { @@ -417,13 
+482,12 @@ public class SimHost( * Helper function to track the CPU time of a machine. */ private fun collectCpuTime(result: ObservableLongMeasurement) { - val counters = hypervisor.counters - counters.flush() + val stats = getCpuStats() - result.record(counters.cpuActiveTime / 1000L, _activeState) - result.record(counters.cpuIdleTime / 1000L, _idleState) - result.record(counters.cpuStealTime / 1000L, _stealState) - result.record(counters.cpuLostTime / 1000L, _lostState) + result.record(stats.activeTime, _activeState) + result.record(stats.idleTime, _idleState) + result.record(stats.stealTime, _stealState) + result.record(stats.lostTime, _lostState) val guests = _guests for (i in guests.indices) { @@ -450,7 +514,7 @@ public class SimHost( val guests = _guests for (i in guests.indices) { - guests[i].updateUptime(duration) + guests[i].updateUptime() } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index bb378ee3..0d4c550d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -32,6 +32,8 @@ import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.telemetry.GuestCpuStats +import org.opendc.compute.service.driver.telemetry.GuestSystemStats import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper import org.opendc.simulator.compute.kernel.SimHypervisor @@ -39,6 +41,8 @@ import org.opendc.simulator.compute.kernel.SimVirtualMachine import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock +import 
java.time.Duration +import java.time.Instant import kotlin.coroutines.CoroutineContext /** @@ -146,6 +150,37 @@ internal class Guest( } /** + * Obtain the system statistics of this guest. + */ + fun getSystemStats(): GuestSystemStats { + updateUptime() + + return GuestSystemStats( + Duration.ofMillis(_uptime), + Duration.ofMillis(_downtime), + Instant.ofEpochMilli(_bootTime) + ) + } + + /** + * Obtain the CPU statistics of this guest. + */ + fun getCpuStats(): GuestCpuStats { + val counters = machine.counters + counters.flush() + + return GuestCpuStats( + counters.cpuActiveTime / 1000L, + counters.cpuIdleTime / 1000L, + counters.cpuStealTime / 1000L, + counters.cpuLostTime / 1000L, + machine.cpuCapacity, + machine.cpuUsage, + machine.cpuUsage / _cpuLimit + ) + } + + /** * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. */ private var job: Job? = null @@ -209,6 +244,8 @@ internal class Guest( * This method is invoked when the guest stopped. */ private fun onStop(target: ServerState) { + updateUptime() + state = target listener.onStop(this) } @@ -224,10 +261,16 @@ internal class Guest( .put(STATE_KEY, "down") .build() + private var _lastReport = clock.millis() + /** * Helper function to track the uptime and downtime of the guest. */ - fun updateUptime(duration: Long) { + fun updateUptime() { + val now = clock.millis() + val duration = now - _lastReport + _lastReport = now + if (state == ServerState.RUNNING) { _uptime += duration } else if (state == ServerState.ERROR) { @@ -239,6 +282,8 @@ internal class Guest( * Helper function to track the uptime of the guest. 
*/ fun collectUptime(result: ObservableLongMeasurement) { + updateUptime() + result.record(_uptime, _upState) result.record(_downtime, _downState) } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index f0325023..fd54ad1d 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,8 +22,7 @@ package org.opendc.compute.simulator -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -42,13 +41,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine -import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.HOST_ID -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Duration +import java.time.Instant import java.util.* import kotlin.coroutines.resume @@ -73,45 +66,16 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var idleTime = 0L - var activeTime = 0L - var stealTime = 0L - - val hostId = UUID.randomUUID() - val hostResource = Resource.builder() - .put(HOST_ID, hostId.toString()) - .build() - - // Setup metric reader val duration = 5 * 60L - val reader = 
CoroutineMetricReader( - this, - object : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - activeTime += reader.cpuActiveTime - idleTime += reader.cpuIdleTime - stealTime += reader.cpuStealTime - } - }, - exportInterval = Duration.ofSeconds(duration) - ) - - val meterProvider = SdkMeterProvider - .builder() - .setResource(hostResource) - .setClock(clock.toOtelClock()) - .registerMetricReader(reader) - .build() - val engine = FlowEngine(coroutineContext, clock) - val virtDriver = SimHost( - uid = hostId, + val host = SimHost( + uid = UUID.randomUUID(), name = "test", model = machineModel, meta = emptyMap(), coroutineContext, engine, - meterProvider, + MeterProvider.noop(), SimFairShareHypervisorProvider() ) val vmImageA = MockImage( @@ -150,11 +114,11 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) coroutineScope { - launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } - launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } + launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } + launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } suspendCancellableCoroutine<Unit> { cont -> - virtDriver.addListener(object : HostListener { + host.addListener(object : HostListener { private var finished = 0 override fun onStateChanged(host: Host, server: Server, newState: ServerState) { @@ -168,13 +132,14 @@ internal class SimHostTest { // Ensure last cycle is collected delay(1000L * duration) - virtDriver.close() - meterProvider.close() + host.close() + + val cpuStats = host.getCpuStats() assertAll( - { assertEquals(658, activeTime, "Active time does not match") }, - { assertEquals(2341, idleTime, "Idle time does not match") }, - { assertEquals(637, stealTime, "Steal time does not match") }, + { assertEquals(658, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(2341, cpuStats.idleTime, "Idle time does not match") }, + 
{ assertEquals(637, cpuStats.stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } @@ -184,54 +149,16 @@ internal class SimHostTest { */ @Test fun testFailure() = runBlockingSimulation { - var activeTime = 0L - var idleTime = 0L - var uptime = 0L - var downtime = 0L - var guestUptime = 0L - var guestDowntime = 0L - - val hostId = UUID.randomUUID() - val hostResource = Resource.builder() - .put(HOST_ID, hostId.toString()) - .build() - - // Setup metric reader val duration = 5 * 60L - val reader = CoroutineMetricReader( - this, - object : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - activeTime += reader.cpuActiveTime - idleTime += reader.cpuIdleTime - uptime += reader.uptime - downtime += reader.downtime - } - - override fun record(reader: ServerTableReader) { - guestUptime += reader.uptime - guestDowntime += reader.downtime - } - }, - exportInterval = Duration.ofSeconds(duration) - ) - - val meterProvider = SdkMeterProvider - .builder() - .setResource(hostResource) - .setClock(clock.toOtelClock()) - .registerMetricReader(reader) - .build() - val engine = FlowEngine(coroutineContext, clock) val host = SimHost( - uid = hostId, + uid = UUID.randomUUID(), name = "test", model = machineModel, meta = emptyMap(), coroutineContext, engine, - meterProvider, + MeterProvider.noop(), SimFairShareHypervisorProvider() ) val image = MockImage( @@ -275,15 +202,17 @@ internal class SimHostTest { // Ensure last cycle is collected delay(1000L * duration) - meterProvider.close() + val cpuStats = host.getCpuStats() + val sysStats = host.getSystemStats() + val guestSysStats = host.getSystemStats(server) assertAll( - { assertEquals(1775, idleTime, "Idle time does not match") }, - { assertEquals(624, activeTime, "Active time does not match") }, - { assertEquals(900001, uptime, "Uptime does not match") }, - { assertEquals(300000, downtime, "Downtime does not match") }, - { assertEquals(900000, guestUptime, "Guest uptime does 
not match") }, - { assertEquals(300000, guestDowntime, "Guest downtime does not match") }, + { assertEquals(1775, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(624, cpuStats.activeTime, "Active time does not match") }, + { assertEquals(900001, sysStats.uptime.toMillis(), "Uptime does not match") }, + { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") }, + { assertEquals(900001, guestSysStats.uptime.toMillis(), "Guest uptime does not match") }, + { assertEquals(300000, guestSysStats.downtime.toMillis(), "Guest downtime does not match") }, ) } @@ -332,6 +261,8 @@ internal class SimHostTest { override val state: ServerState = ServerState.TERMINATED + override val launchedAt: Instant? = null + override suspend fun start() {} override suspend fun stop() {} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 4b0b343f..21cfdad2 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield +import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost @@ -81,9 +82,19 @@ public class ComputeServiceHelper( } /** - * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * + * @param trace The trace to simulate. + * @param seed The seed for the simulation. 
+ * @param servers A list to which the created servers is added. + * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). */ - public suspend fun run(trace: List<VirtualMachine>, seed: Long, submitImmediately: Boolean = false) { + public suspend fun run( + trace: List<VirtualMachine>, + seed: Long, + servers: MutableList<Server>? = null, + submitImmediately: Boolean = false + ) { val random = Random(seed) val injector = failureModel?.createInjector(context, clock, service, random) val client = service.newClient() @@ -129,12 +140,14 @@ public class ComputeServiceHelper( meta = mapOf("workload" to workload) ) + servers?.add(server) + // Wait for the server reach its end time val endTime = entry.stopTime.toEpochMilli() delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) - // Delete the server after reaching the end-time of the virtual machine - server.delete() + // Stop the server after reaching the end-time of the virtual machine + server.stop() } } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt index a46885f4..6c515118 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt @@ -22,8 +22,6 @@ package org.opendc.compute.workload.export.parquet -import io.opentelemetry.sdk.common.CompletableResultCode -import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.telemetry.compute.table.ServerTableReader @@ -33,7 +31,7 @@ import java.io.File 
/** * A [ComputeMonitor] that logs the events to a Parquet file. */ -public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() { +public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { private val serverWriter = ParquetServerDataWriter( File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, bufferSize @@ -61,11 +59,9 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS serviceWriter.write(reader) } - override fun shutdown(): CompletableResultCode { + override fun close() { hostWriter.close() serviceWriter.close() serverWriter.close() - - return CompletableResultCode.ofSuccess() } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 0bbf1443..6fd85e8c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,12 +23,14 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory +import kotlinx.coroutines.* +import org.opendc.compute.api.Server import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter +import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.apply import 
org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -37,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMetricReader import java.io.File import java.time.Duration import java.util.* @@ -97,7 +99,7 @@ abstract class Portfolio(name: String) : Experiment(name) { else null val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder) - val telemetry = SdkTelemetryManager(clock) + val telemetry = NoopTelemetryManager() val runner = ComputeServiceHelper( coroutineContext, clock, @@ -107,23 +109,35 @@ abstract class Portfolio(name: String) : Experiment(name) { interferenceModel?.withSeed(repeat.toLong()) ) - val exporter = ParquetComputeMetricExporter( - File(config.getString("output-path")), - "portfolio_id=$name/scenario_id=$id/run_id=$repeat", - 4096 - ) - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) - val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) + val servers = mutableListOf<Server>() + val exporter = ComputeMetricReader( + this, + clock, + runner.service, + servers, + ParquetComputeMonitor( + File(config.getString("output-path")), + "portfolio_id=$name/scenario_id=$id/run_id=$repeat", + bufferSize = 4096 + ), + exportInterval = Duration.ofMinutes(5) + ) try { // Instantiate the desired topology runner.apply(topology) - // Run the workload trace - runner.run(vms, seeder.nextLong()) + coroutineScope { + // Run the workload trace + runner.run(vms, seeder.nextLong(), servers) + + // Stop the metric collection + exporter.close() + } } finally { runner.close() + exporter.close() } } } diff --git 
a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 01b2a8fe..62cdf123 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -26,25 +26,23 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.compute.api.Server import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMetricReader +import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration -import java.time.Instant import java.util.* /** @@ -54,7 +52,7 @@ class 
CapelinIntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var exporter: TestComputeMetricExporter + private lateinit var monitor: TestComputeMonitor /** * The [FilterScheduler] to use for all experiments. @@ -67,11 +65,11 @@ class CapelinIntegrationTest { private lateinit var workloadLoader: ComputeWorkloadLoader /** - * Setup the experimental environment. + * Set up the experimental environment. */ @BeforeEach fun setUp() { - exporter = TestComputeMetricExporter() + monitor = TestComputeMonitor() computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -85,22 +83,22 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val (workload, _) = createTestWorkload(1.0) - val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler ) val topology = createTopology() - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) try { runner.apply(topology) - runner.run(workload, 0) + runner.run(workload, 0, servers) - val serviceMetrics = exporter.serviceMetrics + val serviceMetrics = runner.service.getSchedulerStats() println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -116,15 +114,15 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { 
"Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } finally { runner.close() - telemetry.close() + reader.close() } } @@ -135,41 +133,41 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1 val (workload, _) = createTestWorkload(0.25, seed) - val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler ) val topology = createTopology("single") - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) try { runner.apply(topology) - runner.run(workload, seed.toLong()) + runner.run(workload, seed.toLong(), servers) + val serviceMetrics = runner.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + 
"Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { runner.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } + { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -181,41 +179,41 @@ class CapelinIntegrationTest { val seed = 0 val (workload, interferenceModel) = createTestWorkload(1.0, seed) - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, interferenceModel = interferenceModel?.withSeed(seed.toLong()) ) val topology = createTopology("single") - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, 
simulator.service, servers, monitor) try { simulator.apply(topology) - simulator.run(workload, seed.toLong()) + simulator.run(workload, seed.toLong(), servers) + val serviceMetrics = simulator.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { simulator.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } ) } @@ -225,43 +223,43 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), 
computeScheduler, grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") val (workload, _) = createTestWorkload(0.25, seed) - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { simulator.apply(topology) - simulator.run(workload, seed.toLong()) + simulator.run(workload, seed.toLong(), servers) + val serviceMetrics = simulator.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { simulator.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } } + { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } } ) } @@ -281,8 +279,7 @@ class CapelinIntegrationTest { return stream.use { 
clusterTopology(stream) } } - class TestComputeMetricExporter : ComputeMetricExporter() { - var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0) + class TestComputeMonitor : ComputeMonitor { var idleTime = 0L var activeTime = 0L var stealTime = 0L @@ -290,19 +287,6 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L - override fun record(reader: ServiceTableReader) { - serviceMetrics = ServiceData( - reader.timestamp, - reader.hostsUp, - reader.hostsDown, - reader.serversPending, - reader.serversActive, - reader.attemptsSuccess, - reader.attemptsFailure, - reader.attemptsError - ) - } - override fun record(reader: HostTableReader) { idleTime += reader.cpuIdleTime activeTime += reader.cpuActiveTime diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 43093abf..5762ce64 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -36,8 +36,7 @@ dependencies { implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.jackson.module.kotlin) { - exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") - } - implementation("org.jetbrains.kotlin:kotlin-reflect:1.6.10") + implementation(libs.jackson.module.kotlin) + + testImplementation(libs.slf4j.simple) } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 5245261c..99948c8e 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.tf20.core -import 
io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine @@ -52,7 +51,7 @@ public class SimTFDevice( context: CoroutineContext, clock: Clock, meter: Meter, - private val pu: ProcessingUnit, + pu: ProcessingUnit, private val memory: MemoryUnit, powerModel: PowerModel ) : TFDevice { @@ -70,17 +69,13 @@ public class SimTFDevice( ) /** - * The identifier of a device. - */ - private val deviceId = AttributeKey.stringKey("device.id") - - /** * The usage of the device. */ private val _usage = meter.histogramBuilder("device.usage") .setDescription("The amount of device resources used") .setUnit("MHz") .build() + private var _resourceUsage = 0.0 /** * The power draw of the device. @@ -89,6 +84,8 @@ public class SimTFDevice( .setDescription("The power draw of the device") .setUnit("W") .build() + private var _powerUsage = 0.0 + private var _energyUsage = 0.0 /** * The workload that will be run by the device. 
@@ -175,7 +172,10 @@ public class SimTFDevice( override fun onConverge(conn: FlowConnection, now: Long) { _usage.record(conn.rate) + _resourceUsage = conn.rate _power.record(machine.psu.powerDraw) + _powerUsage = machine.powerUsage + _energyUsage = machine.energyUsage } } @@ -197,6 +197,10 @@ public class SimTFDevice( } } + override fun getDeviceStats(): TFDeviceStats { + return TFDeviceStats(_resourceUsage, _powerUsage, _energyUsage) + } + override fun close() { machine.cancel() scope.cancel() diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt index bbc34ed9..839ed8a9 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt @@ -47,4 +47,9 @@ public interface TFDevice : AutoCloseable { * Perform [flops] amount of computation on the device. */ public suspend fun compute(flops: Double) + + /** + * Collect device statistics. 
+ */ + public fun getDeviceStats(): TFDeviceStats } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt new file mode 100644 index 00000000..016d2a8b --- /dev/null +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.tf20.core + +/** + * Statistics about a TensorFlow [TFDevice]. + * + * @property resourceUsage The resource usage of the device (in MHz). + * @property powerUsage The instantaneous power draw of the device (in W). + * @property energyUsage Cumulative energy usage of the device since boot (in J). 
+ */ +data class TFDeviceStats( + val resourceUsage: Double, + val powerUsage: Double, + val energyUsage: Double +) diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 28a2a319..0d5fbebb 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -25,6 +25,7 @@ package org.opendc.experiments.tf20.core import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.compute.model.MemoryUnit @@ -57,6 +58,12 @@ internal class SimTFDeviceTest { launch { device.compute(1e6) } launch { device.compute(2e6) } } - assertEquals(3681, clock.millis()) + + val stats = device.getDeviceStats() + + assertAll( + { assertEquals(3681, clock.millis()) }, + { assertEquals(325.75, stats.energyUsage) } + ) } } diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index c54595d3..1803ae69 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -30,6 +30,7 @@ plugins { dependencies { api(projects.opendcFaas.opendcFaasApi) api(projects.opendcTelemetry.opendcTelemetryApi) + api(libs.commons.math3) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.opentelemetry.semconv) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt 
b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 1d5331cb..f7dc3c1f 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -25,10 +25,13 @@ package org.opendc.faas.service import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient +import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy import org.opendc.faas.service.deployer.FunctionDeployer import org.opendc.faas.service.internal.FaaSServiceImpl import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.faas.service.telemetry.FunctionStats +import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -42,6 +45,16 @@ public interface FaaSService : AutoCloseable { public fun newClient(): FaaSClient /** + * Collect statistics about the scheduler of the service. + */ + public fun getSchedulerStats(): SchedulerStats + + /** + * Collect statistics about the specified [function]. + */ + public fun getFunctionStats(function: FaaSFunction): FunctionStats + + /** * Terminate the lifecycle of the FaaS service, stopping all running function instances. 
*/ public override fun close() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 836231c8..52fcffa1 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -29,7 +29,9 @@ import io.opentelemetry.api.metrics.LongHistogram import io.opentelemetry.api.metrics.LongUpDownCounter import io.opentelemetry.api.metrics.Meter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.opendc.faas.service.deployer.FunctionInstance +import org.opendc.faas.service.telemetry.FunctionStats import java.util.* /** @@ -46,7 +48,7 @@ public class FunctionObject( /** * The attributes of this function. */ - public val attributes: Attributes = Attributes.builder() + private val attributes: Attributes = Attributes.builder() .put(ResourceAttributes.FAAS_ID, uid.toString()) .put(ResourceAttributes.FAAS_NAME, name) .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) @@ -56,68 +58,78 @@ public class FunctionObject( /** * The total amount of function invocations received by the function. */ - public val invocations: LongCounter = meter.counterBuilder("function.invocations.total") + private val invocations: LongCounter = meter.counterBuilder("function.invocations.total") .setDescription("Number of function invocations") .setUnit("1") .build() + private var _invocations = 0L /** * The amount of function invocations that could be handled directly. 
*/ - public val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") + private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") .setDescription("Number of function invocations handled directly") .setUnit("1") .build() + private var _timelyInvocations = 0L /** * The amount of function invocations that were delayed due to function deployment. */ - public val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") + private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() + private var _delayedInvocations = 0L /** * The amount of function invocations that failed. */ - public val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") + private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") .setDescription("Number of function invocations that failed") .setUnit("1") .build() + private var _failedInvocations = 0L /** * The amount of instances for this function. */ - public val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") + private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") .setDescription("Number of active function instances") .setUnit("1") .build() + private var _activeInstances = 0 /** * The amount of idle instances for this function. */ - public val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") + private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") .setDescription("Number of idle function instances") .setUnit("1") .build() + private var _idleInstances = 0 /** * The time that the function waited. 
*/ - public val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") + private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") .ofLongs() .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() + private val _waitTime = DescriptiveStatistics() + .apply { windowSize = 100 } /** * The time that the function was running. */ - public val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") + private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") .ofLongs() .setDescription("Time the function was running") .setUnit("ms") .build() + private val _activeTime = DescriptiveStatistics() + .apply { windowSize = 100 } /** * The instances associated with this function. @@ -134,6 +146,80 @@ public class FunctionObject( public val meta: MutableMap<String, Any> = meta.toMutableMap() + /** + * Report a scheduled invocation. + */ + internal fun reportSubmission() { + invocations.add(1, attributes) + _invocations++ + } + + /** + * Report the deployment of an invocation. + */ + internal fun reportDeployment(isDelayed: Boolean) { + if (isDelayed) { + delayedInvocations.add(1, attributes) + _delayedInvocations++ + + idleInstances.add(1, attributes) + _idleInstances++ + } else { + timelyInvocations.add(1, attributes) + _timelyInvocations++ + } + } + + /** + * Report the start of a function invocation. + */ + internal fun reportStart(start: Long, submitTime: Long) { + val wait = start - submitTime + waitTime.record(wait, attributes) + _waitTime.addValue(wait.toDouble()) + + idleInstances.add(-1, attributes) + _idleInstances-- + activeInstances.add(1, attributes) + _activeInstances++ + } + + /** + * Report the failure of a function invocation. + */ + internal fun reportFailure() { + failedInvocations.add(1, attributes) + _failedInvocations++ + } + + /** + * Report the end of a function invocation. 
+ */ + internal fun reportEnd(duration: Long) { + activeTime.record(duration, attributes) + _activeTime.addValue(duration.toDouble()) + idleInstances.add(1, attributes) + _idleInstances++ + activeInstances.add(-1, attributes) + _activeInstances-- + } + + /** + * Collect the statistics of this function. + */ + internal fun getStats(): FunctionStats { + return FunctionStats( + _invocations, + _timelyInvocations, + _delayedInvocations, + _failedInvocations, + _activeInstances, + _idleInstances, + _waitTime.copy(), + _activeTime.copy() + ) + } + override fun close() { instances.forEach(FunctionInstance::close) instances.clear() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index 1526be9d..ce3b2b98 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -38,6 +38,8 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceListener import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.faas.service.telemetry.FunctionStats +import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException import java.time.Clock import java.util.* @@ -103,6 +105,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations") .setUnit("1") .build() + private var totalInvocations = 0L /** * The amount of function invocations that could be handled directly. 
@@ -111,6 +114,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations handled directly") .setUnit("1") .build() + private var timelyInvocations = 0L /** * The amount of function invocations that were delayed due to function deployment. @@ -119,6 +123,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() + private var delayedInvocations = 0L override fun newClient(): FaaSClient { return object : FaaSClient { @@ -187,6 +192,15 @@ internal class FaaSServiceImpl( } } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats(totalInvocations, timelyInvocations, delayedInvocations) + } + + override fun getFunctionStats(function: FaaSFunction): FunctionStats { + val func = requireNotNull(functions[function.uid]) { "Unknown function" } + return func.getStats() + } + /** * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ @@ -219,7 +233,8 @@ internal class FaaSServiceImpl( val instance = if (activeInstance != null) { _timelyInvocations.add(1) - function.timelyInvocations.add(1, function.attributes) + timelyInvocations++ + function.reportDeployment(isDelayed = false) activeInstance } else { @@ -227,29 +242,24 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - function.idleInstances.add(1, function.attributes) - _delayedInvocations.add(1) - function.delayedInvocations.add(1, function.attributes) + delayedInvocations++ + function.reportDeployment(isDelayed = true) instance } suspend { val start = clock.millis() - function.waitTime.record(start - submitTime, function.attributes) - function.idleInstances.add(-1, function.attributes) - function.activeInstances.add(1, function.attributes) + function.reportStart(start, submitTime) try { instance.invoke() } catch (e: Throwable) { logger.debug(e) { "Function invocation failed" } - function.failedInvocations.add(1, 
function.attributes) + function.reportFailure() } finally { val end = clock.millis() - function.activeTime.record(end - start, function.attributes) - function.idleInstances.add(1, function.attributes) - function.activeInstances.add(-1, function.attributes) + function.reportEnd(end - start) } }.startCoroutineCancellable(cont) } @@ -262,7 +272,8 @@ internal class FaaSServiceImpl( check(function.uid in functions) { "Function does not exist (anymore)" } _invocations.add(1) - function.invocations.add(1, function.attributes) + totalInvocations++ + function.reportSubmission() return suspendCancellableCoroutine { cont -> if (!queue.add(InvocationRequest(clock.millis(), function, cont))) { diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt new file mode 100644 index 00000000..497ee423 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.telemetry + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics + +/** + * Statistics about function invocations. + * + * @property totalInvocations The number of function invocations. + * @property timelyInvocations The number of function invocations that could be handled directly. + * @property delayedInvocations The number of function invocations that are delayed (cold starts). + * @property failedInvocations The number of function invocations that failed. + * @property activeInstances The number of active function instances. + * @property idleInstances The number of idle function instances. + * @property waitTime Statistics about the wait time of a function invocation. + * @property activeTime Statistics about the runtime of a function invocation. 
+ */ +public data class FunctionStats( + val totalInvocations: Long, + val timelyInvocations: Long, + val delayedInvocations: Long, + val failedInvocations: Long, + val activeInstances: Int, + val idleInstances: Int, + val waitTime: DescriptiveStatistics, + val activeTime: DescriptiveStatistics +) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..cabb1d56 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.telemetry + +/** + * Statistics reported by the FaaS scheduler. 
+ * + * @property totalInvocations The total amount of function invocations received by the scheduler. + * @property timelyInvocations The amount of function invocations that could be handled directly. + * @property delayedInvocations The amount of function invocations that were delayed due to function deployment. + */ +public data class SchedulerStats( + val totalInvocations: Long, + val timelyInvocations: Long, + val delayedInvocations: Long +) diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 68233c1a..a3d0d34e 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -123,7 +123,6 @@ public class SimFunctionDeployer( /** * Start the function instance. */ - @OptIn(InternalCoroutinesApi::class) internal fun start() { check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" } job = scope.launch { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 0dc9ba87..792a8584 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -28,22 +28,25 @@ import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import 
org.opendc.faas.service.FaaSService import org.opendc.faas.service.autoscaler.FunctionTerminationPolicyFixed import org.opendc.faas.service.router.RandomRoutingPolicy -import org.opendc.faas.simulator.delay.ZeroDelayInjector +import org.opendc.faas.simulator.delay.ColdStartModel +import org.opendc.faas.simulator.delay.StochasticDelayInjector import org.opendc.faas.simulator.workload.SimFaaSWorkload import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation import java.time.Duration +import java.util.* /** * A test suite for the [FaaSService] implementation under simulated conditions. @@ -65,10 +68,15 @@ internal class SimFaaSServiceTest { @Test fun testSmoke() = runBlockingSimulation { - val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) { - override suspend fun invoke() {} + val random = Random(0) + val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimRuntimeWorkload(1000) { + override suspend fun invoke() { + delay(random.nextInt(1000).toLong()) + } }) - val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload } + + val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) + val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload } val service = FaaSService( coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) @@ -84,8 +92,15 @@ internal class SimFaaSServiceTest { yield() + val funcStats = service.getFunctionStats(function) + 
assertAll( { coVerify { workload.invoke() } }, + { assertEquals(1, funcStats.totalInvocations) }, + { assertEquals(1, funcStats.delayedInvocations) }, + { assertEquals(0, funcStats.failedInvocations) }, + { assertEquals(100.0, funcStats.waitTime.mean) }, + { assertEquals(1285.0, funcStats.activeTime.mean) }, ) } } diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index 47e30a14..b476a669 100644 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -30,6 +30,7 @@ plugins { dependencies { api(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcCompute.opendcComputeService) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt new file mode 100644 index 00000000..593203fc --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt @@ -0,0 +1,424 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.telemetry.compute.table.* +import java.time.Clock +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param clock The virtual clock. + * @param service The [ComputeService] to monitor. + * @param servers The [Server]s to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. + */ +public class ComputeMetricReader( + scope: CoroutineScope, + clock: Clock, + private val service: ComputeService, + private val servers: List<Server>, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5) +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + + /** + * Aggregator for service metrics. 
+ */ + private val serviceTableReader = ServiceTableReaderImpl(service) + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>() + + /** + * Mapping from [Server] instances to [ServerTableReaderImpl] + */ + private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + + try { + while (isActive) { + delay(intervalMs) + + try { + val now = clock.instant() + + for (host in service.hosts) { + val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + for (server in servers) { + val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + serviceTableReader.record(now) + monitor.record(serviceTableReader) + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + } + } + } finally { + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. 
+ */ + private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val serversPending: Int + get() = _serversPending + private var _serversPending = 0 + + override val serversActive: Int + get() = _serversActive + private var _serversActive = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + private var _attemptsError = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _serversPending = stats.serversPending + _serversActive = stats.serversActive + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + _attemptsError = stats.attemptsError.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. 
+ */ + private class HostTableReaderImpl(host: Host) : HostTableReader { + private val _host = host + + override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + private var _powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var 
_uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerUsage = hostSysStats.powerUsage + _powerTotal = hostSysStats.energyUsage + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 + } + } + + /** + * An aggregator for server metrics before they are reported. 
+ */ + private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + private val _server = server + + /** + * The static information about this server. + */ + override val server = ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + override var host: HostInfo? = null + private var _host: Host? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + /** + * Record the next cycle. 
+ */ + fun record(now: Instant) { + val newHost = service.lookupHost(_server) + if (newHost != null && newHost.uid != _host?.uid) { + _host = newHost + host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) + } + + val cpuStats = _host?.getCpuStats(_server) + val sysStats = _host?.getSystemStats(_server) + + _timestamp = now + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = _server.launchedAt + _bootTime = sysStats?.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } + } +} diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index a9290c47..ca5da079 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -31,7 +31,6 @@ import io.opentelemetry.sdk.metrics.export.MetricReaderFactory import kotlinx.coroutines.* import mu.KotlinLogging import java.time.Duration -import java.util.* /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every diff --git 
a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index a150de4e..7c0c43ed 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -23,8 +23,9 @@ package org.opendc.web.runner import mu.KotlinLogging +import org.opendc.compute.api.Server import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply @@ -35,13 +36,12 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMetricReader import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario import org.opendc.web.runner.internal.JobManager -import org.opendc.web.runner.internal.WebComputeMetricExporter +import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration import java.util.* @@ -180,9 +180,9 @@ public class OpenDCRunner( private val scenario: Scenario, private val repeat: Int, private val topology: Topology, - ) : RecursiveTask<WebComputeMetricExporter.Results>() { - override fun compute(): WebComputeMetricExporter.Results { - val exporter = WebComputeMetricExporter() + ) : RecursiveTask<WebComputeMonitor.Results>() { + override fun 
compute(): WebComputeMonitor.Results { + val monitor = WebComputeMonitor() // Schedule task that interrupts the simulation if it runs for too long. val currentThread = Thread.currentThread() @@ -206,25 +206,24 @@ public class OpenDCRunner( else null - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, failureModel, interferenceModel.takeIf { phenomena.interference } ) - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1))) + val servers = mutableListOf<Server>() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, seeder.nextLong()) + simulator.run(vms, seeder.nextLong(), servers) - val serviceMetrics = collectServiceMetrics(telemetry.metricProducer) + val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -235,14 +234,14 @@ public class OpenDCRunner( } } finally { simulator.close() - telemetry.close() + reader.close() } } } finally { interruptTask.cancel(false) } - return exporter.collectResults() + return monitor.collectResults() } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt index 8de0cee4..99b8aaf1 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt @@ -62,7 +62,7 @@ internal class JobManager(private val client: OpenDCRunnerClient) { /** * Persist the specified results. 
*/ - fun finish(id: Long, results: List<WebComputeMetricExporter.Results>) { + fun finish(id: Long, results: List<WebComputeMonitor.Results>) { client.jobs.update( id, Job.Update( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 04437a5f..69350d8c 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -22,7 +22,6 @@ package org.opendc.web.runner.internal -import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.telemetry.compute.table.ServiceTableReader @@ -32,7 +31,7 @@ import kotlin.math.roundToLong /** * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. 
*/ -internal class WebComputeMetricExporter : ComputeMetricExporter() { +internal class WebComputeMonitor : ComputeMonitor { override fun record(reader: HostTableReader) { val slices = reader.downtime / SLICE_LENGTH diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index ebace07d..b8bc0e33 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -30,6 +30,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import org.opendc.workflow.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -46,6 +47,11 @@ public interface WorkflowService : AutoCloseable { public suspend fun invoke(job: Job) /** + * Collect statistics about the workflow scheduler. + */ + public fun getSchedulerStats(): SchedulerStats + + /** * Terminate the lifecycle of the workflow service, stopping all running workflows. 
*/ public override fun close() diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index cdaec021..9c7f18a2 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -34,6 +34,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import org.opendc.workflow.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import java.util.* @@ -145,6 +146,7 @@ public class WorkflowServiceImpl( .setDescription("Number of submitted jobs") .setUnit("1") .build() + private var _workflowsSubmitted: Int = 0 /** * The number of jobs that are running. @@ -153,6 +155,7 @@ public class WorkflowServiceImpl( .setDescription("Number of jobs running") .setUnit("1") .build() + private var _workflowsRunning: Int = 0 /** * The number of jobs that have finished running. @@ -161,6 +164,7 @@ public class WorkflowServiceImpl( .setDescription("Number of jobs that finished running") .setUnit("1") .build() + private var _workflowsFinished: Int = 0 /** * The number of tasks that have been submitted to the service. @@ -169,6 +173,7 @@ public class WorkflowServiceImpl( .setDescription("Number of submitted tasks") .setUnit("1") .build() + private var _tasksSubmitted: Int = 0 /** * The number of jobs that are running. 
@@ -177,6 +182,7 @@ public class WorkflowServiceImpl( .setDescription("Number of tasks running") .setUnit("1") .build() + private var _tasksRunning: Int = 0 /** * The number of jobs that have finished running. @@ -185,6 +191,7 @@ public class WorkflowServiceImpl( .setDescription("Number of tasks that finished running") .setUnit("1") .build() + private var _tasksFinished: Int = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. @@ -223,16 +230,22 @@ public class WorkflowServiceImpl( } submittedTasks.add(1) + _tasksSubmitted++ } instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) submittedJobs.add(1) + _workflowsSubmitted++ pacer.enqueue() } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats(_workflowsSubmitted, _workflowsRunning, _workflowsFinished, _tasksSubmitted, _tasksRunning, _tasksFinished) + } + override fun close() { scope.cancel() } @@ -271,6 +284,7 @@ public class WorkflowServiceImpl( activeJobs += jobInstance runningJobs.add(1) + _workflowsRunning++ rootListener.jobStarted(jobInstance) } @@ -350,6 +364,7 @@ public class WorkflowServiceImpl( val task = taskByServer.getValue(server) task.startedAt = clock.millis() runningTasks.add(1) + _tasksRunning++ rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -368,6 +383,8 @@ public class WorkflowServiceImpl( runningTasks.add(-1) finishedTasks.add(1) + _tasksRunning-- + _tasksFinished++ rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -395,6 +412,8 @@ public class WorkflowServiceImpl( activeJobs -= job runningJobs.add(-1) finishedJobs.add(1) + _workflowsRunning-- + _workflowsFinished++ rootListener.jobFinished(job) job.cont.resume(Unit) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt 
b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..7c7d7c4d --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.telemetry + +/** + * Statistics about the workflow scheduler. + * + * @property workflowsSubmitted The number of workflows submitted to the scheduler. + * @property workflowsRunning The number of workflows that are currently running. + * @property workflowsFinished The number of workflows that have completed since the scheduler started. + * @property tasksSubmitted The number of tasks submitted to the scheduler. + * @property tasksRunning The number of tasks that are currently running. 
+ * @property tasksFinished The number of tasks that have completed. + */ +public data class SchedulerStats( + val workflowsSubmitted: Int, + val workflowsRunning: Int, + val workflowsFinished: Int, + val tasksSubmitted: Int, + val tasksRunning: Int, + val tasksFinished: Int +) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 1fd332b9..d5f06587 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,7 +22,6 @@ package org.opendc.workflow.service -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -66,7 +65,6 @@ internal class WorkflowServiceTest { @Test fun testTrace() = runBlockingSimulation { // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts - val HOST_COUNT = 4 val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) @@ -74,7 +72,8 @@ internal class WorkflowServiceTest { val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) - repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) } + val hostCount = 4 + repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines val workflowScheduler = WorkflowSchedulerSpec( @@ -98,13 +97,13 @@ internal class WorkflowServiceTest { computeHelper.close() } - val 
metrics = collectMetrics(workflowHelper.metricProducer) + val metrics = workflowHelper.service.getSchedulerStats() assertAll( - { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, - { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, - { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, + { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, + { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") }, + { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } ) @@ -130,28 +129,4 @@ internal class WorkflowServiceTest { SimSpaceSharedHypervisorProvider() ) } - - class WorkflowMetrics { - var jobsSubmitted = 0L - var jobsActive = 0L - var jobsFinished = 0L - var tasksSubmitted = 0L - var tasksActive = 0L - var tasksFinished = 0L - } - - /** - * Collect the metrics of the workflow service. 
- */ - private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics { - val metrics = metricProducer.collectAllMetrics().associateBy { it.name } - val res = WorkflowMetrics() - res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0 - res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0 - res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0 - return res - } } |
