summaryrefslogtreecommitdiff
path: root/simulator/opendc-compute
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-25 21:50:45 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-26 15:41:05 +0100
commit608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 (patch)
treef0130622f189815e41837993b6f66ba3fc11b899 /simulator/opendc-compute
parent0d66ef47d6e1ec0861b4939800c5070f96600ca0 (diff)
compute: Integrate OpenTelemetry Metrics in OpenDC Compute
This change integrates the OpenTelemetry Metrics API in the OpenDC Compute Service implementation. This replaces the old infrastructure for gathering metrics.
Diffstat (limited to 'simulator/opendc-compute')
-rw-r--r--simulator/opendc-compute/opendc-compute-service/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt9
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt47
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt174
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt2
6 files changed, 90 insertions, 147 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
index 41b506b2..909e2dcd 100644
--- a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
@@ -32,6 +32,7 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-compute:opendc-compute-api"))
+ api(project(":opendc-telemetry:opendc-telemetry-api"))
implementation(project(":opendc-utils"))
implementation("io.github.microutils:kotlin-logging")
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 28cef83a..4bc0ba78 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.service
+import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.flow.Flow
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
@@ -35,11 +36,6 @@ import kotlin.coroutines.CoroutineContext
*/
public interface ComputeService : AutoCloseable {
/**
- * The events emitted by the service.
- */
- public val events: Flow<ComputeServiceEvent>
-
- /**
* The hosts that are used by the compute service.
*/
public val hosts: Set<Host>
@@ -80,10 +76,11 @@ public interface ComputeService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
+ meter: Meter,
allocationPolicy: AllocationPolicy,
schedulingQuantum: Long = 300000,
): ComputeService {
- return ComputeServiceImpl(context, clock, allocationPolicy, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meter, allocationPolicy, schedulingQuantum)
}
}
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt
deleted file mode 100644
index 193008a7..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service
-
-/**
- * An event that is emitted by the [ComputeService].
- */
-public sealed class ComputeServiceEvent {
- /**
- * The service that has emitted the event.
- */
- public abstract val provisioner: ComputeService
-
- /**
- * An event emitted for writing metrics.
- */
- public data class MetricsAvailable(
- override val provisioner: ComputeService,
- public val totalHostCount: Int,
- public val availableHostCount: Int,
- public val totalVmCount: Int,
- public val activeVmCount: Int,
- public val inactiveVmCount: Int,
- public val waitingVmCount: Int,
- public val failedVmCount: Int
- ) : ComputeServiceEvent()
-}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index f9bd7fbc..26a34ad9 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,18 +22,16 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.utils.TimerScheduler
-import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -48,6 +46,7 @@ import kotlin.math.max
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
+ private val meter: Meter,
private val allocationPolicy: AllocationPolicy,
private val schedulingQuantum: Long
) : ComputeService, HostListener {
@@ -101,24 +100,70 @@ internal class ComputeServiceImpl(
*/
private val servers = mutableMapOf<UUID, InternalServer>()
- private var submittedVms: Int = 0
- private var queuedVms: Int = 0
- private var runningVms: Int = 0
- private var finishedVms: Int = 0
- private var unscheduledVms: Int = 0
-
private var maxCores = 0
private var maxMemory = 0L
/**
+ * The number of servers that have been submitted to the service for provisioning.
+ */
+ private val _submittedServers = meter.longCounterBuilder("servers.submitted")
+ .setDescription("Number of start requests")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that failed to be scheduled.
+ */
+ private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled")
+ .setDescription("Number of unscheduled servers")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that are waiting to be provisioned.
+ */
+ private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting")
+ .setDescription("Number of servers waiting to be provisioned")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that are waiting to be provisioned.
+ */
+ private val _runningServers = meter.longUpDownCounterBuilder("servers.active")
+ .setDescription("Number of servers currently running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that have finished running.
+ */
+ private val _finishedServers = meter.longCounterBuilder("servers.finished")
+ .setDescription("Number of servers that finished running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of hosts registered at the compute service.
+ */
+ private val _hostCount = meter.longUpDownCounterBuilder("hosts.total")
+ .setDescription("Number of hosts")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of available hosts registered at the compute service.
+ */
+ private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available")
+ .setDescription("Number of available hosts")
+ .setUnit("1")
+ .build()
+
+ /**
* The allocation logic to use.
*/
private val allocationLogic = allocationPolicy()
- override val events: Flow<ComputeServiceEvent>
- get() = _events
- private val _events = EventFlow<ComputeServiceEvent>()
-
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
@@ -204,18 +249,6 @@ internal class ComputeServiceImpl(
start: Boolean
): Server {
check(!isClosed) { "Client is closed" }
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- ++submittedVms,
- runningVms,
- finishedVms,
- ++queuedVms,
- unscheduledVms
- )
- )
val uid = UUID(clock.millis(), random.nextLong())
val server = InternalServer(
@@ -269,14 +302,23 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
+ _availableHostCount.add(1)
availableHosts += hv
}
+ _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
- host.removeListener(this)
+ val view = hostToView.remove(host)
+ if (view != null) {
+ if (availableHosts.remove(view)) {
+ _availableHostCount.add(-1)
+ }
+ host.removeListener(this)
+ _hostCount.add(-1)
+ }
}
override fun close() {
@@ -288,6 +330,8 @@ internal class ComputeServiceImpl(
val request = SchedulingRequest(server)
queue.add(request)
+ _submittedServers.add(1)
+ _waitingServers.add(1)
requestSchedulingCycle()
return request
}
@@ -332,6 +376,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
+ _waitingServers.add(-1)
continue
}
@@ -341,21 +386,10 @@ internal class ComputeServiceImpl(
logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" }
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- --queuedVms,
- ++unscheduledVms
- )
- )
-
// Remove the incoming image
queue.poll()
+ _waitingServers.add(-1)
+ _unscheduledServers.add(1)
logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
@@ -370,6 +404,7 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
+ _waitingServers.add(-1)
logger.info { "Assigned server $server to host $host." }
@@ -384,19 +419,6 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- )
- )
} catch (e: Throwable) {
logger.error("Failed to deploy VM", e)
@@ -427,21 +449,9 @@ internal class ComputeServiceImpl(
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
+ _availableHostCount.add(1)
}
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
// Re-schedule on the new machine
requestSchedulingCycle()
}
@@ -450,19 +460,7 @@ internal class ComputeServiceImpl(
val hv = hostToView[host] ?: return
availableHosts -= hv
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
+ _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -480,23 +478,15 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ if (newState == ServerState.RUNNING) {
+ _runningServers.add(1)
+ } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
activeServers -= server
+ _runningServers.add(-1)
+ _finishedServers.add(1)
+
val hv = hostToView[host]
if (hv != null) {
hv.provisionedCores -= server.flavor.cpuCount
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index e1482152..45a306aa 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.service
import io.mockk.*
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.TestCoroutineScope
@@ -55,7 +56,8 @@ internal class ComputeServiceTest {
scope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(scope)
val policy = AvailableMemoryAllocationPolicy()
- service = ComputeService(scope.coroutineContext, clock, policy)
+ val meter = MeterProvider.noop().get("opendc-compute")
+ service = ComputeService(scope.coroutineContext, clock, meter, policy)
}
@AfterEach
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 2e4191cc..89784803 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -59,7 +59,7 @@ public class SimHost(
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
*/
- override val scope: CoroutineScope = CoroutineScope(context)
+ override val scope: CoroutineScope = CoroutineScope(context + Job())
/**
* The logger instance of this server.