summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-service/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-04 14:43:17 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-04 14:43:17 +0200
commit564911a2458b3c54834d5cbfed91f502e9856566 (patch)
treecca136c69c5aef87071b5781da0f9a260683186b /opendc-compute/opendc-compute-service/src
parent4883d8eb0c6cae82153aaf5f12561014d08cdc41 (diff)
refactor(compute): Directly expose scheduler stats to user
This change updates the `ComputeService` interface to directly expose statistics about the scheduler to the user, such that they do not necessarily have to interact with OpenTelemetry to obtain these values.
Diffstat (limited to 'opendc-compute/opendc-compute-service/src')
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt6
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt5
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt33
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt3
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt46
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt2
6 files changed, 90 insertions, 5 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index e8cd9e54..3a6baaa1 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -29,6 +29,7 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
import kotlin.coroutines.CoroutineContext
@@ -67,6 +68,11 @@ public interface ComputeService : AutoCloseable {
*/
public fun lookupHost(server: Server): Host?
+ /**
+ * Collect the statistics about the scheduler component of this service.
+ */
+ public fun getSchedulerStats(): SchedulerStats
+
public companion object {
/**
* Construct a new [ComputeService] implementation.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
index f2929bf3..45775640 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
@@ -27,6 +27,7 @@ import org.opendc.compute.api.Image
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
+import java.time.Instant
import java.util.*
/**
@@ -55,6 +56,9 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
override var state: ServerState = delegate.state
private set
+ override var launchedAt: Instant? = null
+ private set
+
override suspend fun start() {
delegate.start()
refresh()
@@ -95,6 +99,7 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
labels = delegate.labels
meta = delegate.meta
state = delegate.state
+ launchedAt = delegate.launchedAt
}
override fun onStateChanged(server: Server, newState: ServerState) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index b7a47a06..e8664e5c 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -36,8 +36,10 @@ import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
+import java.time.Instant
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -126,6 +128,9 @@ internal class ComputeServiceImpl(
private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success")
private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure")
private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error")
+ private var _attemptsSuccess = 0L
+ private var _attemptsFailure = 0L
+ private var _attemptsError = 0L
/**
* The response time of the service.
@@ -145,6 +150,8 @@ internal class ComputeServiceImpl(
.build()
private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending")
private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active")
+ private var _serversPending = 0
+ private var _serversActive = 0
/**
* The [Pacer] to use for scheduling the scheduler cycles.
@@ -328,13 +335,26 @@ internal class ComputeServiceImpl(
scope.cancel()
}
+ override fun getSchedulerStats(): SchedulerStats {
+ return SchedulerStats(
+ availableHosts.size,
+ hostToView.size - availableHosts.size,
+ _attemptsSuccess,
+ _attemptsFailure,
+ _attemptsError,
+ _serversPending,
+ _serversActive
+ )
+ }
+
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
val now = clock.millis()
val request = SchedulingRequest(server, now)
- server.lastProvisioningTimestamp = now
+ server.launchedAt = Instant.ofEpochMilli(now)
queue.add(request)
+ _serversPending++
_servers.add(1, _serversPendingAttr)
requestSchedulingCycle()
return request
@@ -373,6 +393,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
+ _serversPending--
_servers.add(-1, _serversPendingAttr)
continue
}
@@ -385,7 +406,9 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
+ _serversPending--
_servers.add(-1, _serversPendingAttr)
+ _attemptsFailure++
_schedulingAttempts.add(1, _schedulingAttemptsFailureAttr)
logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
@@ -401,6 +424,7 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
+ _serversPending--
_servers.add(-1, _serversPendingAttr)
_schedulingLatency.record(now - request.submitTime, server.attributes)
@@ -419,6 +443,8 @@ internal class ComputeServiceImpl(
activeServers[server] = host
_servers.add(1, _serversActiveAttr)
+ _serversActive++
+ _attemptsSuccess++
_schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr)
} catch (e: Throwable) {
logger.error(e) { "Failed to deploy VM" }
@@ -427,6 +453,7 @@ internal class ComputeServiceImpl(
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+ _attemptsError++
_schedulingAttempts.add(1, _schedulingAttemptsErrorAttr)
}
}
@@ -483,6 +510,7 @@ internal class ComputeServiceImpl(
logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
if (activeServers.remove(server) != null) {
+ _serversActive--
_servers.add(-1, _serversActiveAttr)
}
@@ -505,7 +533,8 @@ internal class ComputeServiceImpl(
*/
private fun collectProvisionTime(result: ObservableLongMeasurement) {
for ((_, server) in servers) {
- result.record(server.lastProvisioningTimestamp, server.attributes)
+ val launchedAt = server.launchedAt ?: continue
+ result.record(launchedAt.toEpochMilli(), server.attributes)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index f1b92c66..d2a2d896 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -28,6 +28,7 @@ import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
+import java.time.Instant
import java.util.UUID
/**
@@ -75,7 +76,7 @@ internal class InternalServer(
/**
* The most recent timestamp when the server entered a provisioning state.
*/
- @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE
+ override var launchedAt: Instant? = null
/**
* The current scheduling request.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt
new file mode 100644
index 00000000..4dc70286
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/telemetry/SchedulerStats.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.telemetry
+
+import org.opendc.compute.service.ComputeService
+
+/**
+ * Statistics about the scheduling component of the [ComputeService].
+ *
+ * @property hostsAvailable The number of hosts currently available for scheduling.
+ * @property hostsUnavailable The number of hosts unavailable for scheduling.
+ * @property attemptsSuccess Scheduling attempts that resulted into an allocation onto a host.
+ * @property attemptsFailure The number of failed scheduling attempt due to insufficient capacity at the moment.
+ * @property attemptsError The number of scheduling attempts that failed due to system error.
+ * @property serversPending The number of servers that are pending to be scheduled.
+ * @property serversActive The number of servers that are currently managed by the service and running.
+ */
+public data class SchedulerStats(
+ val hostsAvailable: Int,
+ val hostsUnavailable: Int,
+ val attemptsSuccess: Long,
+ val attemptsFailure: Long,
+ val attemptsError: Long,
+ val serversPending: Int,
+ val serversActive: Int
+)
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index dfd3bc67..4e5a37ae 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -23,7 +23,6 @@
package org.opendc.compute.service
import io.mockk.*
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
@@ -41,7 +40,6 @@ import java.util.*
/**
* Test suite for the [InternalServer] implementation.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
class InternalServerTest {
@Test
fun testEquality() {