summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt7
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt19
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt35
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt44
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt39
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt47
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt51
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt5
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt43
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt3
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt46
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt9
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt90
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt47
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt123
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt21
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt)8
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt40
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt148
-rw-r--r--opendc-experiments/opendc-experiments-tf20/build.gradle.kts7
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt18
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt5
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt36
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt9
-rw-r--r--opendc-faas/opendc-faas-service/build.gradle.kts1
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt13
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt104
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt35
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt48
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt36
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt1
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt25
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/build.gradle.kts1
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt424
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt1
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt29
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt2
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt (renamed from opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt)3
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt6
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt19
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt42
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt39
43 files changed, 1404 insertions, 327 deletions
diff --git a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
index b508a9f8..64b73d0b 100644
--- a/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
+++ b/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
@@ -22,6 +22,8 @@
package org.opendc.compute.api
+import java.time.Instant
+
/**
* A stateful object representing a server instance that is running on some physical or virtual machine.
*/
@@ -42,6 +44,11 @@ public interface Server : Resource {
public val state: ServerState
/**
+ * The most recent moment in time when the server was launched.
+ */
+ public val launchedAt: Instant?
+
+ /**
* Request the server to be started.
*
* This method is guaranteed to return after the request was acknowledged, but might return before the server was
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 2a1fbaa0..3a6baaa1 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -25,9 +25,11 @@ package org.opendc.compute.service
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
+import org.opendc.compute.api.Server
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
import kotlin.coroutines.CoroutineContext
@@ -37,16 +39,11 @@ import kotlin.coroutines.CoroutineContext
*/
public interface ComputeService : AutoCloseable {
/**
- * The hosts that are used by the compute service.
+ * The hosts that are registered with the "compute" service.
*/
public val hosts: Set<Host>
/**
- * The number of hosts available in the system.
- */
- public val hostCount: Int
-
- /**
* Create a new [ComputeClient] to control the compute service.
*/
public fun newClient(): ComputeClient
@@ -66,6 +63,16 @@ public interface ComputeService : AutoCloseable {
*/
public override fun close()
+ /**
+ * Lookup the [Host] that currently hosts the specified [server].
+ */
+ public fun lookupHost(server: Server): Host?
+
+ /**
+ * Collect the statistics about the scheduler component of this service.
+ */
+ public fun getSchedulerStats(): SchedulerStats
+
public companion object {
/**
* Construct a new [ComputeService] implementation.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index bed15dfd..67b144d9 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -23,6 +23,10 @@
package org.opendc.compute.service.driver
import org.opendc.compute.api.Server
+import org.opendc.compute.service.driver.telemetry.GuestCpuStats
+import org.opendc.compute.service.driver.telemetry.GuestSystemStats
+import org.opendc.compute.service.driver.telemetry.HostCpuStats
+import org.opendc.compute.service.driver.telemetry.HostSystemStats
import java.util.*
/**
@@ -55,6 +59,11 @@ public interface Host {
public val meta: Map<String, Any>
/**
+ * The [Server] instances known to the host.
+ */
+ public val instances: Set<Server>
+
+ /**
* Determine whether the specified [instance][server] can still fit on this host.
*/
public fun canFit(server: Server): Boolean
@@ -100,4 +109,30 @@ public interface Host {
* Remove a [HostListener] from this host.
*/
public fun removeListener(listener: HostListener)
+
+ /**
+ * Query the system statistics of the host.
+ */
+ public fun getSystemStats(): HostSystemStats
+
+ /**
+ * Query the system statistics of a [Server] that is located on this host.
+ *
+ * @param server The [Server] to obtain the system statistics of.
+ * @throws IllegalArgumentException if the server is not present on the host.
+ */
+ public fun getSystemStats(server: Server): GuestSystemStats
+
+ /**
+ * Query the CPU statistics of the host.
+ */
+ public fun getCpuStats(): HostCpuStats
+
+ /**
+ * Query the CPU statistics of a [Server] that is located on this host.
+ *
+ * @param server The [Server] to obtain the CPU statistics of.
+ * @throws IllegalArgumentException if the server is not present on the host.
+ */
+ public fun getCpuStats(server: Server): GuestCpuStats
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt
new file mode 100644
index 00000000..b5d63471
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestCpuStats.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.driver.telemetry
+
+/**
+ * Statistics about the CPUs of a guest.
+ *
+ * @property activeTime The cumulative time (in seconds) that the CPUs of the guest were actively running.
+ * @property idleTime The cumulative time (in seconds) the CPUs of the guest were idle.
+ * @property stealTime The cumulative CPU time (in seconds) that the guest was ready to run, but not granted time by the host.
+ * @property lostTime The cumulative CPU time (in seconds) that was lost due to interference with other machines.
+ * @property capacity The available CPU capacity of the guest (in MHz).
+ * @property usage Amount of CPU resources (in MHz) actually used by the guest.
+ * @property utilization Utilization of the CPU resources (in %) relative to the total CPU capacity.
+ */
+public data class GuestCpuStats(
+ val activeTime: Long,
+ val idleTime: Long,
+ val stealTime: Long,
+ val lostTime: Long,
+ val capacity: Double,
+ val usage: Double,
+ val utilization: Double
+)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt
new file mode 100644
index 00000000..b3958473
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.driver.telemetry
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * System-level statistics of a guest.
+ *
+ * @property uptime The cumulative uptime of the guest since last boot (in ms).
+ * @property downtime The cumulative downtime of the guest since last boot (in ms).
+ * @property bootTime The time at which the guest booted.
+ */
+public data class GuestSystemStats(
+ val uptime: Duration,
+ val downtime: Duration,
+ val bootTime: Instant
+)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt
new file mode 100644
index 00000000..55e23c0e
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostCpuStats.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.driver.telemetry
+
+/**
+ * Statistics about the CPUs of a host.
+ *
+ * @property activeTime The cumulative time (in seconds) that the CPUs of the host were actively running.
+ * @property idleTime The cumulative time (in seconds) the CPUs of the host were idle.
+ * @property stealTime The cumulative CPU time (in seconds) that virtual machines were ready to run, but were not able to.
+ * @property lostTime The cumulative CPU time (in seconds) that was lost due to interference between virtual machines.
+ * @property capacity The available CPU capacity of the host (in MHz).
+ * @property demand Amount of CPU resources (in MHz) the guests would use if there were no CPU contention or CPU
+ * limits.
+ * @property usage Amount of CPU resources (in MHz) actually used by the host.
+ * @property utilization Utilization of the CPU resources (in %) relative to the total CPU capacity.
+ */
+public data class HostCpuStats(
+ val activeTime: Long,
+ val idleTime: Long,
+ val stealTime: Long,
+ val lostTime: Long,
+ val capacity: Double,
+ val demand: Double,
+ val usage: Double,
+ val utilization: Double
+)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt
new file mode 100644
index 00000000..1c07023f
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.driver.telemetry
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * System-level statistics of a host.
+
+ * @property uptime The cumulative uptime of the host since last boot (in ms).
+ * @property downtime The cumulative downtime of the host since last boot (in ms).
+ * @property bootTime The time at which the server started.
+ * @property powerUsage Instantaneous power usage of the system (in W).
+ * @property energyUsage The cumulative energy usage of the system (in J).
+ * @property guestsTerminated The number of guests that are in a terminated state.
+ * @property guestsRunning The number of guests that are in a running state.
+ * @property guestsError The number of guests that are in an error state.
+ * @property guestsInvalid The number of guests that are in an unknown state.
+ */
+public data class HostSystemStats(
+ val uptime: Duration,
+ val downtime: Duration,
+ val bootTime: Instant,
+ val powerUsage: Double,
+ val energyUsage: Double,
+ val guestsTerminated: Int,
+ val guestsRunning: Int,
+ val guestsError: Int,
+ val guestsInvalid: Int,
+)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
index f2929bf3..45775640 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
@@ -27,6 +27,7 @@ import org.opendc.compute.api.Image
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
+import java.time.Instant
import java.util.*
/**
@@ -55,6 +56,9 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
override var state: ServerState = delegate.state
private set
+ override var launchedAt: Instant? = null
+ private set
+
override suspend fun start() {
delegate.start()
refresh()
@@ -95,6 +99,7 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
labels = delegate.labels
meta = delegate.meta
state = delegate.state
+ launchedAt = delegate.launchedAt
}
override fun onStateChanged(server: Server, newState: ServerState) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 144b6573..e8664e5c 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -36,8 +36,10 @@ import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
+import java.time.Instant
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -126,6 +128,9 @@ internal class ComputeServiceImpl(
private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success")
private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure")
private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error")
+ private var _attemptsSuccess = 0L
+ private var _attemptsFailure = 0L
+ private var _attemptsError = 0L
/**
* The response time of the service.
@@ -145,6 +150,8 @@ internal class ComputeServiceImpl(
.build()
private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending")
private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active")
+ private var _serversPending = 0
+ private var _serversActive = 0
/**
* The [Pacer] to use for scheduling the scheduler cycles.
@@ -154,9 +161,6 @@ internal class ComputeServiceImpl(
override val hosts: Set<Host>
get() = hostToView.keys
- override val hostCount: Int
- get() = hostToView.size
-
init {
val upState = Attributes.of(AttributeKey.stringKey("state"), "up")
val downState = Attributes.of(AttributeKey.stringKey("state"), "down")
@@ -165,7 +169,7 @@ internal class ComputeServiceImpl(
.setDescription("Number of hosts registered with the scheduler")
.setUnit("1")
.buildWithCallback { result ->
- val total = hostCount
+ val total = hosts.size
val available = availableHosts.size.toLong()
result.record(available, upState)
@@ -322,17 +326,35 @@ internal class ComputeServiceImpl(
}
}
+ override fun lookupHost(server: Server): Host? {
+ val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" }
+ return internal.host
+ }
+
override fun close() {
scope.cancel()
}
+ override fun getSchedulerStats(): SchedulerStats {
+ return SchedulerStats(
+ availableHosts.size,
+ hostToView.size - availableHosts.size,
+ _attemptsSuccess,
+ _attemptsFailure,
+ _attemptsError,
+ _serversPending,
+ _serversActive
+ )
+ }
+
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
val now = clock.millis()
val request = SchedulingRequest(server, now)
- server.lastProvisioningTimestamp = now
+ server.launchedAt = Instant.ofEpochMilli(now)
queue.add(request)
+ _serversPending++
_servers.add(1, _serversPendingAttr)
requestSchedulingCycle()
return request
@@ -371,6 +393,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
+ _serversPending--
_servers.add(-1, _serversPendingAttr)
continue
}
@@ -383,7 +406,9 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
+ _serversPending--
_servers.add(-1, _serversPendingAttr)
+ _attemptsFailure++
_schedulingAttempts.add(1, _schedulingAttemptsFailureAttr)
logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
@@ -399,6 +424,7 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
+ _serversPending--
_servers.add(-1, _serversPendingAttr)
_schedulingLatency.record(now - request.submitTime, server.attributes)
@@ -417,6 +443,8 @@ internal class ComputeServiceImpl(
activeServers[server] = host
_servers.add(1, _serversActiveAttr)
+ _serversActive++
+ _attemptsSuccess++
_schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr)
} catch (e: Throwable) {
logger.error(e) { "Failed to deploy VM" }
@@ -425,6 +453,7 @@ internal class ComputeServiceImpl(
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+ _attemptsError++
_schedulingAttempts.add(1, _schedulingAttemptsErrorAttr)
}
}
@@ -481,6 +510,7 @@ internal class ComputeServiceImpl(
logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
if (activeServers.remove(server) != null) {
+ _serversActive--
_servers.add(-1, _serversActiveAttr)
}
@@ -503,7 +533,8 @@ internal class ComputeServiceImpl(
*/
private fun collectProvisionTime(result: ObservableLongMeasurement) {
for ((_, server) in servers) {
- result.record(server.lastProvisioningTimestamp, server.attributes)
+ val launchedAt = server.launchedAt ?: continue
+ result.record(launchedAt.toEpochMilli(), server.attributes)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index f1b92c66..d2a2d896 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -28,6 +28,7 @@ import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
+import java.time.Instant
import java.util.UUID
/**
@@ -75,7 +76,7 @@ internal class InternalServer(
/**
* The most recent timestamp when the server entered a provisioning state.
*/
- @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE
+ override var launchedAt: Instant? = null
/**
* The current scheduling request.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt
new file mode 100644
index 00000000..4dc70286
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.telemetry
+
+import org.opendc.compute.service.ComputeService
+
+/**
+ * Statistics about the scheduling component of the [ComputeService].
+ *
+ * @property hostsAvailable The number of hosts currently available for scheduling.
+ * @property hostsUnavailable The number of hosts unavailable for scheduling.
+ * @property attemptsSuccess Scheduling attempts that resulted into an allocation onto a host.
+ * @property attemptsFailure The number of failed scheduling attempt due to insufficient capacity at the moment.
+ * @property attemptsError The number of scheduling attempts that failed due to system error.
+ * @property serversPending The number of servers that are pending to be scheduled.
+ * @property serversActive The number of servers that are currently managed by the service and running.
+ */
+public data class SchedulerStats(
+ val hostsAvailable: Int,
+ val hostsUnavailable: Int,
+ val attemptsSuccess: Long,
+ val attemptsFailure: Long,
+ val attemptsError: Long,
+ val serversPending: Int,
+ val serversActive: Int
+)
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index 7b8d0fe2..eb106817 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -24,7 +24,6 @@ package org.opendc.compute.service
import io.mockk.*
import io.opentelemetry.api.metrics.MeterProvider
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNull
@@ -48,10 +47,9 @@ import java.util.*
/**
* Test suite for the [ComputeService] interface.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class ComputeServiceTest {
- lateinit var scope: SimulationCoroutineScope
- lateinit var service: ComputeService
+ private lateinit var scope: SimulationCoroutineScope
+ private lateinit var service: ComputeService
@BeforeEach
fun setUp() {
@@ -128,14 +126,12 @@ internal class ComputeServiceTest {
every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
- assertEquals(0, service.hostCount)
assertEquals(emptySet<Host>(), service.hosts)
service.addHost(host)
verify(exactly = 1) { host.addListener(any()) }
- assertEquals(1, service.hostCount)
assertEquals(1, service.hosts.size)
service.removeHost(host)
@@ -150,7 +146,6 @@ internal class ComputeServiceTest {
every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.DOWN
- assertEquals(0, service.hostCount)
assertEquals(emptySet<Host>(), service.hosts)
service.addHost(host)
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index dfd3bc67..4e5a37ae 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -23,7 +23,6 @@
package org.opendc.compute.service
import io.mockk.*
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
@@ -41,7 +40,6 @@ import java.util.*
/**
* Test suite for the [InternalServer] implementation.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
class InternalServerTest {
@Test
fun testEquality() {
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 4eb6392e..323ae4fe 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -29,11 +29,14 @@ import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
-import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
+import org.opendc.compute.service.driver.telemetry.GuestCpuStats
+import org.opendc.compute.service.driver.telemetry.GuestSystemStats
+import org.opendc.compute.service.driver.telemetry.HostCpuStats
+import org.opendc.compute.service.driver.telemetry.HostSystemStats
import org.opendc.compute.simulator.internal.Guest
import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
@@ -49,6 +52,8 @@ import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.FlowEngine
+import java.time.Duration
+import java.time.Instant
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -81,11 +86,6 @@ public class SimHost(
private val clock = engine.clock
/**
- * The logger instance of this server.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
* The [Meter] to track metrics of the simulated host.
*/
private val meter = meterProvider.get("org.opendc.compute.simulator")
@@ -112,6 +112,9 @@ public class SimHost(
private val guests = HashMap<Server, Guest>()
private val _guests = mutableListOf<Guest>()
+ override val instances: Set<Server>
+ get() = guests.keys
+
override val state: HostState
get() = _state
private var _state: HostState = HostState.DOWN
@@ -249,6 +252,68 @@ public class SimHost(
machine.cancel()
}
+ override fun getSystemStats(): HostSystemStats {
+ updateUptime()
+
+ var terminated = 0
+ var running = 0
+ var error = 0
+ var invalid = 0
+
+ val guests = _guests.listIterator()
+ for (guest in guests) {
+ when (guest.state) {
+ ServerState.TERMINATED -> terminated++
+ ServerState.RUNNING -> running++
+ ServerState.ERROR -> error++
+ ServerState.DELETED -> {
+ // Remove guests that have been deleted
+ this.guests.remove(guest.server)
+ guests.remove()
+ }
+ else -> invalid++
+ }
+ }
+
+ return HostSystemStats(
+ Duration.ofMillis(_uptime),
+ Duration.ofMillis(_downtime),
+ Instant.ofEpochMilli(_bootTime),
+ machine.powerUsage,
+ machine.energyUsage,
+ terminated,
+ running,
+ error,
+ invalid
+ )
+ }
+
+ override fun getSystemStats(server: Server): GuestSystemStats {
+ val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
+ return guest.getSystemStats()
+ }
+
+ override fun getCpuStats(): HostCpuStats {
+ val counters = hypervisor.counters
+ counters.flush()
+
+ return HostCpuStats(
+ counters.cpuActiveTime / 1000L,
+ counters.cpuIdleTime / 1000L,
+ counters.cpuStealTime / 1000L,
+ counters.cpuLostTime / 1000L,
+ hypervisor.cpuCapacity,
+ hypervisor.cpuDemand,
+ hypervisor.cpuUsage,
+ hypervisor.cpuUsage / _cpuLimit
+ )
+ }
+
+ override fun getCpuStats(server: Server): GuestCpuStats {
+ val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
+ return guest.getCpuStats()
+ }
+
override fun hashCode(): Int = uid.hashCode()
override fun equals(other: Any?): Boolean {
@@ -417,13 +482,12 @@ public class SimHost(
* Helper function to track the CPU time of a machine.
*/
private fun collectCpuTime(result: ObservableLongMeasurement) {
- val counters = hypervisor.counters
- counters.flush()
+ val stats = getCpuStats()
- result.record(counters.cpuActiveTime / 1000L, _activeState)
- result.record(counters.cpuIdleTime / 1000L, _idleState)
- result.record(counters.cpuStealTime / 1000L, _stealState)
- result.record(counters.cpuLostTime / 1000L, _lostState)
+ result.record(stats.activeTime, _activeState)
+ result.record(stats.idleTime, _idleState)
+ result.record(stats.stealTime, _stealState)
+ result.record(stats.lostTime, _lostState)
val guests = _guests
for (i in guests.indices) {
@@ -450,7 +514,7 @@ public class SimHost(
val guests = _guests
for (i in guests.indices) {
- guests[i].updateUptime(duration)
+ guests[i].updateUptime()
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index bb378ee3..0d4c550d 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -32,6 +32,8 @@ import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.driver.telemetry.GuestCpuStats
+import org.opendc.compute.service.driver.telemetry.GuestSystemStats
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.SimWorkloadMapper
import org.opendc.simulator.compute.kernel.SimHypervisor
@@ -39,6 +41,8 @@ import org.opendc.simulator.compute.kernel.SimVirtualMachine
import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.time.Clock
+import java.time.Duration
+import java.time.Instant
import kotlin.coroutines.CoroutineContext
/**
@@ -146,6 +150,37 @@ internal class Guest(
}
/**
+ * Obtain the system statistics of this guest.
+ */
+ fun getSystemStats(): GuestSystemStats {
+ updateUptime()
+
+ return GuestSystemStats(
+ Duration.ofMillis(_uptime),
+ Duration.ofMillis(_downtime),
+ Instant.ofEpochMilli(_bootTime)
+ )
+ }
+
+ /**
+ * Obtain the CPU statistics of this guest.
+ */
+ fun getCpuStats(): GuestCpuStats {
+ val counters = machine.counters
+ counters.flush()
+
+ return GuestCpuStats(
+ counters.cpuActiveTime / 1000L,
+ counters.cpuIdleTime / 1000L,
+ counters.cpuStealTime / 1000L,
+ counters.cpuLostTime / 1000L,
+ machine.cpuCapacity,
+ machine.cpuUsage,
+ machine.cpuUsage / _cpuLimit
+ )
+ }
+
+ /**
* The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
*/
private var job: Job? = null
@@ -209,6 +244,8 @@ internal class Guest(
* This method is invoked when the guest stopped.
*/
private fun onStop(target: ServerState) {
+ updateUptime()
+
state = target
listener.onStop(this)
}
@@ -224,10 +261,16 @@ internal class Guest(
.put(STATE_KEY, "down")
.build()
+ private var _lastReport = clock.millis()
+
/**
* Helper function to track the uptime and downtime of the guest.
*/
- fun updateUptime(duration: Long) {
+ fun updateUptime() {
+ val now = clock.millis()
+ val duration = now - _lastReport
+ _lastReport = now
+
if (state == ServerState.RUNNING) {
_uptime += duration
} else if (state == ServerState.ERROR) {
@@ -239,6 +282,8 @@ internal class Guest(
* Helper function to track the uptime of the guest.
*/
fun collectUptime(result: ObservableLongMeasurement) {
+ updateUptime()
+
result.record(_uptime, _upState)
result.record(_downtime, _downState)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index f0325023..fd54ad1d 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -22,8 +22,7 @@
package org.opendc.compute.simulator
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -42,13 +41,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.telemetry.compute.ComputeMetricExporter
-import org.opendc.telemetry.compute.HOST_ID
-import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.telemetry.compute.table.ServerTableReader
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
-import org.opendc.telemetry.sdk.toOtelClock
-import java.time.Duration
+import java.time.Instant
import java.util.*
import kotlin.coroutines.resume
@@ -73,45 +66,16 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var idleTime = 0L
- var activeTime = 0L
- var stealTime = 0L
-
- val hostId = UUID.randomUUID()
- val hostResource = Resource.builder()
- .put(HOST_ID, hostId.toString())
- .build()
-
- // Setup metric reader
val duration = 5 * 60L
- val reader = CoroutineMetricReader(
- this,
- object : ComputeMetricExporter() {
- override fun record(reader: HostTableReader) {
- activeTime += reader.cpuActiveTime
- idleTime += reader.cpuIdleTime
- stealTime += reader.cpuStealTime
- }
- },
- exportInterval = Duration.ofSeconds(duration)
- )
-
- val meterProvider = SdkMeterProvider
- .builder()
- .setResource(hostResource)
- .setClock(clock.toOtelClock())
- .registerMetricReader(reader)
- .build()
-
val engine = FlowEngine(coroutineContext, clock)
- val virtDriver = SimHost(
- uid = hostId,
+ val host = SimHost(
+ uid = UUID.randomUUID(),
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
engine,
- meterProvider,
+ MeterProvider.noop(),
SimFairShareHypervisorProvider()
)
val vmImageA = MockImage(
@@ -150,11 +114,11 @@ internal class SimHostTest {
val flavor = MockFlavor(2, 0)
coroutineScope {
- launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
- launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
+ launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
+ launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
suspendCancellableCoroutine<Unit> { cont ->
- virtDriver.addListener(object : HostListener {
+ host.addListener(object : HostListener {
private var finished = 0
override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
@@ -168,13 +132,14 @@ internal class SimHostTest {
// Ensure last cycle is collected
delay(1000L * duration)
- virtDriver.close()
- meterProvider.close()
+ host.close()
+
+ val cpuStats = host.getCpuStats()
assertAll(
- { assertEquals(658, activeTime, "Active time does not match") },
- { assertEquals(2341, idleTime, "Idle time does not match") },
- { assertEquals(637, stealTime, "Steal time does not match") },
+ { assertEquals(658, cpuStats.activeTime, "Active time does not match") },
+ { assertEquals(2341, cpuStats.idleTime, "Idle time does not match") },
+ { assertEquals(637, cpuStats.stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -184,54 +149,16 @@ internal class SimHostTest {
*/
@Test
fun testFailure() = runBlockingSimulation {
- var activeTime = 0L
- var idleTime = 0L
- var uptime = 0L
- var downtime = 0L
- var guestUptime = 0L
- var guestDowntime = 0L
-
- val hostId = UUID.randomUUID()
- val hostResource = Resource.builder()
- .put(HOST_ID, hostId.toString())
- .build()
-
- // Setup metric reader
val duration = 5 * 60L
- val reader = CoroutineMetricReader(
- this,
- object : ComputeMetricExporter() {
- override fun record(reader: HostTableReader) {
- activeTime += reader.cpuActiveTime
- idleTime += reader.cpuIdleTime
- uptime += reader.uptime
- downtime += reader.downtime
- }
-
- override fun record(reader: ServerTableReader) {
- guestUptime += reader.uptime
- guestDowntime += reader.downtime
- }
- },
- exportInterval = Duration.ofSeconds(duration)
- )
-
- val meterProvider = SdkMeterProvider
- .builder()
- .setResource(hostResource)
- .setClock(clock.toOtelClock())
- .registerMetricReader(reader)
- .build()
-
val engine = FlowEngine(coroutineContext, clock)
val host = SimHost(
- uid = hostId,
+ uid = UUID.randomUUID(),
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
engine,
- meterProvider,
+ MeterProvider.noop(),
SimFairShareHypervisorProvider()
)
val image = MockImage(
@@ -275,15 +202,17 @@ internal class SimHostTest {
// Ensure last cycle is collected
delay(1000L * duration)
- meterProvider.close()
+ val cpuStats = host.getCpuStats()
+ val sysStats = host.getSystemStats()
+ val guestSysStats = host.getSystemStats(server)
assertAll(
- { assertEquals(1775, idleTime, "Idle time does not match") },
- { assertEquals(624, activeTime, "Active time does not match") },
- { assertEquals(900001, uptime, "Uptime does not match") },
- { assertEquals(300000, downtime, "Downtime does not match") },
- { assertEquals(900000, guestUptime, "Guest uptime does not match") },
- { assertEquals(300000, guestDowntime, "Guest downtime does not match") },
+ { assertEquals(1775, cpuStats.idleTime, "Idle time does not match") },
+ { assertEquals(624, cpuStats.activeTime, "Active time does not match") },
+ { assertEquals(900001, sysStats.uptime.toMillis(), "Uptime does not match") },
+ { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") },
+ { assertEquals(900001, guestSysStats.uptime.toMillis(), "Guest uptime does not match") },
+ { assertEquals(300000, guestSysStats.downtime.toMillis(), "Guest downtime does not match") },
)
}
@@ -332,6 +261,8 @@ internal class SimHostTest {
override val state: ServerState = ServerState.TERMINATED
+ override val launchedAt: Instant? = null
+
override suspend fun start() {}
override suspend fun stop() {}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
index 4b0b343f..21cfdad2 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
@@ -26,6 +26,7 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
+import org.opendc.compute.api.Server
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
@@ -81,9 +82,19 @@ public class ComputeServiceHelper(
}
/**
- * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace].
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
+ *
+ * @param trace The trace to simulate.
+ * @param seed The seed for the simulation.
+ * @param servers A list to which the created servers is added.
+ * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
*/
- public suspend fun run(trace: List<VirtualMachine>, seed: Long, submitImmediately: Boolean = false) {
+ public suspend fun run(
+ trace: List<VirtualMachine>,
+ seed: Long,
+ servers: MutableList<Server>? = null,
+ submitImmediately: Boolean = false
+ ) {
val random = Random(seed)
val injector = failureModel?.createInjector(context, clock, service, random)
val client = service.newClient()
@@ -129,12 +140,14 @@ public class ComputeServiceHelper(
meta = mapOf("workload" to workload)
)
+ servers?.add(server)
+
// Wait for the server reach its end time
val endTime = entry.stopTime.toEpochMilli()
delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
- // Delete the server after reaching the end-time of the virtual machine
- server.delete()
+ // Stop the server after reaching the end-time of the virtual machine
+ server.stop()
}
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt
index a46885f4..6c515118 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt
@@ -22,8 +22,6 @@
package org.opendc.compute.workload.export.parquet
-import io.opentelemetry.sdk.common.CompletableResultCode
-import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.telemetry.compute.table.ServerTableReader
@@ -33,7 +31,7 @@ import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() {
+public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
private val serverWriter = ParquetServerDataWriter(
File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
@@ -61,11 +59,9 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS
serviceWriter.write(reader)
}
- override fun shutdown(): CompletableResultCode {
+ override fun close() {
hostWriter.close()
serviceWriter.close()
serverWriter.close()
-
- return CompletableResultCode.ofSuccess()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 0bbf1443..6fd85e8c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -23,12 +23,14 @@
package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
+import kotlinx.coroutines.*
+import org.opendc.compute.api.Server
import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.createComputeScheduler
-import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
+import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor
import org.opendc.compute.workload.grid5000
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -37,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.compute.ComputeMetricReader
import java.io.File
import java.time.Duration
import java.util.*
@@ -97,7 +99,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
else
null
val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder)
- val telemetry = SdkTelemetryManager(clock)
+ val telemetry = NoopTelemetryManager()
val runner = ComputeServiceHelper(
coroutineContext,
clock,
@@ -107,23 +109,35 @@ abstract class Portfolio(name: String) : Experiment(name) {
interferenceModel?.withSeed(repeat.toLong())
)
- val exporter = ParquetComputeMetricExporter(
- File(config.getString("output-path")),
- "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
- 4096
- )
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
-
val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
+ val servers = mutableListOf<Server>()
+ val exporter = ComputeMetricReader(
+ this,
+ clock,
+ runner.service,
+ servers,
+ ParquetComputeMonitor(
+ File(config.getString("output-path")),
+ "portfolio_id=$name/scenario_id=$id/run_id=$repeat",
+ bufferSize = 4096
+ ),
+ exportInterval = Duration.ofMinutes(5)
+ )
try {
// Instantiate the desired topology
runner.apply(topology)
- // Run the workload trace
- runner.run(vms, seeder.nextLong())
+ coroutineScope {
+ // Run the workload trace
+ runner.run(vms, seeder.nextLong(), servers)
+
+ // Stop the metric collection
+ exporter.close()
+ }
} finally {
runner.close()
+ exporter.close()
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 01b2a8fe..62cdf123 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -26,25 +26,23 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.compute.api.Server
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMetricReader
+import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.telemetry.compute.table.ServiceData
-import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
-import java.time.Instant
import java.util.*
/**
@@ -54,7 +52,7 @@ class CapelinIntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var exporter: TestComputeMetricExporter
+ private lateinit var monitor: TestComputeMonitor
/**
* The [FilterScheduler] to use for all experiments.
@@ -67,11 +65,11 @@ class CapelinIntegrationTest {
private lateinit var workloadLoader: ComputeWorkloadLoader
/**
- * Setup the experimental environment.
+ * Set up the experimental environment.
*/
@BeforeEach
fun setUp() {
- exporter = TestComputeMetricExporter()
+ monitor = TestComputeMonitor()
computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -85,22 +83,22 @@ class CapelinIntegrationTest {
@Test
fun testLarge() = runBlockingSimulation {
val (workload, _) = createTestWorkload(1.0)
- val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler
)
val topology = createTopology()
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
try {
runner.apply(topology)
- runner.run(workload, 0)
+ runner.run(workload, 0, servers)
- val serviceMetrics = exporter.serviceMetrics
+ val serviceMetrics = runner.service.getSchedulerStats()
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -116,15 +114,15 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
} finally {
runner.close()
- telemetry.close()
+ reader.close()
}
}
@@ -135,41 +133,41 @@ class CapelinIntegrationTest {
fun testSmall() = runBlockingSimulation {
val seed = 1
val (workload, _) = createTestWorkload(0.25, seed)
- val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler
)
val topology = createTopology("single")
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
try {
runner.apply(topology)
- runner.run(workload, seed.toLong())
+ runner.run(workload, seed.toLong(), servers)
+ val serviceMetrics = runner.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
runner.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
+ { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
@@ -181,41 +179,41 @@ class CapelinIntegrationTest {
val seed = 0
val (workload, interferenceModel) = createTestWorkload(1.0, seed)
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
interferenceModel = interferenceModel?.withSeed(seed.toLong())
)
val topology = createTopology("single")
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ simulator.run(workload, seed.toLong(), servers)
+ val serviceMetrics = simulator.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -225,43 +223,43 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
val (workload, _) = createTestWorkload(0.25, seed)
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ simulator.run(workload, seed.toLong(), servers)
+ val serviceMetrics = simulator.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } }
+ { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } }
)
}
@@ -281,8 +279,7 @@ class CapelinIntegrationTest {
return stream.use { clusterTopology(stream) }
}
- class TestComputeMetricExporter : ComputeMetricExporter() {
- var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0)
+ class TestComputeMonitor : ComputeMonitor {
var idleTime = 0L
var activeTime = 0L
var stealTime = 0L
@@ -290,19 +287,6 @@ class CapelinIntegrationTest {
var energyUsage = 0.0
var uptime = 0L
- override fun record(reader: ServiceTableReader) {
- serviceMetrics = ServiceData(
- reader.timestamp,
- reader.hostsUp,
- reader.hostsDown,
- reader.serversPending,
- reader.serversActive,
- reader.attemptsSuccess,
- reader.attemptsFailure,
- reader.attemptsError
- )
- }
-
override fun record(reader: HostTableReader) {
idleTime += reader.cpuIdleTime
activeTime += reader.cpuActiveTime
diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
index 43093abf..5762ce64 100644
--- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
@@ -36,8 +36,7 @@ dependencies {
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
- implementation(libs.jackson.module.kotlin) {
- exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
- }
- implementation("org.jetbrains.kotlin:kotlin-reflect:1.6.10")
+ implementation(libs.jackson.module.kotlin)
+
+ testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 5245261c..99948c8e 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.tf20.core
-import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.*
import org.opendc.simulator.compute.SimBareMetalMachine
@@ -52,7 +51,7 @@ public class SimTFDevice(
context: CoroutineContext,
clock: Clock,
meter: Meter,
- private val pu: ProcessingUnit,
+ pu: ProcessingUnit,
private val memory: MemoryUnit,
powerModel: PowerModel
) : TFDevice {
@@ -70,17 +69,13 @@ public class SimTFDevice(
)
/**
- * The identifier of a device.
- */
- private val deviceId = AttributeKey.stringKey("device.id")
-
- /**
* The usage of the device.
*/
private val _usage = meter.histogramBuilder("device.usage")
.setDescription("The amount of device resources used")
.setUnit("MHz")
.build()
+ private var _resourceUsage = 0.0
/**
* The power draw of the device.
@@ -89,6 +84,8 @@ public class SimTFDevice(
.setDescription("The power draw of the device")
.setUnit("W")
.build()
+ private var _powerUsage = 0.0
+ private var _energyUsage = 0.0
/**
* The workload that will be run by the device.
@@ -175,7 +172,10 @@ public class SimTFDevice(
override fun onConverge(conn: FlowConnection, now: Long) {
_usage.record(conn.rate)
+ _resourceUsage = conn.rate
_power.record(machine.psu.powerDraw)
+ _powerUsage = machine.powerUsage
+ _energyUsage = machine.energyUsage
}
}
@@ -197,6 +197,10 @@ public class SimTFDevice(
}
}
+ override fun getDeviceStats(): TFDeviceStats {
+ return TFDeviceStats(_resourceUsage, _powerUsage, _energyUsage)
+ }
+
override fun close() {
machine.cancel()
scope.cancel()
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt
index bbc34ed9..839ed8a9 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDevice.kt
@@ -47,4 +47,9 @@ public interface TFDevice : AutoCloseable {
* Perform [flops] amount of computation on the device.
*/
public suspend fun compute(flops: Double)
+
+ /**
+ * Collect device statistics.
+ */
+ public fun getDeviceStats(): TFDeviceStats
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt
new file mode 100644
index 00000000..016d2a8b
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/TFDeviceStats.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.tf20.core
+
+/**
+ * Statistics about a TensorFlow [TFDevice].
+ *
+ * @property resourceUsage The resource usage of the device (in MHz).
+ * @property powerUsage The instantaneous power draw of the device (in W).
+ * @property energyUsage Cumulative energy usage of the device since boot (in J).
+ */
+data class TFDeviceStats(
+ val resourceUsage: Double,
+ val powerUsage: Double,
+ val energyUsage: Double
+)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
index 28a2a319..0d5fbebb 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
@@ -25,6 +25,7 @@ package org.opendc.experiments.tf20.core
import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
+import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.compute.model.MemoryUnit
@@ -57,6 +58,12 @@ internal class SimTFDeviceTest {
launch { device.compute(1e6) }
launch { device.compute(2e6) }
}
- assertEquals(3681, clock.millis())
+
+ val stats = device.getDeviceStats()
+
+ assertAll(
+ { assertEquals(3681, clock.millis()) },
+ { assertEquals(325.75, stats.energyUsage) }
+ )
}
}
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts
index c54595d3..1803ae69 100644
--- a/opendc-faas/opendc-faas-service/build.gradle.kts
+++ b/opendc-faas/opendc-faas-service/build.gradle.kts
@@ -30,6 +30,7 @@ plugins {
dependencies {
api(projects.opendcFaas.opendcFaasApi)
api(projects.opendcTelemetry.opendcTelemetryApi)
+ api(libs.commons.math3)
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
implementation(libs.opentelemetry.semconv)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 1d5331cb..f7dc3c1f 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -25,10 +25,13 @@ package org.opendc.faas.service
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.faas.api.FaaSClient
+import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
import org.opendc.faas.service.deployer.FunctionDeployer
import org.opendc.faas.service.internal.FaaSServiceImpl
import org.opendc.faas.service.router.RoutingPolicy
+import org.opendc.faas.service.telemetry.FunctionStats
+import org.opendc.faas.service.telemetry.SchedulerStats
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -42,6 +45,16 @@ public interface FaaSService : AutoCloseable {
public fun newClient(): FaaSClient
/**
+ * Collect statistics about the scheduler of the service.
+ */
+ public fun getSchedulerStats(): SchedulerStats
+
+ /**
+ * Collect statistics about the specified [function].
+ */
+ public fun getFunctionStats(function: FaaSFunction): FunctionStats
+
+ /**
* Terminate the lifecycle of the FaaS service, stopping all running function instances.
*/
public override fun close()
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
index 836231c8..52fcffa1 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
@@ -29,7 +29,9 @@ import io.opentelemetry.api.metrics.LongHistogram
import io.opentelemetry.api.metrics.LongUpDownCounter
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
import org.opendc.faas.service.deployer.FunctionInstance
+import org.opendc.faas.service.telemetry.FunctionStats
import java.util.*
/**
@@ -46,7 +48,7 @@ public class FunctionObject(
/**
* The attributes of this function.
*/
- public val attributes: Attributes = Attributes.builder()
+ private val attributes: Attributes = Attributes.builder()
.put(ResourceAttributes.FAAS_ID, uid.toString())
.put(ResourceAttributes.FAAS_NAME, name)
.put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory)
@@ -56,68 +58,78 @@ public class FunctionObject(
/**
* The total amount of function invocations received by the function.
*/
- public val invocations: LongCounter = meter.counterBuilder("function.invocations.total")
+ private val invocations: LongCounter = meter.counterBuilder("function.invocations.total")
.setDescription("Number of function invocations")
.setUnit("1")
.build()
+ private var _invocations = 0L
/**
* The amount of function invocations that could be handled directly.
*/
- public val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm")
+ private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm")
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
+ private var _timelyInvocations = 0L
/**
* The amount of function invocations that were delayed due to function deployment.
*/
- public val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold")
+ private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold")
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
+ private var _delayedInvocations = 0L
/**
* The amount of function invocations that failed.
*/
- public val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed")
+ private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed")
.setDescription("Number of function invocations that failed")
.setUnit("1")
.build()
+ private var _failedInvocations = 0L
/**
* The amount of instances for this function.
*/
- public val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active")
+ private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active")
.setDescription("Number of active function instances")
.setUnit("1")
.build()
+ private var _activeInstances = 0
/**
* The amount of idle instances for this function.
*/
- public val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle")
+ private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle")
.setDescription("Number of idle function instances")
.setUnit("1")
.build()
+ private var _idleInstances = 0
/**
* The time that the function waited.
*/
- public val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait")
+ private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait")
.ofLongs()
.setDescription("Time the function has to wait before being started")
.setUnit("ms")
.build()
+ private val _waitTime = DescriptiveStatistics()
+ .apply { windowSize = 100 }
/**
* The time that the function was running.
*/
- public val activeTime: LongHistogram = meter.histogramBuilder("function.time.active")
+ private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active")
.ofLongs()
.setDescription("Time the function was running")
.setUnit("ms")
.build()
+ private val _activeTime = DescriptiveStatistics()
+ .apply { windowSize = 100 }
/**
* The instances associated with this function.
@@ -134,6 +146,80 @@ public class FunctionObject(
public val meta: MutableMap<String, Any> = meta.toMutableMap()
+ /**
+ * Report a scheduled invocation.
+ */
+ internal fun reportSubmission() {
+ invocations.add(1, attributes)
+ _invocations++
+ }
+
+ /**
+ * Report the deployment of an invocation.
+ */
+ internal fun reportDeployment(isDelayed: Boolean) {
+ if (isDelayed) {
+ delayedInvocations.add(1, attributes)
+ _delayedInvocations++
+
+ idleInstances.add(1, attributes)
+ _idleInstances++
+ } else {
+ timelyInvocations.add(1, attributes)
+ _timelyInvocations++
+ }
+ }
+
+ /**
+ * Report the start of a function invocation.
+ */
+ internal fun reportStart(start: Long, submitTime: Long) {
+ val wait = start - submitTime
+ waitTime.record(wait, attributes)
+ _waitTime.addValue(wait.toDouble())
+
+ idleInstances.add(-1, attributes)
+ _idleInstances--
+ activeInstances.add(1, attributes)
+ _activeInstances++
+ }
+
+ /**
+ * Report the failure of a function invocation.
+ */
+ internal fun reportFailure() {
+ failedInvocations.add(1, attributes)
+ _failedInvocations++
+ }
+
+ /**
+ * Report the end of a function invocation.
+ */
+ internal fun reportEnd(duration: Long) {
+ activeTime.record(duration, attributes)
+ _activeTime.addValue(duration.toDouble())
+ idleInstances.add(1, attributes)
+ _idleInstances++
+ activeInstances.add(-1, attributes)
+ _activeInstances--
+ }
+
+ /**
+ * Collect the statistics of this function.
+ */
+ internal fun getStats(): FunctionStats {
+ return FunctionStats(
+ _invocations,
+ _timelyInvocations,
+ _delayedInvocations,
+ _failedInvocations,
+ _activeInstances,
+ _idleInstances,
+ _waitTime.copy(),
+ _activeTime.copy()
+ )
+ }
+
override fun close() {
instances.forEach(FunctionInstance::close)
instances.clear()
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index 1526be9d..ce3b2b98 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -38,6 +38,8 @@ import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceListener
import org.opendc.faas.service.deployer.FunctionInstanceState
import org.opendc.faas.service.router.RoutingPolicy
+import org.opendc.faas.service.telemetry.FunctionStats
+import org.opendc.faas.service.telemetry.SchedulerStats
import java.lang.IllegalStateException
import java.time.Clock
import java.util.*
@@ -103,6 +105,7 @@ internal class FaaSServiceImpl(
.setDescription("Number of function invocations")
.setUnit("1")
.build()
+ private var totalInvocations = 0L
/**
* The amount of function invocations that could be handled directly.
@@ -111,6 +114,7 @@ internal class FaaSServiceImpl(
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
+ private var timelyInvocations = 0L
/**
* The amount of function invocations that were delayed due to function deployment.
@@ -119,6 +123,7 @@ internal class FaaSServiceImpl(
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
+ private var delayedInvocations = 0L
override fun newClient(): FaaSClient {
return object : FaaSClient {
@@ -187,6 +192,15 @@ internal class FaaSServiceImpl(
}
}
+ override fun getSchedulerStats(): SchedulerStats {
+ return SchedulerStats(totalInvocations, timelyInvocations, delayedInvocations)
+ }
+
+ override fun getFunctionStats(function: FaaSFunction): FunctionStats {
+ val func = requireNotNull(functions[function.uid]) { "Unknown function" }
+ return func.getStats()
+ }
+
/**
* Indicate that a new scheduling cycle is needed due to a change to the service's state.
*/
@@ -219,7 +233,8 @@ internal class FaaSServiceImpl(
val instance = if (activeInstance != null) {
_timelyInvocations.add(1)
- function.timelyInvocations.add(1, function.attributes)
+ timelyInvocations++
+ function.reportDeployment(isDelayed = false)
activeInstance
} else {
@@ -227,29 +242,24 @@ internal class FaaSServiceImpl(
instances.add(instance)
terminationPolicy.enqueue(instance)
- function.idleInstances.add(1, function.attributes)
-
_delayedInvocations.add(1)
- function.delayedInvocations.add(1, function.attributes)
+ delayedInvocations++
+ function.reportDeployment(isDelayed = true)
instance
}
suspend {
val start = clock.millis()
- function.waitTime.record(start - submitTime, function.attributes)
- function.idleInstances.add(-1, function.attributes)
- function.activeInstances.add(1, function.attributes)
+ function.reportStart(start, submitTime)
try {
instance.invoke()
} catch (e: Throwable) {
logger.debug(e) { "Function invocation failed" }
- function.failedInvocations.add(1, function.attributes)
+ function.reportFailure()
} finally {
val end = clock.millis()
- function.activeTime.record(end - start, function.attributes)
- function.idleInstances.add(1, function.attributes)
- function.activeInstances.add(-1, function.attributes)
+ function.reportEnd(end - start)
}
}.startCoroutineCancellable(cont)
}
@@ -262,7 +272,8 @@ internal class FaaSServiceImpl(
check(function.uid in functions) { "Function does not exist (anymore)" }
_invocations.add(1)
- function.invocations.add(1, function.attributes)
+ totalInvocations++
+ function.reportSubmission()
return suspendCancellableCoroutine { cont ->
if (!queue.add(InvocationRequest(clock.millis(), function, cont))) {
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt
new file mode 100644
index 00000000..497ee423
--- /dev/null
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.faas.service.telemetry
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
+
+/**
+ * Statistics about function invocations.
+ *
+ * @property totalInvocations The number of function invocations.
+ * @property timelyInvocations The number of function invocations that could be handled directly.
+ * @property delayedInvocations The number of function invocations that are delayed (cold starts).
+ * @property failedInvocations The number of function invocations that failed.
+ * @property activeInstances The number of active function instances.
+ * @property idleInstances The number of idle function instances.
+ * @property waitTime Statistics about the wait time of a function invocation.
+ * @property activeTime Statistics about the runtime of a function invocation.
+ */
+public data class FunctionStats(
+ val totalInvocations: Long,
+ val timelyInvocations: Long,
+ val delayedInvocations: Long,
+ val failedInvocations: Long,
+ val activeInstances: Int,
+ val idleInstances: Int,
+ val waitTime: DescriptiveStatistics,
+ val activeTime: DescriptiveStatistics
+)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt
new file mode 100644
index 00000000..cabb1d56
--- /dev/null
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.faas.service.telemetry
+
+/**
+ * Statistics reported by the FaaS scheduler.
+ *
+ * @property totalInvocations The total amount of function invocations received by the scheduler.
+ * @property timelyInvocations The amount of function invocations that could be handled directly.
+ * @property delayedInvocations The amount of function invocations that were delayed due to function deployment.
+ */
+public data class SchedulerStats(
+ val totalInvocations: Long,
+ val timelyInvocations: Long,
+ val delayedInvocations: Long
+)
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
index 68233c1a..a3d0d34e 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
@@ -123,7 +123,6 @@ public class SimFunctionDeployer(
/**
* Start the function instance.
*/
- @OptIn(InternalCoroutinesApi::class)
internal fun start() {
check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" }
job = scope.launch {
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index 0dc9ba87..792a8584 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -28,22 +28,25 @@ import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.faas.service.FaaSService
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicyFixed
import org.opendc.faas.service.router.RandomRoutingPolicy
-import org.opendc.faas.simulator.delay.ZeroDelayInjector
+import org.opendc.faas.simulator.delay.ColdStartModel
+import org.opendc.faas.simulator.delay.StochasticDelayInjector
import org.opendc.faas.simulator.workload.SimFaaSWorkload
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
import java.time.Duration
+import java.util.*
/**
* A test suite for the [FaaSService] implementation under simulated conditions.
@@ -65,10 +68,15 @@ internal class SimFaaSServiceTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) {
- override suspend fun invoke() {}
+ val random = Random(0)
+ val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimRuntimeWorkload(1000) {
+ override suspend fun invoke() {
+ delay(random.nextInt(1000).toLong())
+ }
})
- val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload }
+
+ val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random)
+ val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload }
val service = FaaSService(
coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(),
FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000))
@@ -84,8 +92,15 @@ internal class SimFaaSServiceTest {
yield()
+ val funcStats = service.getFunctionStats(function)
+
assertAll(
{ coVerify { workload.invoke() } },
+ { assertEquals(1, funcStats.totalInvocations) },
+ { assertEquals(1, funcStats.delayedInvocations) },
+ { assertEquals(0, funcStats.failedInvocations) },
+ { assertEquals(100.0, funcStats.waitTime.mean) },
+ { assertEquals(1285.0, funcStats.activeTime.mean) },
)
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
index 47e30a14..b476a669 100644
--- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
+++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
@@ -30,6 +30,7 @@ plugins {
dependencies {
api(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(projects.opendcCompute.opendcComputeService)
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt
new file mode 100644
index 00000000..593203fc
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt
@@ -0,0 +1,424 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.driver.Host
+import org.opendc.telemetry.compute.table.*
+import java.time.Clock
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
+ * export interval.
+ *
+ * @param scope The [CoroutineScope] to run the reader in.
+ * @param clock The virtual clock.
+ * @param service The [ComputeService] to monitor.
+ * @param servers The [Server]s to monitor.
+ * @param monitor The monitor to export the metrics to.
+ * @param exportInterval The export interval.
+ */
+public class ComputeMetricReader(
+ scope: CoroutineScope,
+ clock: Clock,
+ private val service: ComputeService,
+ private val servers: List<Server>,
+ private val monitor: ComputeMonitor,
+ private val exportInterval: Duration = Duration.ofMinutes(5)
+) : AutoCloseable {
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * Aggregator for service metrics.
+ */
+ private val serviceTableReader = ServiceTableReaderImpl(service)
+
+ /**
+ * Mapping from [Host] instances to [HostTableReaderImpl]
+ */
+ private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>()
+
+ /**
+ * Mapping from [Server] instances to [ServerTableReaderImpl]
+ */
+ private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>()
+
+ /**
+ * The background job that is responsible for collecting the metrics every cycle.
+ */
+ private val job = scope.launch {
+ val intervalMs = exportInterval.toMillis()
+
+ try {
+ while (isActive) {
+ delay(intervalMs)
+
+ try {
+ val now = clock.instant()
+
+ for (host in service.hosts) {
+ val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
+ reader.record(now)
+ monitor.record(reader)
+ reader.reset()
+ }
+
+ for (server in servers) {
+ val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
+ reader.record(now)
+ monitor.record(reader)
+ reader.reset()
+ }
+
+ serviceTableReader.record(now)
+ monitor.record(serviceTableReader)
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Exporter threw an Exception" }
+ }
+ }
+ } finally {
+ if (monitor is AutoCloseable) {
+ monitor.close()
+ }
+ }
+ }
+
+ override fun close() {
+ job.cancel()
+ }
+
+ /**
+ * An aggregator for service metrics before they are reported.
+ */
+ private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader {
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val hostsUp: Int
+ get() = _hostsUp
+ private var _hostsUp = 0
+
+ override val hostsDown: Int
+ get() = _hostsDown
+ private var _hostsDown = 0
+
+ override val serversPending: Int
+ get() = _serversPending
+ private var _serversPending = 0
+
+ override val serversActive: Int
+ get() = _serversActive
+ private var _serversActive = 0
+
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ private var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ private var _attemptsFailure = 0
+
+ override val attemptsError: Int
+ get() = _attemptsError
+ private var _attemptsError = 0
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ _timestamp = now
+
+ val stats = service.getSchedulerStats()
+ _hostsUp = stats.hostsAvailable
+ _hostsDown = stats.hostsUnavailable
+ _serversPending = stats.serversPending
+ _serversActive = stats.serversActive
+ _attemptsSuccess = stats.attemptsSuccess.toInt()
+ _attemptsFailure = stats.attemptsFailure.toInt()
+ _attemptsError = stats.attemptsError.toInt()
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ private class HostTableReaderImpl(host: Host) : HostTableReader {
+ private val _host = host
+
+ override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity)
+
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ private var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ private var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ private var _guestsError = 0
+
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ private var _guestsInvalid = 0
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ private var _cpuUsage = 0.0
+
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ private var _cpuDemand = 0.0
+
+ override val cpuUtilization: Double
+ get() = _cpuUtilization
+ private var _cpuUtilization = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ override val powerUsage: Double
+ get() = _powerUsage
+ private var _powerUsage = 0.0
+
+ override val powerTotal: Double
+ get() = _powerTotal - previousPowerTotal
+ private var _powerTotal = 0.0
+ private var previousPowerTotal = 0.0
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime = 0L
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime = 0L
+ private var previousDowntime = 0L
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val hostCpuStats = _host.getCpuStats()
+ val hostSysStats = _host.getSystemStats()
+
+ _timestamp = now
+ _guestsTerminated = hostSysStats.guestsTerminated
+ _guestsRunning = hostSysStats.guestsRunning
+ _guestsError = hostSysStats.guestsError
+ _guestsInvalid = hostSysStats.guestsInvalid
+ _cpuLimit = hostCpuStats.capacity
+ _cpuDemand = hostCpuStats.demand
+ _cpuUsage = hostCpuStats.usage
+ _cpuUtilization = hostCpuStats.utilization
+ _cpuActiveTime = hostCpuStats.activeTime
+ _cpuIdleTime = hostCpuStats.idleTime
+ _cpuStealTime = hostCpuStats.stealTime
+ _cpuLostTime = hostCpuStats.lostTime
+ _powerUsage = hostSysStats.powerUsage
+ _powerTotal = hostSysStats.energyUsage
+ _uptime = hostSysStats.uptime.toMillis()
+ _downtime = hostSysStats.downtime.toMillis()
+ _bootTime = hostSysStats.bootTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousPowerTotal = _powerTotal
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ _cpuUtilization = 0.0
+
+ _powerUsage = 0.0
+ }
+ }
+
+ /**
+ * An aggregator for server metrics before they are reported.
+ */
+ private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader {
+ private val _server = server
+
+ /**
+ * The static information about this server.
+ */
+ override val server = ServerInfo(
+ server.uid.toString(),
+ server.name,
+ "vm",
+ "x86",
+ server.image.uid.toString(),
+ server.image.name,
+ server.flavor.cpuCount,
+ server.flavor.memorySize
+ )
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted.
+ */
+ override var host: HostInfo? = null
+ private var _host: Host? = null
+
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ private var _uptime: Long = 0
+ private var previousUptime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ private var _downtime: Long = 0
+ private var previousDowntime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ private var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ private var _bootTime: Instant? = null
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ private var _cpuLimit = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ private var _cpuActiveTime = 0L
+ private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ private var _cpuIdleTime = 0L
+ private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ private var _cpuStealTime = 0L
+ private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ private var _cpuLostTime = 0L
+ private var previousCpuLostTime = 0L
+
+ /**
+ * Record the next cycle.
+ */
+ fun record(now: Instant) {
+ val newHost = service.lookupHost(_server)
+ if (newHost != null && newHost.uid != _host?.uid) {
+ _host = newHost
+ host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity)
+ }
+
+ val cpuStats = _host?.getCpuStats(_server)
+ val sysStats = _host?.getSystemStats(_server)
+
+ _timestamp = now
+ _cpuLimit = cpuStats?.capacity ?: 0.0
+ _cpuActiveTime = cpuStats?.activeTime ?: 0
+ _cpuIdleTime = cpuStats?.idleTime ?: 0
+ _cpuStealTime = cpuStats?.stealTime ?: 0
+ _cpuLostTime = cpuStats?.lostTime ?: 0
+ _uptime = sysStats?.uptime?.toMillis() ?: 0
+ _downtime = sysStats?.downtime?.toMillis() ?: 0
+ _provisionTime = _server.launchedAt
+ _bootTime = sysStats?.bootTime
+ }
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+
+ _host = null
+ _cpuLimit = 0.0
+ }
+ }
+}
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index a9290c47..ca5da079 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -31,7 +31,6 @@ import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
import kotlinx.coroutines.*
import mu.KotlinLogging
import java.time.Duration
-import java.util.*
/**
* A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index a150de4e..7c0c43ed 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -23,8 +23,9 @@
package org.opendc.web.runner
import mu.KotlinLogging
+import org.opendc.compute.api.Server
import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
@@ -35,13 +36,12 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.compute.ComputeMetricReader
import org.opendc.web.client.runner.OpenDCRunnerClient
import org.opendc.web.proto.runner.Job
import org.opendc.web.proto.runner.Scenario
import org.opendc.web.runner.internal.JobManager
-import org.opendc.web.runner.internal.WebComputeMetricExporter
+import org.opendc.web.runner.internal.WebComputeMonitor
import java.io.File
import java.time.Duration
import java.util.*
@@ -180,9 +180,9 @@ public class OpenDCRunner(
private val scenario: Scenario,
private val repeat: Int,
private val topology: Topology,
- ) : RecursiveTask<WebComputeMetricExporter.Results>() {
- override fun compute(): WebComputeMetricExporter.Results {
- val exporter = WebComputeMetricExporter()
+ ) : RecursiveTask<WebComputeMonitor.Results>() {
+ override fun compute(): WebComputeMonitor.Results {
+ val monitor = WebComputeMonitor()
// Schedule task that interrupts the simulation if it runs for too long.
val currentThread = Thread.currentThread()
@@ -206,25 +206,24 @@ public class OpenDCRunner(
else
null
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
failureModel,
interferenceModel.takeIf { phenomena.interference }
)
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
// Instantiate the topology onto the simulator
simulator.apply(topology)
// Run workload trace
- simulator.run(vms, seeder.nextLong())
+ simulator.run(vms, seeder.nextLong(), servers)
- val serviceMetrics = collectServiceMetrics(telemetry.metricProducer)
+ val serviceMetrics = simulator.service.getSchedulerStats()
logger.debug {
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -235,14 +234,14 @@ public class OpenDCRunner(
}
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
}
} finally {
interruptTask.cancel(false)
}
- return exporter.collectResults()
+ return monitor.collectResults()
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
index 8de0cee4..99b8aaf1 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
@@ -62,7 +62,7 @@ internal class JobManager(private val client: OpenDCRunnerClient) {
/**
* Persist the specified results.
*/
- fun finish(id: Long, results: List<WebComputeMetricExporter.Results>) {
+ fun finish(id: Long, results: List<WebComputeMonitor.Results>) {
client.jobs.update(
id,
Job.Update(
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
index 04437a5f..69350d8c 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt
@@ -22,7 +22,6 @@
package org.opendc.web.runner.internal
-import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.telemetry.compute.table.ServiceTableReader
@@ -32,7 +31,7 @@ import kotlin.math.roundToLong
/**
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
-internal class WebComputeMetricExporter : ComputeMetricExporter() {
+internal class WebComputeMonitor : ComputeMonitor {
override fun record(reader: HostTableReader) {
val slices = reader.downtime / SLICE_LENGTH
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index ebace07d..b8bc0e33 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -30,6 +30,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import org.opendc.workflow.service.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
import kotlin.coroutines.CoroutineContext
@@ -46,6 +47,11 @@ public interface WorkflowService : AutoCloseable {
public suspend fun invoke(job: Job)
/**
+ * Collect statistics about the workflow scheduler.
+ */
+ public fun getSchedulerStats(): SchedulerStats
+
+ /**
* Terminate the lifecycle of the workflow service, stopping all running workflows.
*/
public override fun close()
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index cdaec021..9c7f18a2 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -34,6 +34,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import org.opendc.workflow.service.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
import java.util.*
@@ -145,6 +146,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of submitted jobs")
.setUnit("1")
.build()
+ private var _workflowsSubmitted: Int = 0
/**
* The number of jobs that are running.
@@ -153,6 +155,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of jobs running")
.setUnit("1")
.build()
+ private var _workflowsRunning: Int = 0
/**
* The number of jobs that have finished running.
@@ -161,6 +164,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of jobs that finished running")
.setUnit("1")
.build()
+ private var _workflowsFinished: Int = 0
/**
* The number of tasks that have been submitted to the service.
@@ -169,6 +173,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of submitted tasks")
.setUnit("1")
.build()
+ private var _tasksSubmitted: Int = 0
/**
* The number of jobs that are running.
@@ -177,6 +182,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of tasks running")
.setUnit("1")
.build()
+ private var _tasksRunning: Int = 0
/**
* The number of jobs that have finished running.
@@ -185,6 +191,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of tasks that finished running")
.setUnit("1")
.build()
+ private var _tasksFinished: Int = 0
/**
* The [Pacer] to use for scheduling the scheduler cycles.
@@ -223,16 +230,22 @@ public class WorkflowServiceImpl(
}
submittedTasks.add(1)
+ _tasksSubmitted++
}
instances.values.toCollection(jobInstance.tasks)
incomingJobs += jobInstance
rootListener.jobSubmitted(jobInstance)
submittedJobs.add(1)
+ _workflowsSubmitted++
pacer.enqueue()
}
+ override fun getSchedulerStats(): SchedulerStats {
+ return SchedulerStats(_workflowsSubmitted, _workflowsRunning, _workflowsFinished, _tasksSubmitted, _tasksRunning, _tasksFinished)
+ }
+
override fun close() {
scope.cancel()
}
@@ -271,6 +284,7 @@ public class WorkflowServiceImpl(
activeJobs += jobInstance
runningJobs.add(1)
+ _workflowsRunning++
rootListener.jobStarted(jobInstance)
}
@@ -350,6 +364,7 @@ public class WorkflowServiceImpl(
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
runningTasks.add(1)
+ _tasksRunning++
rootListener.taskStarted(task)
}
ServerState.TERMINATED, ServerState.ERROR -> {
@@ -368,6 +383,8 @@ public class WorkflowServiceImpl(
runningTasks.add(-1)
finishedTasks.add(1)
+ _tasksRunning--
+ _tasksFinished++
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -395,6 +412,8 @@ public class WorkflowServiceImpl(
activeJobs -= job
runningJobs.add(-1)
finishedJobs.add(1)
+ _workflowsRunning--
+ _workflowsFinished++
rootListener.jobFinished(job)
job.cont.resume(Unit)
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt
new file mode 100644
index 00000000..7c7d7c4d
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.telemetry
+
+/**
+ * Statistics about the workflow scheduler.
+ *
+ * @property workflowsSubmitted The number of workflows submitted to the scheduler.
+ * @property workflowsRunning The number of workflows that are currently running.
+ * @property workflowsFinished The number of workflows that have completed since the scheduler started.
+ * @property tasksSubmitted The number of tasks submitted to the scheduler.
+ * @property tasksRunning The number of tasks that are currently running.
+ * @property tasksFinished The number of tasks that have completed.
+ */
+public data class SchedulerStats(
+ val workflowsSubmitted: Int,
+ val workflowsRunning: Int,
+ val workflowsFinished: Int,
+ val tasksSubmitted: Int,
+ val tasksRunning: Int,
+ val tasksFinished: Int
+)
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 1fd332b9..d5f06587 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -22,7 +22,6 @@
package org.opendc.workflow.service
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
@@ -66,7 +65,6 @@ internal class WorkflowServiceTest {
@Test
fun testTrace() = runBlockingSimulation {
// Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
- val HOST_COUNT = 4
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
@@ -74,7 +72,8 @@ internal class WorkflowServiceTest {
val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
- repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) }
+ val hostCount = 4
+ repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) }
// Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
val workflowScheduler = WorkflowSchedulerSpec(
@@ -98,13 +97,13 @@ internal class WorkflowServiceTest {
computeHelper.close()
}
- val metrics = collectMetrics(workflowHelper.metricProducer)
+ val metrics = workflowHelper.service.getSchedulerStats()
assertAll(
- { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") },
- { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") },
- { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") },
- { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") },
+ { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") },
+ { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") },
+ { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") },
+ { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
{ assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
{ assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } }
)
@@ -130,28 +129,4 @@ internal class WorkflowServiceTest {
SimSpaceSharedHypervisorProvider()
)
}
-
- class WorkflowMetrics {
- var jobsSubmitted = 0L
- var jobsActive = 0L
- var jobsFinished = 0L
- var tasksSubmitted = 0L
- var tasksActive = 0L
- var tasksFinished = 0L
- }
-
- /**
- * Collect the metrics of the workflow service.
- */
- private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics {
- val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
- val res = WorkflowMetrics()
- res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0
- res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0
- res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0
- res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0
- res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0
- res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0
- return res
- }
}