summary | refs | log | tree | commit | diff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r-- opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt | 10
-rw-r--r-- opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt | 159
-rw-r--r-- opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt | 18
-rw-r--r-- opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt | 3
-rw-r--r-- opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt | 81
-rw-r--r-- opendc-compute/opendc-compute-simulator/build.gradle.kts | 1
-rw-r--r-- opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 509
-rw-r--r-- opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt | 305
-rw-r--r-- opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt | 38
-rw-r--r-- opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt | 134
10 files changed, 795 insertions, 463 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 1873eb99..2a1fbaa0 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -23,11 +23,13 @@
package org.opendc.compute.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
* @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
scheduler: ComputeScheduler,
- schedulingQuantum: Long = 300000,
+ schedulingQuantum: Duration = Duration.ofMinutes(5),
): ComputeService {
- return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index f1c055d4..57e70fcd 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,9 +22,10 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -35,6 +36,7 @@ import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -42,15 +44,18 @@ import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param context The [CoroutineContext] to use.
- * @param clock The clock instance to keep track of time.
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param meterProvider The [MeterProvider] for creating a [Meter] for the service.
+ * @param scheduler The scheduler implementation to use.
+ * @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val scheduler: ComputeScheduler,
- private val schedulingQuantum: Long
+ private val schedulingQuantum: Duration
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -63,6 +68,11 @@ internal class ComputeServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the [ComputeService].
+ */
+ private val meter = meterProvider.get("org.opendc.compute.service")
+
+ /**
* The [Random] instance used to generate unique identifiers for the objects.
*/
private val random = Random(0)
@@ -106,69 +116,37 @@ internal class ComputeServiceImpl(
private var maxMemory = 0L
/**
- * The number of servers that have been submitted to the service for provisioning.
- */
- private val _submittedServers = meter.counterBuilder("servers.submitted")
- .setDescription("Number of start requests")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that failed to be scheduled.
- */
- private val _unscheduledServers = meter.counterBuilder("servers.unscheduled")
- .setDescription("Number of unscheduled servers")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _waitingServers = meter.upDownCounterBuilder("servers.waiting")
- .setDescription("Number of servers waiting to be provisioned")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that are waiting to be provisioned.
- */
- private val _runningServers = meter.upDownCounterBuilder("servers.active")
- .setDescription("Number of servers currently running")
- .setUnit("1")
- .build()
-
- /**
- * The number of servers that have finished running.
+ * The number of scheduling attempts.
*/
- private val _finishedServers = meter.counterBuilder("servers.finished")
- .setDescription("Number of servers that finished running")
+ private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts")
+ .setDescription("Number of scheduling attempts")
.setUnit("1")
.build()
+ private val _schedulingAttemptsSuccess = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "success"))
+ private val _schedulingAttemptsFailure = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "failure"))
+ private val _schedulingAttemptsError = _schedulingAttempts
+ .bind(Attributes.of(AttributeKey.stringKey("result"), "error"))
/**
- * The number of hosts registered at the compute service.
+ * The response time of the service.
*/
- private val _hostCount = meter.upDownCounterBuilder("hosts.total")
- .setDescription("Number of hosts")
- .setUnit("1")
+ private val _schedulingLatency = meter.histogramBuilder("scheduler.latency")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
.build()
/**
- * The number of available hosts registered at the compute service.
+ * The number of servers managed by the scheduler, tracked per state (pending or active).
*/
- private val _availableHostCount = meter.upDownCounterBuilder("hosts.available")
- .setDescription("Number of available hosts")
+ private val _servers = meter.upDownCounterBuilder("scheduler.servers")
+ .setDescription("Number of servers managed by the scheduler")
.setUnit("1")
.build()
-
- /**
- * The response time of the service.
- */
- private val _schedulerDuration = meter.histogramBuilder("scheduler.duration")
- .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
- .ofLongs()
- .setUnit("ms")
- .build()
+ private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending"))
+ private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active"))
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -181,6 +159,22 @@ internal class ComputeServiceImpl(
override val hostCount: Int
get() = hostToView.size
+ init {
+ val upState = Attributes.of(AttributeKey.stringKey("state"), "up")
+ val downState = Attributes.of(AttributeKey.stringKey("state"), "down")
+
+ meter.upDownCounterBuilder("scheduler.hosts")
+ .setDescription("Number of hosts registered with the scheduler")
+ .setUnit("1")
+ .buildWithCallback { result ->
+ val total = hostCount
+ val available = availableHosts.size.toLong()
+
+ result.observe(available, upState)
+ result.observe(total - available, downState)
+ }
+ }
+
override fun newClient(): ComputeClient {
check(scope.isActive) { "Service is already closed" }
return object : ComputeClient {
@@ -308,24 +302,19 @@ internal class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
- _availableHostCount.add(1)
availableHosts += hv
}
scheduler.addHost(hv)
- _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
val view = hostToView.remove(host)
if (view != null) {
- if (availableHosts.remove(view)) {
- _availableHostCount.add(-1)
- }
+ availableHosts.remove(view)
scheduler.removeHost(view)
host.removeListener(this)
- _hostCount.add(-1)
}
}
@@ -338,8 +327,7 @@ internal class ComputeServiceImpl(
val request = SchedulingRequest(server, clock.millis())
queue.add(request)
- _submittedServers.add(1)
- _waitingServers.add(1)
+ _serversPending.add(1)
requestSchedulingCycle()
return request
}
@@ -365,10 +353,12 @@ internal class ComputeServiceImpl(
return
}
+ val quantum = schedulingQuantum.toMillis()
+
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
// We calculate here the delay until the next scheduling slot.
- val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
+ val delay = quantum - (clock.millis() % quantum)
timerScheduler.startSingleTimer(Unit, delay) {
doSchedule()
@@ -385,7 +375,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
- _waitingServers.add(-1)
+ _serversPending.add(-1)
continue
}
@@ -397,10 +387,10 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
- _waitingServers.add(-1)
- _unscheduledServers.add(1)
+ _serversPending.add(-1)
+ _schedulingAttemptsFailure.add(1)
- logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+ logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
server.state = ServerState.TERMINATED
continue
@@ -413,8 +403,8 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
- _waitingServers.add(-1)
- _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString()))
+ _serversPending.add(-1)
+ _schedulingLatency.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
@@ -429,12 +419,17 @@ internal class ComputeServiceImpl(
server.host = host
host.spawn(server)
activeServers[server] = host
+
+ _serversActive.add(1)
+ _schedulingAttemptsSuccess.add(1)
} catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ logger.error(e) { "Failed to deploy VM" }
hv.instanceCount--
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
+
+ _schedulingAttemptsError.add(1)
}
}
}
@@ -453,24 +448,22 @@ internal class ComputeServiceImpl(
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host]
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
- _availableHostCount.add(1)
}
// Re-schedule on the new machine
requestSchedulingCycle()
}
HostState.DOWN -> {
- logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+ logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" }
val hv = hostToView[host] ?: return
availableHosts -= hv
- _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -488,16 +481,12 @@ internal class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.RUNNING) {
- _runningServers.add(1)
- } else if (newState == ServerState.ERROR) {
- _runningServers.add(-1)
- } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
- logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- activeServers -= server
- _runningServers.add(-1)
- _finishedServers.add(1)
+ if (activeServers.remove(server) != null) {
+ _serversActive.add(-1)
+ }
val hv = hostToView[host]
if (hv != null) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index d9d0f3fc..05a7e1bf 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -22,6 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.driver.Host
@@ -50,6 +53,21 @@ internal class InternalServer(
private val watchers = mutableListOf<ServerWatcher>()
/**
+ * The attributes of a server.
+ */
+ internal val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, name)
+ .put(ResourceAttributes.HOST_ID, uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString())
+ .build()
+
+ /**
* The [Host] that has been assigned to host the server.
*/
internal var host: Host? = null
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index d036ec00..564f9493 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -61,8 +61,7 @@ internal class ComputeServiceTest {
filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
weighers = listOf(RamWeigher())
)
- val meter = MeterProvider.noop().get("opendc-compute")
- service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler)
+ service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler)
}
@Test
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index 28fd8217..dfd3bc67 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -47,8 +47,9 @@ class InternalServerTest {
fun testEquality() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
+
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
@@ -59,8 +60,8 @@ class InternalServerTest {
fun testEqualityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -73,8 +74,8 @@ class InternalServerTest {
fun testInequalityWithDifferentType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val b = mockk<Server>(relaxUnitFun = true)
@@ -87,8 +88,8 @@ class InternalServerTest {
fun testInequalityWithIncorrectType() {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
assertNotEquals(a, Unit)
@@ -98,8 +99,8 @@ class InternalServerTest {
fun testStartTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) }
@@ -114,8 +115,8 @@ class InternalServerTest {
fun testStartDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -127,8 +128,8 @@ class InternalServerTest {
fun testStartProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.PROVISIONING
@@ -142,8 +143,8 @@ class InternalServerTest {
fun testStartRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.RUNNING
@@ -157,8 +158,8 @@ class InternalServerTest {
fun testStopProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val request = ComputeServiceImpl.SchedulingRequest(server, 0)
@@ -175,8 +176,8 @@ class InternalServerTest {
fun testStopTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -189,8 +190,8 @@ class InternalServerTest {
fun testStopDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -203,8 +204,8 @@ class InternalServerTest {
fun testStopRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>()
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -220,8 +221,8 @@ class InternalServerTest {
fun testDeleteProvisioningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val request = ComputeServiceImpl.SchedulingRequest(server, 0)
@@ -239,8 +240,8 @@ class InternalServerTest {
fun testDeleteTerminatedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.TERMINATED
@@ -255,8 +256,8 @@ class InternalServerTest {
fun testDeleteDeletedServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
server.state = ServerState.DELETED
@@ -269,8 +270,8 @@ class InternalServerTest {
fun testDeleteRunningServer() = runBlockingSimulation {
val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
val uid = UUID.randomUUID()
- val flavor = mockk<InternalFlavor>()
- val image = mockk<InternalImage>()
+ val flavor = mockFlavor()
+ val image = mockImage()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
val host = mockk<Host>(relaxUnitFun = true)
@@ -282,4 +283,20 @@ class InternalServerTest {
coVerify { host.delete(server) }
verify { service.delete(server) }
}
+
+ private fun mockFlavor(): InternalFlavor {
+ val flavor = mockk<InternalFlavor>()
+ every { flavor.name } returns "c5.large"
+ every { flavor.uid } returns UUID.randomUUID()
+ every { flavor.cpuCount } returns 2
+ every { flavor.memorySize } returns 4096
+ return flavor
+ }
+
+ private fun mockImage(): InternalImage {
+ val image = mockk<InternalImage>()
+ every { image.name } returns "ubuntu-20.04"
+ every { image.uid } returns UUID.randomUUID()
+ return image
+ }
}
diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts
index cad051e6..aaf69f78 100644
--- a/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -40,5 +40,6 @@ dependencies {
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ testImplementation(projects.opendcTelemetry.opendcTelemetryCompute)
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index a1cc3390..793db907 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -25,13 +25,17 @@ package org.opendc.compute.simulator
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
+import org.opendc.compute.simulator.internal.Guest
+import org.opendc.compute.simulator.internal.GuestListener
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimHypervisorProvider
@@ -43,11 +47,11 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.resources.SimResourceDistributorMaxMin
import org.opendc.simulator.resources.SimResourceInterpreter
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
+import kotlin.math.roundToLong
/**
* A [Host] that simulates virtual machines on a physical machine using [SimHypervisor].
@@ -59,7 +63,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
interpreter: SimResourceInterpreter,
- private val meter: Meter,
+ meterProvider: MeterProvider,
hypervisor: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
@@ -82,6 +86,11 @@ public class SimHost(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to track metrics of the simulated host.
+ */
+ private val meter = meterProvider.get("org.opendc.compute.simulator")
+
+ /**
* The event listeners registered with this host.
*/
private val listeners = mutableListOf<HostListener>()
@@ -94,32 +103,29 @@ public class SimHost(
/**
* The hypervisor to run multiple workloads.
*/
- public val hypervisor: SimHypervisor = hypervisor.create(
+ private val hypervisor: SimHypervisor = hypervisor.create(
interpreter,
scalingGovernor = scalingGovernor,
interferenceDomain = interferenceDomain,
listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
- requestedWork: Double,
+ totalWork: Double,
grantedWork: Double,
overcommittedWork: Double,
interferedWork: Double,
cpuUsage: Double,
cpuDemand: Double
) {
- _totalWork.add(requestedWork)
- _grantedWork.add(grantedWork)
- _overcommittedWork.add(overcommittedWork)
- _interferedWork.add(interferedWork)
- _cpuDemand.record(cpuDemand)
- _cpuUsage.record(cpuUsage)
- _powerUsage.record(machine.powerDraw)
-
- reportTime()
+ _cpuDemand = cpuDemand
+ _cpuUsage = cpuUsage
+
+ collectTime()
}
}
)
+ private var _cpuUsage = 0.0
+ private var _cpuDemand = 0.0
/**
* The virtual machines running on the hypervisor.
@@ -139,121 +145,23 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
/**
- * The total number of guests.
+ * The [GuestListener] that listens for guest events.
*/
- private val _guests = meter.upDownCounterBuilder("guests.total")
- .setDescription("Number of guests")
- .setUnit("1")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The number of active guests on the host.
- */
- private val _activeGuests = meter.upDownCounterBuilder("guests.active")
- .setDescription("Number of active guests")
- .setUnit("1")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The CPU demand of the host.
- */
- private val _cpuDemand = meter.histogramBuilder("cpu.demand")
- .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
- .setUnit("MHz")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The CPU usage of the host.
- */
- private val _cpuUsage = meter.histogramBuilder("cpu.usage")
- .setDescription("The amount of CPU resources used by the host")
- .setUnit("MHz")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The power usage of the host.
- */
- private val _powerUsage = meter.histogramBuilder("power.usage")
- .setDescription("The amount of power used by the CPU")
- .setUnit("W")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The total amount of work supplied to the CPU.
- */
- private val _totalWork = meter.counterBuilder("cpu.work.total")
- .setDescription("The amount of work supplied to the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The work performed by the CPU.
- */
- private val _grantedWork = meter.counterBuilder("cpu.work.granted")
- .setDescription("The amount of work performed by the CPU")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount not performed by the CPU due to overcommitment.
- */
- private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit")
- .setDescription("The amount of work not performed by the CPU due to overcommitment")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount of work not performed by the CPU due to interference.
- */
- private val _interferedWork = meter.counterBuilder("cpu.work.interference")
- .setDescription("The amount of work not performed by the CPU due to interference")
- .setUnit("1")
- .ofDoubles()
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The amount of time in the system.
- */
- private val _totalTime = meter.counterBuilder("host.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
-
- /**
- * The uptime of the host.
- */
- private val _upTime = meter.counterBuilder("host.time.up")
- .setDescription("The uptime of the host")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+ private val guestListener = object : GuestListener {
+ override fun onStart(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
- /**
- * The downtime of the host.
- */
- private val _downTime = meter.counterBuilder("host.time.down")
- .setDescription("The downtime of the host")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+ override fun onStop(guest: Guest) {
+ listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) }
+ }
+ }
init {
// Launch hypervisor onto machine
scope.launch {
try {
+ _bootTime = clock.millis()
_state = HostState.UP
machine.run(this@SimHost.hypervisor, emptyMap())
} catch (_: CancellationException) {
@@ -265,29 +173,48 @@ public class SimHost(
_state = HostState.DOWN
}
}
- }
-
- private var _lastReport = clock.millis()
-
- private fun reportTime() {
- if (!scope.isActive)
- return
-
- val now = clock.millis()
- val duration = now - _lastReport
-
- _totalTime.add(duration)
- when (_state) {
- HostState.UP -> _upTime.add(duration)
- HostState.DOWN -> _downTime.add(duration)
- }
-
- // Track time of guests
- for (guest in guests.values) {
- guest.reportTime()
- }
- _lastReport = now
+ meter.upDownCounterBuilder("system.guests")
+ .setDescription("Number of guests on this host")
+ .setUnit("1")
+ .buildWithCallback(::collectGuests)
+ meter.gaugeBuilder("system.cpu.limit")
+ .setDescription("Amount of CPU resources available to the host")
+ .buildWithCallback(::collectCpuLimit)
+ meter.gaugeBuilder("system.cpu.demand")
+ .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuDemand) }
+ meter.gaugeBuilder("system.cpu.usage")
+ .setDescription("Amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .buildWithCallback { result -> result.observe(_cpuUsage) }
+ meter.gaugeBuilder("system.cpu.utilization")
+ .setDescription("Utilization of the CPU resources of the host")
+ .setUnit("%")
+ .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) }
+ meter.counterBuilder("system.cpu.time")
+ .setDescription("Amount of CPU time spent by the host")
+ .setUnit("s")
+ .buildWithCallback(::collectCpuTime)
+ meter.gaugeBuilder("system.power.usage")
+ .setDescription("Power usage of the host ")
+ .setUnit("W")
+ .buildWithCallback { result -> result.observe(machine.powerDraw) }
+ meter.counterBuilder("system.power.total")
+ .setDescription("Amount of energy used by the CPU")
+ .setUnit("J")
+ .ofDoubles()
+ .buildWithCallback(::collectPowerTotal)
+ meter.counterBuilder("system.time")
+ .setDescription("The uptime of the host")
+ .setUnit("s")
+ .buildWithCallback(::collectTime)
+ meter.gaugeBuilder("system.time.boot")
+ .setDescription("The boot time of the host")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectBootTime)
}
override fun canFit(server: Server): Boolean {
@@ -301,8 +228,17 @@ public class SimHost(
override suspend fun spawn(server: Server, start: Boolean) {
val guest = guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
- _guests.add(1)
- Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name))
+
+ val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ Guest(
+ scope.coroutineContext,
+ clock,
+ this,
+ mapper,
+ guestListener,
+ server,
+ machine
+ )
}
if (start) {
@@ -326,7 +262,6 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
- _guests.add(-1)
guest.terminate()
}
@@ -339,7 +274,7 @@ public class SimHost(
}
override fun close() {
- reportTime()
+ reset()
scope.cancel()
machine.close()
}
@@ -347,22 +282,35 @@ public class SimHost(
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
public suspend fun fail() {
- reportTime()
- _state = HostState.DOWN
+ reset()
+
for (guest in guests.values) {
guest.fail()
}
}
public suspend fun recover() {
- reportTime()
+ collectTime()
_state = HostState.UP
+ _bootTime = clock.millis()
+
for (guest in guests.values) {
guest.start()
}
}
/**
+ * Reset the machine.
+ */
+ private fun reset() {
+ collectTime()
+
+ _state = HostState.DOWN
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ }
+
+ /**
* Convert flavor to machine model.
*/
private fun Flavor.toMachineModel(): MachineModel {
@@ -374,147 +322,168 @@ public class SimHost(
return MachineModel(processingUnits, memoryUnits)
}
- private fun onGuestStart(vm: Guest) {
- _activeGuests.add(1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val STATE_KEY = AttributeKey.stringKey("state")
- private fun onGuestStop(vm: Guest) {
- _activeGuests.add(-1)
- listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- }
+ private val terminatedState = Attributes.of(STATE_KEY, "terminated")
+ private val runningState = Attributes.of(STATE_KEY, "running")
+ private val errorState = Attributes.of(STATE_KEY, "error")
+ private val invalidState = Attributes.of(STATE_KEY, "invalid")
/**
- * A virtual machine instance that the driver manages.
+ * Helper function to collect the guest counts on this host.
*/
- private inner class Guest(val server: Server, val machine: SimMachine) {
- var state: ServerState = ServerState.TERMINATED
-
- /**
- * The amount of time in the system.
- */
- private val _totalTime = meter.counterBuilder("guest.time.total")
- .setDescription("The amount of time in the system")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- /**
- * The uptime of the guest.
- */
- private val _runningTime = meter.counterBuilder("guest.time.running")
- .setDescription("The uptime of the guest")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- /**
- * The time the guest is in an error state.
- */
- private val _errorTime = meter.counterBuilder("guest.time.error")
- .setDescription("The time the guest is in an error state")
- .setUnit("ms")
- .build()
- .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
-
- suspend fun start() {
- when (state) {
- ServerState.TERMINATED, ServerState.ERROR -> {
- logger.info { "User requested to start server ${server.uid}" }
- launch()
- }
- ServerState.RUNNING -> return
- ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
- }
- else -> assert(false) { "Invalid state transition" }
+ private fun collectGuests(result: ObservableLongMeasurement) {
+ var terminated = 0L
+ var running = 0L
+ var error = 0L
+ var invalid = 0L
+
+ for ((_, guest) in guests) {
+ when (guest.state) {
+ ServerState.TERMINATED -> terminated++
+ ServerState.RUNNING -> running++
+ ServerState.ERROR -> error++
+ else -> invalid++
}
}
- suspend fun stop() {
- when (state) {
- ServerState.RUNNING, ServerState.ERROR -> {
- val job = job ?: throw IllegalStateException("Server should be active")
- job.cancel()
- job.join()
- }
- ServerState.TERMINATED, ServerState.DELETED -> return
- else -> assert(false) { "Invalid state transition" }
- }
- }
+ result.observe(terminated, terminatedState)
+ result.observe(running, runningState)
+ result.observe(error, errorState)
+ result.observe(invalid, invalidState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
- suspend fun terminate() {
- stop()
- machine.close()
- state = ServerState.DELETED
+ /**
+ * Helper function to collect the CPU limits of a machine.
+ */
+ private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit)
+
+ for (guest in guests.values) {
+ guest.collectCpuLimit(result)
}
+ }
- suspend fun fail() {
- if (state != ServerState.RUNNING) {
- return
- }
- stop()
- state = ServerState.ERROR
+ private var _lastCpuTimeCallback = clock.millis()
+
+ /**
+ * Helper function to track the CPU time of a machine.
+ */
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastCpuTimeCallback
+
+ try {
+ collectCpuTime(duration, result)
+ } finally {
+ _lastCpuTimeCallback = now
}
+ }
- private var job: Job? = null
-
- private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
- assert(job == null) { "Concurrent job running" }
- val workload = mapper.createWorkload(server)
-
- job = scope.launch {
- try {
- delay(1) // TODO Introduce boot time
- init()
- cont.resume(Unit)
- } catch (e: Throwable) {
- cont.resumeWithException(e)
- }
- try {
- machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- job = null
- }
- }
+ private val _activeState = Attributes.of(STATE_KEY, "active")
+ private val _stealState = Attributes.of(STATE_KEY, "steal")
+ private val _lostState = Attributes.of(STATE_KEY, "lost")
+ private val _idleState = Attributes.of(STATE_KEY, "idle")
+ private var _totalTime = 0.0
+
+ /**
+ * Helper function to track the CPU time of a machine over the given [duration] (in milliseconds).
+ */
+ private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = this.model.cpuCount
+ val d = coreCount / _cpuLimit
+
+ val counters = hypervisor.counters
+ val grantedWork = counters.actual
+ val overcommittedWork = counters.overcommit
+ val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+ val lostTime = (interferedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(lostTime, _lostState)
+
+ for (guest in guests.values) {
+ guest.collectCpuTime(duration, result)
}
+ }
- private fun init() {
- state = ServerState.RUNNING
- onGuestStart(this)
+ private var _lastPowerCallback = clock.millis()
+ private var _totalPower = 0.0
+
+ /**
+ * Helper function to collect the total power usage of the machine.
+ */
+ private fun collectPowerTotal(result: ObservableDoubleMeasurement) {
+ val now = clock.millis()
+ val duration = now - _lastPowerCallback
+
+ _totalPower += duration / 1000.0 * machine.powerDraw
+ result.observe(_totalPower)
+
+ _lastPowerCallback = now
+ }
+
+ private var _lastReport = clock.millis()
+
+ /**
+ * Helper function to track the uptime of a machine.
+ */
+ private fun collectTime(result: ObservableLongMeasurement? = null) {
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ try {
+ collectTime(duration, result)
+ } finally {
+ _lastReport = now
}
+ }
- private fun exit(cause: Throwable?) {
- state =
- if (cause == null)
- ServerState.TERMINATED
- else
- ServerState.ERROR
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.of(STATE_KEY, "up")
+ private val _downState = Attributes.of(STATE_KEY, "down")
- onGuestStop(this)
+ /**
+ * Helper function to accumulate the uptime and downtime of a machine over the given [duration].
+ */
+ private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == HostState.UP) {
+ _uptime += duration
+ } else if (state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
}
- private var _lastReport = clock.millis()
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
- fun reportTime() {
- if (state == ServerState.DELETED)
- return
+ for (guest in guests.values) {
+ guest.collectUptime(duration, result)
+ }
+ }
- val now = clock.millis()
- val duration = now - _lastReport
+ private var _bootTime = Long.MIN_VALUE
- _totalTime.add(duration)
- when (state) {
- ServerState.RUNNING -> _runningTime.add(duration)
- ServerState.ERROR -> _errorTime.add(duration)
- else -> {}
- }
+ /**
+ * Helper function to track the boot time of a machine.
+ */
+ private fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
- _lastReport = now
+ for (guest in guests.values) {
+ guest.collectBootTime(result)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
new file mode 100644
index 00000000..90562e2f
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -0,0 +1,305 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.*
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.SimAbstractMachine
+import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+import kotlin.math.roundToLong
+
+/**
+ * A virtual machine instance that is managed by a [SimHost].
+ */
+internal class Guest(
+ context: CoroutineContext,
+ private val clock: Clock,
+ val host: SimHost,
+ private val mapper: SimWorkloadMapper,
+ private val listener: GuestListener,
+ val server: Server,
+ val machine: SimMachine
+) {
+ /**
+ * The [CoroutineScope] of the guest.
+ */
+ private val scope: CoroutineScope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance of this guest.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The state of the [Guest].
+ *
+ * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for
+ * a server.
+ */
+ var state: ServerState = ServerState.TERMINATED
+
+ /**
+ * The attributes of the guest.
+ */
+ val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.HOST_NAME, server.name)
+ .put(ResourceAttributes.HOST_ID, server.uid.toString())
+ .put(ResourceAttributes.HOST_TYPE, server.flavor.name)
+ .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong())
+ .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize)
+ .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" })
+ .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name)
+ .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString())
+ .build()
+
+ /**
+ * Start the guest.
+ */
+ suspend fun start() {
+ when (state) {
+ ServerState.TERMINATED, ServerState.ERROR -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ doStart()
+ }
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Stop the guest.
+ */
+ suspend fun stop() {
+ when (state) {
+ ServerState.RUNNING -> doStop(ServerState.TERMINATED)
+ ServerState.ERROR -> doRecover()
+ ServerState.TERMINATED, ServerState.DELETED -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ /**
+ * Terminate the guest.
+ *
+ * This operation will stop the guest if it is running on the host and remove all resources associated with the
+ * guest.
+ */
+ suspend fun terminate() {
+ stop()
+
+ state = ServerState.DELETED
+
+ machine.close()
+ scope.cancel()
+ }
+
+ /**
+ * Fail the guest if it is active.
+ *
+ * This operation forcibly stops the guest and puts the server into an error state.
+ */
+ suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
+
+ doStop(ServerState.ERROR)
+ }
+
+ /**
+ * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
+ */
+ private var job: Job? = null
+
+ /**
+     * Launch the guest on the simulated machine.
+ */
+ private suspend fun doStart() {
+ assert(job == null) { "Concurrent job running" }
+ val workload = mapper.createWorkload(server)
+
+ val job = scope.launch { runMachine(workload) }
+ this.job = job
+
+ state = ServerState.RUNNING
+ onStart()
+
+ job.invokeOnCompletion { cause ->
+ this.job = null
+ onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED)
+ }
+ }
+
+ /**
+ * Attempt to stop the server and put it into [target] state.
+ */
+ private suspend fun doStop(target: ServerState) {
+ assert(job != null) { "Invalid job state" }
+ val job = job ?: return
+ job.cancel()
+ job.join()
+
+ state = target
+ }
+
+ /**
+ * Attempt to recover from an error state.
+ */
+ private fun doRecover() {
+ state = ServerState.TERMINATED
+ }
+
+ /**
+ * Run the process that models the virtual machine lifecycle as a coroutine.
+ */
+ private suspend fun runMachine(workload: SimWorkload) {
+ delay(1) // TODO Introduce model for boot time
+ machine.run(workload, mapOf("driver" to host, "server" to server))
+ }
+
+ /**
+ * This method is invoked when the guest was started on the host and has booted into a running state.
+ */
+ private fun onStart() {
+ _bootTime = clock.millis()
+ state = ServerState.RUNNING
+ listener.onStart(this)
+ }
+
+ /**
+ * This method is invoked when the guest stopped.
+ */
+ private fun onStop(target: ServerState) {
+ state = target
+ listener.onStop(this)
+ }
+
+ private val STATE_KEY = AttributeKey.stringKey("state")
+
+ private var _uptime = 0L
+ private var _downtime = 0L
+ private val _upState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "up")
+ .build()
+ private val _downState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "down")
+ .build()
+
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) {
+ if (state == ServerState.RUNNING) {
+ _uptime += duration
+ } else if (state == ServerState.ERROR) {
+ _downtime += duration
+ }
+
+ result?.observe(_uptime, _upState)
+ result?.observe(_downtime, _downState)
+ }
+
+ private var _bootTime = Long.MIN_VALUE
+
+ /**
+ * Helper function to track the boot time of the guest.
+ */
+ fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ if (_bootTime != Long.MIN_VALUE) {
+ result?.observe(_bootTime)
+ }
+ }
+
+ private val _activeState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "active")
+ .build()
+ private val _stealState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "steal")
+ .build()
+ private val _lostState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "lost")
+ .build()
+ private val _idleState = Attributes.builder()
+ .putAll(attributes)
+ .put(STATE_KEY, "idle")
+ .build()
+ private var _totalTime = 0.0
+
+ /**
+     * Helper function to track the CPU time of the guest.
+ */
+ fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
+ val coreCount = server.flavor.cpuCount
+ val d = coreCount / _cpuLimit
+
+ var grantedWork = 0.0
+ var overcommittedWork = 0.0
+
+ for (cpu in (machine as SimAbstractMachine).cpus) {
+ val counters = cpu.counters
+ grantedWork += counters.actual
+ overcommittedWork += counters.overcommit
+ }
+
+ _totalTime += (duration / 1000.0) * coreCount
+ val activeTime = (grantedWork * d).roundToLong()
+ val idleTime = (_totalTime - grantedWork * d).roundToLong()
+ val stealTime = (overcommittedWork * d).roundToLong()
+
+ result.observe(activeTime, _activeState)
+ result.observe(idleTime, _idleState)
+ result.observe(stealTime, _stealState)
+ result.observe(0, _lostState)
+ }
+
+ private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
+
+ /**
+     * Helper function to collect the CPU limit of the guest.
+ */
+ fun collectCpuLimit(result: ObservableDoubleMeasurement) {
+ result.observe(_cpuLimit, attributes)
+ }
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
new file mode 100644
index 00000000..e6d0fdad
--- /dev/null
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator.internal
+
+/**
+ * Helper interface to listen for [Guest] events.
+ */
+internal interface GuestListener {
+ /**
+ * This method is invoked when the guest machine is running.
+ */
+ fun onStart(guest: Guest)
+
+ /**
+ * This method is invoked when the guest machine is stopped.
+ */
+ fun onStop(guest: Guest)
+}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 31215e9a..9c879e5e 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -23,11 +23,9 @@
package org.opendc.compute.simulator
import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.MetricData
-import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -44,15 +42,20 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.HOST_ID
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
private lateinit var machineModel: MachineModel
@@ -71,24 +74,29 @@ internal class SimHostTest {
*/
@Test
fun testOvercommitted() = runBlockingSimulation {
- var requestedWork = 0L
- var grantedWork = 0L
- var overcommittedWork = 0L
+ var idleTime = 0L
+ var activeTime = 0L
+ var stealTime = 0L
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val virtDriver = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ meterProvider,
SimFairShareHypervisorProvider()
)
val duration = 5 * 60L
@@ -130,26 +138,17 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- metricsByName["cpu.work.total"]?.let {
- requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["cpu.work.granted"]?.let {
- grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ stealTime += data.cpuStealTime
}
- metricsByName["cpu.work.overcommit"]?.let {
- overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- return CompletableResultCode.ofSuccess()
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = duration * 1000L
+ ),
+ exportInterval = Duration.ofSeconds(duration)
)
coroutineScope {
@@ -175,9 +174,9 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(4147200, requestedWork, "Requested work does not match") },
- { assertEquals(2107200, grantedWork, "Granted work does not match") },
- { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(659, activeTime, "Active time does not match") },
+ { assertEquals(2342, idleTime, "Idle time does not match") },
+ { assertEquals(638, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -187,27 +186,32 @@ internal class SimHostTest {
*/
@Test
fun testFailure() = runBlockingSimulation {
- var requestedWork = 0L
- var grantedWork = 0L
- var totalTime = 0L
- var downTime = 0L
- var guestTotalTime = 0L
- var guestDownTime = 0L
-
+ var activeTime = 0L
+ var idleTime = 0L
+ var uptime = 0L
+ var downtime = 0L
+ var guestUptime = 0L
+ var guestDowntime = 0L
+
+ val hostId = UUID.randomUUID()
+ val hostResource = Resource.builder()
+ .put(HOST_ID, hostId.toString())
+ .build()
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
+ .setResource(hostResource)
.setClock(clock.toOtelClock())
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
val host = SimHost(
- uid = UUID.randomUUID(),
+ uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
interpreter,
- meterProvider.get("opendc-compute-simulator"),
+ meterProvider,
SimFairShareHypervisorProvider()
)
val duration = 5 * 60L
@@ -233,35 +237,23 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- object : MetricExporter {
- override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- metricsByName["cpu.work.total"]?.let {
- requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
+ ComputeMetricExporter(
+ clock,
+ object : ComputeMonitor {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ uptime += data.uptime
+ downtime += data.downtime
}
- metricsByName["cpu.work.granted"]?.let {
- grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
- }
- metricsByName["host.time.total"]?.let {
- totalTime = it.longSumData.points.first().value
- }
- metricsByName["host.time.down"]?.let {
- downTime = it.longSumData.points.first().value
- }
- metricsByName["guest.time.total"]?.let {
- guestTotalTime = it.longSumData.points.first().value
- }
- metricsByName["guest.time.error"]?.let {
- guestDownTime = it.longSumData.points.first().value
+
+ override fun record(data: ServerData) {
+ guestUptime += data.uptime
+ guestDowntime += data.downtime
}
- return CompletableResultCode.ofSuccess()
}
-
- override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
- },
- exportInterval = duration * 1000L
+ ),
+ exportInterval = Duration.ofSeconds(duration)
)
coroutineScope {
@@ -289,12 +281,12 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2226039, requestedWork, "Total time does not match") },
- { assertEquals(1086039, grantedWork, "Down time does not match") },
- { assertEquals(1200001, totalTime, "Total time does not match") },
- { assertEquals(1200001, guestTotalTime, "Guest total time does not match") },
- { assertEquals(5000, downTime, "Down time does not match") },
- { assertEquals(5000, guestDownTime, "Guest down time does not match") },
+ { assertEquals(2661, idleTime, "Idle time does not match") },
+ { assertEquals(339, activeTime, "Active time does not match") },
+ { assertEquals(1195001, uptime, "Uptime does not match") },
+ { assertEquals(5000, downtime, "Downtime does not match") },
+ { assertEquals(1195000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(5000, guestDowntime, "Guest downtime does not match") },
)
}