diff options
Diffstat (limited to 'opendc-compute')
10 files changed, 795 insertions, 463 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 1873eb99..2a1fbaa0 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -23,11 +23,13 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -70,16 +72,18 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, scheduler: ComputeScheduler, - schedulingQuantum: Long = 300000, + schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meter, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index f1c055d4..57e70fcd 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,9 +22,10 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -35,6 +36,7 @@ import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -42,15 +44,18 @@ import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param context The [CoroutineContext] to use. - * @param clock The clock instance to keep track of time. + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. + * @param scheduler The scheduler implementation to use. + * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val scheduler: ComputeScheduler, - private val schedulingQuantum: Long + private val schedulingQuantum: Duration ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -63,6 +68,11 @@ internal class ComputeServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] to track metrics of the [ComputeService]. + */ + private val meter = meterProvider.get("org.opendc.compute.service") + + /** * The [Random] instance used to generate unique identifiers for the objects. */ private val random = Random(0) @@ -106,69 +116,37 @@ internal class ComputeServiceImpl( private var maxMemory = 0L /** - * The number of servers that have been submitted to the service for provisioning. - */ - private val _submittedServers = meter.counterBuilder("servers.submitted") - .setDescription("Number of start requests") - .setUnit("1") - .build() - - /** - * The number of servers that failed to be scheduled. - */ - private val _unscheduledServers = meter.counterBuilder("servers.unscheduled") - .setDescription("Number of unscheduled servers") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _waitingServers = meter.upDownCounterBuilder("servers.waiting") - .setDescription("Number of servers waiting to be provisioned") - .setUnit("1") - .build() - - /** - * The number of servers that are waiting to be provisioned. - */ - private val _runningServers = meter.upDownCounterBuilder("servers.active") - .setDescription("Number of servers currently running") - .setUnit("1") - .build() - - /** - * The number of servers that have finished running. + * The number of scheduling attempts. */ - private val _finishedServers = meter.counterBuilder("servers.finished") - .setDescription("Number of servers that finished running") + private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") + .setDescription("Number of scheduling attempts") .setUnit("1") .build() + private val _schedulingAttemptsSuccess = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "success")) + private val _schedulingAttemptsFailure = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "failure")) + private val _schedulingAttemptsError = _schedulingAttempts + .bind(Attributes.of(AttributeKey.stringKey("result"), "error")) /** - * The number of hosts registered at the compute service. + * The response time of the service. */ - private val _hostCount = meter.upDownCounterBuilder("hosts.total") - .setDescription("Number of hosts") - .setUnit("1") + private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") .build() /** - * The number of available hosts registered at the compute service. + * The number of servers that are pending. */ - private val _availableHostCount = meter.upDownCounterBuilder("hosts.available") - .setDescription("Number of available hosts") + private val _servers = meter.upDownCounterBuilder("scheduler.servers") + .setDescription("Number of servers managed by the scheduler") .setUnit("1") .build() - - /** - * The response time of the service. - */ - private val _schedulerDuration = meter.histogramBuilder("scheduler.duration") - .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") - .ofLongs() - .setUnit("ms") - .build() + private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending")) + private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active")) /** * The [TimerScheduler] to use for scheduling the scheduler cycles. @@ -181,6 +159,22 @@ internal class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size + init { + val upState = Attributes.of(AttributeKey.stringKey("state"), "up") + val downState = Attributes.of(AttributeKey.stringKey("state"), "down") + + meter.upDownCounterBuilder("scheduler.hosts") + .setDescription("Number of hosts registered with the scheduler") + .setUnit("1") + .buildWithCallback { result -> + val total = hostCount + val available = availableHosts.size.toLong() + + result.observe(available, upState) + result.observe(total - available, downState) + } + } + override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -308,24 +302,19 @@ internal class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { - _availableHostCount.add(1) availableHosts += hv } scheduler.addHost(hv) - _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { val view = hostToView.remove(host) if (view != null) { - if (availableHosts.remove(view)) { - _availableHostCount.add(-1) - } + availableHosts.remove(view) scheduler.removeHost(view) host.removeListener(this) - _hostCount.add(-1) } } @@ -338,8 +327,7 @@ internal class ComputeServiceImpl( val request = SchedulingRequest(server, clock.millis()) queue.add(request) - _submittedServers.add(1) - _waitingServers.add(1) + _serversPending.add(1) requestSchedulingCycle() return request } @@ -365,10 +353,12 @@ internal class ComputeServiceImpl( return } + val quantum = schedulingQuantum.toMillis() + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. // We calculate here the delay until the next scheduling slot. - val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) + val delay = quantum - (clock.millis() % quantum) timerScheduler.startSingleTimer(Unit, delay) { doSchedule() @@ -385,7 +375,7 @@ internal class ComputeServiceImpl( if (request.isCancelled) { queue.poll() - _waitingServers.add(-1) + _serversPending.add(-1) continue } @@ -397,10 +387,10 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() - _waitingServers.add(-1) - _unscheduledServers.add(1) + _serversPending.add(-1) + _schedulingAttemptsFailure.add(1) - logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } server.state = ServerState.TERMINATED continue @@ -413,8 +403,8 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() - _waitingServers.add(-1) - _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString())) + _serversPending.add(-1) + _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -429,12 +419,17 @@ internal class ComputeServiceImpl( server.host = host host.spawn(server) activeServers[server] = host + + _serversActive.add(1) + _schedulingAttemptsSuccess.add(1) } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + logger.error(e) { "Failed to deploy VM" } hv.instanceCount-- hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize + + _schedulingAttemptsError.add(1) } } } @@ -453,24 +448,22 @@ internal class ComputeServiceImpl( override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv - _availableHostCount.add(1) } // Re-schedule on the new machine requestSchedulingCycle() } HostState.DOWN -> { - logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } + logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] ?: return availableHosts -= hv - _availableHostCount.add(-1) requestSchedulingCycle() } @@ -488,16 +481,12 @@ internal class ComputeServiceImpl( server.state = newState - if (newState == ServerState.RUNNING) { - _runningServers.add(1) - } else if (newState == ServerState.ERROR) { - _runningServers.add(-1) - } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { - logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - activeServers -= server - _runningServers.add(-1) - _finishedServers.add(1) + if (activeServers.remove(server) != null) { + _serversActive.add(-1) + } val hv = hostToView[host] if (hv != null) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d9d0f3fc..05a7e1bf 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,6 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -50,6 +53,21 @@ internal class InternalServer( private val watchers = mutableListOf<ServerWatcher>() /** + * The attributes of a server. + */ + internal val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, name) + .put(ResourceAttributes.HOST_ID, uid.toString()) + .put(ResourceAttributes.HOST_TYPE, flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) + .build() + + /** * The [Host] that has been assigned to host the server. */ internal var host: Host? = null diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index d036ec00..564f9493 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -61,8 +61,7 @@ internal class ComputeServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - val meter = MeterProvider.noop().get("opendc-compute") - service = ComputeService(scope.coroutineContext, clock, meter, computeScheduler) + service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 28fd8217..dfd3bc67 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -47,8 +47,9 @@ class InternalServerTest { fun testEquality() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) @@ -59,8 +60,8 @@ class InternalServerTest { fun testEqualityWithDifferentType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk<Server>(relaxUnitFun = true) @@ -73,8 +74,8 @@ class InternalServerTest { fun testInequalityWithDifferentType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val b = mockk<Server>(relaxUnitFun = true) @@ -87,8 +88,8 @@ class InternalServerTest { fun testInequalityWithIncorrectType() { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) assertNotEquals(a, Unit) @@ -98,8 +99,8 @@ class InternalServerTest { fun testStartTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } @@ -114,8 +115,8 @@ class InternalServerTest { fun testStartDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -127,8 +128,8 @@ class InternalServerTest { fun testStartProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.PROVISIONING @@ -142,8 +143,8 @@ class InternalServerTest { fun testStartRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.RUNNING @@ -157,8 +158,8 @@ class InternalServerTest { fun testStopProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val request = ComputeServiceImpl.SchedulingRequest(server, 0) @@ -175,8 +176,8 @@ class InternalServerTest { fun testStopTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -189,8 +190,8 @@ class InternalServerTest { fun testStopDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -203,8 +204,8 @@ class InternalServerTest { fun testStopRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>() val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk<Host>(relaxUnitFun = true) @@ -220,8 +221,8 @@ class InternalServerTest { fun testDeleteProvisioningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val request = ComputeServiceImpl.SchedulingRequest(server, 0) @@ -239,8 +240,8 @@ class InternalServerTest { fun testDeleteTerminatedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.TERMINATED @@ -255,8 +256,8 @@ class InternalServerTest { fun testDeleteDeletedServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) server.state = ServerState.DELETED @@ -269,8 +270,8 @@ class InternalServerTest { fun testDeleteRunningServer() = runBlockingSimulation { val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) val uid = UUID.randomUUID() - val flavor = mockk<InternalFlavor>() - val image = mockk<InternalImage>() + val flavor = mockFlavor() + val image = mockImage() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) val host = mockk<Host>(relaxUnitFun = true) @@ -282,4 +283,20 @@ class InternalServerTest { coVerify { host.delete(server) } verify { service.delete(server) } } + + private fun mockFlavor(): InternalFlavor { + val flavor = mockk<InternalFlavor>() + every { flavor.name } returns "c5.large" + every { flavor.uid } returns UUID.randomUUID() + every { flavor.cpuCount } returns 2 + every { flavor.memorySize } returns 4096 + return flavor + } + + private fun mockImage(): InternalImage { + val image = mockk<InternalImage>() + every { image.name } returns "ubuntu-20.04" + every { image.uid } returns UUID.randomUUID() + return image + } } diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index cad051e6..aaf69f78 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -40,5 +40,6 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testImplementation(projects.opendcTelemetry.opendcTelemetryCompute) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index a1cc3390..793db907 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -25,13 +25,17 @@ package org.opendc.compute.simulator import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* +import org.opendc.compute.simulator.internal.Guest +import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimHypervisorProvider @@ -43,11 +47,11 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.resources.SimResourceDistributorMaxMin import org.opendc.simulator.resources.SimResourceInterpreter import java.util.* import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException +import kotlin.math.roundToLong /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. @@ -59,7 +63,7 @@ public class SimHost( override val meta: Map<String, Any>, context: CoroutineContext, interpreter: SimResourceInterpreter, - private val meter: Meter, + meterProvider: MeterProvider, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), @@ -82,6 +86,11 @@ public class SimHost( private val logger = KotlinLogging.logger {} /** + * The [Meter] to track metrics of the simulated host. + */ + private val meter = meterProvider.get("org.opendc.compute.simulator") + + /** * The event listeners registered with this host. */ private val listeners = mutableListOf<HostListener>() @@ -94,32 +103,29 @@ public class SimHost( /** * The hypervisor to run multiple workloads. */ - public val hypervisor: SimHypervisor = hypervisor.create( + private val hypervisor: SimHypervisor = hypervisor.create( interpreter, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain, listener = object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Double, + totalWork: Double, grantedWork: Double, overcommittedWork: Double, interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) { - _totalWork.add(requestedWork) - _grantedWork.add(grantedWork) - _overcommittedWork.add(overcommittedWork) - _interferedWork.add(interferedWork) - _cpuDemand.record(cpuDemand) - _cpuUsage.record(cpuUsage) - _powerUsage.record(machine.powerDraw) - - reportTime() + _cpuDemand = cpuDemand + _cpuUsage = cpuUsage + + collectTime() } } ) + private var _cpuUsage = 0.0 + private var _cpuDemand = 0.0 /** * The virtual machines running on the hypervisor. @@ -139,121 +145,23 @@ public class SimHost( override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) /** - * The total number of guests. + * The [GuestListener] that listens for guest events. */ - private val _guests = meter.upDownCounterBuilder("guests.total") - .setDescription("Number of guests") - .setUnit("1") - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The number of active guests on the host. - */ - private val _activeGuests = meter.upDownCounterBuilder("guests.active") - .setDescription("Number of active guests") - .setUnit("1") - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The CPU demand of the host. - */ - private val _cpuDemand = meter.histogramBuilder("cpu.demand") - .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The CPU usage of the host. - */ - private val _cpuUsage = meter.histogramBuilder("cpu.usage") - .setDescription("The amount of CPU resources used by the host") - .setUnit("MHz") - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The power usage of the host. - */ - private val _powerUsage = meter.histogramBuilder("power.usage") - .setDescription("The amount of power used by the CPU") - .setUnit("W") - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The total amount of work supplied to the CPU. - */ - private val _totalWork = meter.counterBuilder("cpu.work.total") - .setDescription("The amount of work supplied to the CPU") - .setUnit("1") - .ofDoubles() - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The work performed by the CPU. - */ - private val _grantedWork = meter.counterBuilder("cpu.work.granted") - .setDescription("The amount of work performed by the CPU") - .setUnit("1") - .ofDoubles() - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The amount not performed by the CPU due to overcommitment. - */ - private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit") - .setDescription("The amount of work not performed by the CPU due to overcommitment") - .setUnit("1") - .ofDoubles() - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The amount of work not performed by the CPU due to interference. - */ - private val _interferedWork = meter.counterBuilder("cpu.work.interference") - .setDescription("The amount of work not performed by the CPU due to interference") - .setUnit("1") - .ofDoubles() - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The amount of time in the system. - */ - private val _totalTime = meter.counterBuilder("host.time.total") - .setDescription("The amount of time in the system") - .setUnit("ms") - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) - - /** - * The uptime of the host. - */ - private val _upTime = meter.counterBuilder("host.time.up") - .setDescription("The uptime of the host") - .setUnit("ms") - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + private val guestListener = object : GuestListener { + override fun onStart(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } - /** - * The downtime of the host. - */ - private val _downTime = meter.counterBuilder("host.time.down") - .setDescription("The downtime of the host") - .setUnit("ms") - .build() - .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + override fun onStop(guest: Guest) { + listeners.forEach { it.onStateChanged(this@SimHost, guest.server, guest.state) } + } + } init { // Launch hypervisor onto machine scope.launch { try { + _bootTime = clock.millis() _state = HostState.UP machine.run(this@SimHost.hypervisor, emptyMap()) } catch (_: CancellationException) { @@ -265,29 +173,48 @@ public class SimHost( _state = HostState.DOWN } } - } - - private var _lastReport = clock.millis() - - private fun reportTime() { - if (!scope.isActive) - return - - val now = clock.millis() - val duration = now - _lastReport - - _totalTime.add(duration) - when (_state) { - HostState.UP -> _upTime.add(duration) - HostState.DOWN -> _downTime.add(duration) - } - - // Track time of guests - for (guest in guests.values) { - guest.reportTime() - } - _lastReport = now + meter.upDownCounterBuilder("system.guests") + .setDescription("Number of guests on this host") + .setUnit("1") + .buildWithCallback(::collectGuests) + meter.gaugeBuilder("system.cpu.limit") + .setDescription("Amount of CPU resources available to the host") + .buildWithCallback(::collectCpuLimit) + meter.gaugeBuilder("system.cpu.demand") + .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(_cpuDemand) } + meter.gaugeBuilder("system.cpu.usage") + .setDescription("Amount of CPU resources used by the host") + .setUnit("MHz") + .buildWithCallback { result -> result.observe(_cpuUsage) } + meter.gaugeBuilder("system.cpu.utilization") + .setDescription("Utilization of the CPU resources of the host") + .setUnit("%") + .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) } + meter.counterBuilder("system.cpu.time") + .setDescription("Amount of CPU time spent by the host") + .setUnit("s") + .buildWithCallback(::collectCpuTime) + meter.gaugeBuilder("system.power.usage") + .setDescription("Power usage of the host ") + .setUnit("W") + .buildWithCallback { result -> result.observe(machine.powerDraw) } + meter.counterBuilder("system.power.total") + .setDescription("Amount of energy used by the CPU") + .setUnit("J") + .ofDoubles() + .buildWithCallback(::collectPowerTotal) + meter.counterBuilder("system.time") + .setDescription("The uptime of the host") + .setUnit("s") + .buildWithCallback(::collectTime) + meter.gaugeBuilder("system.time.boot") + .setDescription("The boot time of the host") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { @@ -301,8 +228,17 @@ public class SimHost( override suspend fun spawn(server: Server, start: Boolean) { val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - _guests.add(1) - Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name)) + + val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + Guest( + scope.coroutineContext, + clock, + this, + mapper, + guestListener, + server, + machine + ) } if (start) { @@ -326,7 +262,6 @@ public class SimHost( override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return - _guests.add(-1) guest.terminate() } @@ -339,7 +274,7 @@ public class SimHost( } override fun close() { - reportTime() + reset() scope.cancel() machine.close() } @@ -347,22 +282,35 @@ public class SimHost( override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" public suspend fun fail() { - reportTime() - _state = HostState.DOWN + reset() + for (guest in guests.values) { guest.fail() } } public suspend fun recover() { - reportTime() + collectTime() _state = HostState.UP + _bootTime = clock.millis() + for (guest in guests.values) { guest.start() } } /** + * Reset the machine. + */ + private fun reset() { + collectTime() + + _state = HostState.DOWN + _cpuUsage = 0.0 + _cpuDemand = 0.0 + } + + /** * Convert flavor to machine model. */ private fun Flavor.toMachineModel(): MachineModel { @@ -374,147 +322,168 @@ public class SimHost( return MachineModel(processingUnits, memoryUnits) } - private fun onGuestStart(vm: Guest) { - _activeGuests.add(1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - } + private val STATE_KEY = AttributeKey.stringKey("state") - private fun onGuestStop(vm: Guest) { - _activeGuests.add(-1) - listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - } + private val terminatedState = Attributes.of(STATE_KEY, "terminated") + private val runningState = Attributes.of(STATE_KEY, "running") + private val errorState = Attributes.of(STATE_KEY, "error") + private val invalidState = Attributes.of(STATE_KEY, "invalid") /** - * A virtual machine instance that the driver manages. + * Helper function to collect the guest counts on this host. */ - private inner class Guest(val server: Server, val machine: SimMachine) { - var state: ServerState = ServerState.TERMINATED - - /** - * The amount of time in the system. - */ - private val _totalTime = meter.counterBuilder("guest.time.total") - .setDescription("The amount of time in the system") - .setUnit("ms") - .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) - - /** - * The uptime of the guest. - */ - private val _runningTime = meter.counterBuilder("guest.time.running") - .setDescription("The uptime of the guest") - .setUnit("ms") - .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) - - /** - * The time the guest is in an error state. - */ - private val _errorTime = meter.counterBuilder("guest.time.error") - .setDescription("The time the guest is in an error state") - .setUnit("ms") - .build() - .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) - - suspend fun start() { - when (state) { - ServerState.TERMINATED, ServerState.ERROR -> { - logger.info { "User requested to start server ${server.uid}" } - launch() - } - ServerState.RUNNING -> return - ServerState.DELETED -> { - logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") - } - else -> assert(false) { "Invalid state transition" } + private fun collectGuests(result: ObservableLongMeasurement) { + var terminated = 0L + var running = 0L + var error = 0L + var invalid = 0L + + for ((_, guest) in guests) { + when (guest.state) { + ServerState.TERMINATED -> terminated++ + ServerState.RUNNING -> running++ + ServerState.ERROR -> error++ + else -> invalid++ } } - suspend fun stop() { - when (state) { - ServerState.RUNNING, ServerState.ERROR -> { - val job = job ?: throw IllegalStateException("Server should be active") - job.cancel() - job.join() - } - ServerState.TERMINATED, ServerState.DELETED -> return - else -> assert(false) { "Invalid state transition" } - } - } + result.observe(terminated, terminatedState) + result.observe(running, runningState) + result.observe(error, errorState) + result.observe(invalid, invalidState) + } + + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } - suspend fun terminate() { - stop() - machine.close() - state = ServerState.DELETED + /** + * Helper function to collect the CPU limits of a machine. + */ + private fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit) + + for (guest in guests.values) { + guest.collectCpuLimit(result) } + } - suspend fun fail() { - if (state != ServerState.RUNNING) { - return - } - stop() - state = ServerState.ERROR + private var _lastCpuTimeCallback = clock.millis() + + /** + * Helper function to track the CPU time of a machine. + */ + private fun collectCpuTime(result: ObservableLongMeasurement) { + val now = clock.millis() + val duration = now - _lastCpuTimeCallback + + try { + collectCpuTime(duration, result) + } finally { + _lastCpuTimeCallback = now } + } - private var job: Job? = null - - private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> - assert(job == null) { "Concurrent job running" } - val workload = mapper.createWorkload(server) - - job = scope.launch { - try { - delay(1) // TODO Introduce boot time - init() - cont.resume(Unit) - } catch (e: Throwable) { - cont.resumeWithException(e) - } - try { - machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - job = null - } - } + private val _activeState = Attributes.of(STATE_KEY, "active") + private val _stealState = Attributes.of(STATE_KEY, "steal") + private val _lostState = Attributes.of(STATE_KEY, "lost") + private val _idleState = Attributes.of(STATE_KEY, "idle") + private var _totalTime = 0.0 + + /** + * Helper function to track the CPU time of a machine. + */ + private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) { + val coreCount = this.model.cpuCount + val d = coreCount / _cpuLimit + + val counters = hypervisor.counters + val grantedWork = counters.actual + val overcommittedWork = counters.overcommit + val interferedWork = (counters as? SimResourceDistributorMaxMin.Counters)?.interference ?: 0.0 + + _totalTime += (duration / 1000.0) * coreCount + val activeTime = (grantedWork * d).roundToLong() + val idleTime = (_totalTime - grantedWork * d).roundToLong() + val stealTime = (overcommittedWork * d).roundToLong() + val lostTime = (interferedWork * d).roundToLong() + + result.observe(activeTime, _activeState) + result.observe(idleTime, _idleState) + result.observe(stealTime, _stealState) + result.observe(lostTime, _lostState) + + for (guest in guests.values) { + guest.collectCpuTime(duration, result) } + } - private fun init() { - state = ServerState.RUNNING - onGuestStart(this) + private var _lastPowerCallback = clock.millis() + private var _totalPower = 0.0 + + /** + * Helper function to collect the total power usage of the machine. + */ + private fun collectPowerTotal(result: ObservableDoubleMeasurement) { + val now = clock.millis() + val duration = now - _lastPowerCallback + + _totalPower += duration / 1000.0 * machine.powerDraw + result.observe(_totalPower) + + _lastPowerCallback = now + } + + private var _lastReport = clock.millis() + + /** + * Helper function to track the uptime of a machine. + */ + private fun collectTime(result: ObservableLongMeasurement? = null) { + val now = clock.millis() + val duration = now - _lastReport + + try { + collectTime(duration, result) + } finally { + _lastReport = now } + } - private fun exit(cause: Throwable?) { - state = - if (cause == null) - ServerState.TERMINATED - else - ServerState.ERROR + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.of(STATE_KEY, "up") + private val _downState = Attributes.of(STATE_KEY, "down") - onGuestStop(this) + /** + * Helper function to track the uptime of a machine. + */ + private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) { + if (state == HostState.UP) { + _uptime += duration + } else if (state == HostState.DOWN && scope.isActive) { + // Only increment downtime if the machine is in a failure state + _downtime += duration } - private var _lastReport = clock.millis() + result?.observe(_uptime, _upState) + result?.observe(_downtime, _downState) - fun reportTime() { - if (state == ServerState.DELETED) - return + for (guest in guests.values) { + guest.collectUptime(duration, result) + } + } - val now = clock.millis() - val duration = now - _lastReport + private var _bootTime = Long.MIN_VALUE - _totalTime.add(duration) - when (state) { - ServerState.RUNNING -> _runningTime.add(duration) - ServerState.ERROR -> _errorTime.add(duration) - else -> {} - } + /** + * Helper function to track the boot time of a machine. + */ + private fun collectBootTime(result: ObservableLongMeasurement? = null) { + if (_bootTime != Long.MIN_VALUE) { + result?.observe(_bootTime) + } - _lastReport = now + for (guest in guests.values) { + guest.collectBootTime(result) } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt new file mode 100644 index 00000000..90562e2f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement +import io.opentelemetry.api.metrics.ObservableLongMeasurement +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.* +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.SimAbstractMachine +import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.coroutines.CoroutineContext +import kotlin.math.roundToLong + +/** + * A virtual machine instance that is managed by a [SimHost]. + */ +internal class Guest( + context: CoroutineContext, + private val clock: Clock, + val host: SimHost, + private val mapper: SimWorkloadMapper, + private val listener: GuestListener, + val server: Server, + val machine: SimMachine +) { + /** + * The [CoroutineScope] of the guest. + */ + private val scope: CoroutineScope = CoroutineScope(context + Job()) + + /** + * The logger instance of this guest. + */ + private val logger = KotlinLogging.logger {} + + /** + * The state of the [Guest]. + * + * [ServerState.PROVISIONING] is an invalid value for a guest, since it applies before the host is selected for + * a server. + */ + var state: ServerState = ServerState.TERMINATED + + /** + * The attributes of the guest. + */ + val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.HOST_NAME, server.name) + .put(ResourceAttributes.HOST_ID, server.uid.toString()) + .put(ResourceAttributes.HOST_TYPE, server.flavor.name) + .put(AttributeKey.longKey("host.num_cpus"), server.flavor.cpuCount.toLong()) + .put(AttributeKey.longKey("host.mem_capacity"), server.flavor.memorySize) + .put(AttributeKey.stringArrayKey("host.labels"), server.labels.map { (k, v) -> "$k:$v" }) + .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(ResourceAttributes.HOST_IMAGE_NAME, server.image.name) + .put(ResourceAttributes.HOST_IMAGE_ID, server.image.uid.toString()) + .build() + + /** + * Start the guest. + */ + suspend fun start() { + when (state) { + ServerState.TERMINATED, ServerState.ERROR -> { + logger.info { "User requested to start server ${server.uid}" } + doStart() + } + ServerState.RUNNING -> return + ServerState.DELETED -> { + logger.warn { "User tried to start terminated server" } + throw IllegalArgumentException("Server is terminated") + } + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Stop the guest. + */ + suspend fun stop() { + when (state) { + ServerState.RUNNING -> doStop(ServerState.TERMINATED) + ServerState.ERROR -> doRecover() + ServerState.TERMINATED, ServerState.DELETED -> return + else -> assert(false) { "Invalid state transition" } + } + } + + /** + * Terminate the guest. + * + * This operation will stop the guest if it is running on the host and remove all resources associated with the + * guest. + */ + suspend fun terminate() { + stop() + + state = ServerState.DELETED + + machine.close() + scope.cancel() + } + + /** + * Fail the guest if it is active. + * + * This operation forcibly stops the guest and puts the server into an error state. + */ + suspend fun fail() { + if (state != ServerState.RUNNING) { + return + } + + doStop(ServerState.ERROR) + } + + /** + * The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active. + */ + private var job: Job? = null + + /** + * Launch the guest on the simulated + */ + private suspend fun doStart() { + assert(job == null) { "Concurrent job running" } + val workload = mapper.createWorkload(server) + + val job = scope.launch { runMachine(workload) } + this.job = job + + state = ServerState.RUNNING + onStart() + + job.invokeOnCompletion { cause -> + this.job = null + onStop(if (cause != null && cause !is CancellationException) ServerState.ERROR else ServerState.TERMINATED) + } + } + + /** + * Attempt to stop the server and put it into [target] state. + */ + private suspend fun doStop(target: ServerState) { + assert(job != null) { "Invalid job state" } + val job = job ?: return + job.cancel() + job.join() + + state = target + } + + /** + * Attempt to recover from an error state. + */ + private fun doRecover() { + state = ServerState.TERMINATED + } + + /** + * Run the process that models the virtual machine lifecycle as a coroutine. + */ + private suspend fun runMachine(workload: SimWorkload) { + delay(1) // TODO Introduce model for boot time + machine.run(workload, mapOf("driver" to host, "server" to server)) + } + + /** + * This method is invoked when the guest was started on the host and has booted into a running state. + */ + private fun onStart() { + _bootTime = clock.millis() + state = ServerState.RUNNING + listener.onStart(this) + } + + /** + * This method is invoked when the guest stopped. + */ + private fun onStop(target: ServerState) { + state = target + listener.onStop(this) + } + + private val STATE_KEY = AttributeKey.stringKey("state") + + private var _uptime = 0L + private var _downtime = 0L + private val _upState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "up") + .build() + private val _downState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "down") + .build() + + /** + * Helper function to track the uptime of the guest. + */ + fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) { + if (state == ServerState.RUNNING) { + _uptime += duration + } else if (state == ServerState.ERROR) { + _downtime += duration + } + + result?.observe(_uptime, _upState) + result?.observe(_downtime, _downState) + } + + private var _bootTime = Long.MIN_VALUE + + /** + * Helper function to track the boot time of the guest. + */ + fun collectBootTime(result: ObservableLongMeasurement? = null) { + if (_bootTime != Long.MIN_VALUE) { + result?.observe(_bootTime) + } + } + + private val _activeState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "active") + .build() + private val _stealState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "steal") + .build() + private val _lostState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "lost") + .build() + private val _idleState = Attributes.builder() + .putAll(attributes) + .put(STATE_KEY, "idle") + .build() + private var _totalTime = 0.0 + + /** + * Helper function to track the CPU time of a machine. + */ + fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) { + val coreCount = server.flavor.cpuCount + val d = coreCount / _cpuLimit + + var grantedWork = 0.0 + var overcommittedWork = 0.0 + + for (cpu in (machine as SimAbstractMachine).cpus) { + val counters = cpu.counters + grantedWork += counters.actual + overcommittedWork += counters.overcommit + } + + _totalTime += (duration / 1000.0) * coreCount + val activeTime = (grantedWork * d).roundToLong() + val idleTime = (_totalTime - grantedWork * d).roundToLong() + val stealTime = (overcommittedWork * d).roundToLong() + + result.observe(activeTime, _activeState) + result.observe(idleTime, _idleState) + result.observe(stealTime, _stealState) + result.observe(0, _lostState) + } + + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } + + /** + * Helper function to collect the CPU limits of a machine. + */ + fun collectCpuLimit(result: ObservableDoubleMeasurement) { + result.observe(_cpuLimit, attributes) + } +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt new file mode 100644 index 00000000..e6d0fdad --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/GuestListener.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.internal + +/** + * Helper interface to listen for [Guest] events. + */ +internal interface GuestListener { + /** + * This method is invoked when the guest machine is running. + */ + fun onStart(guest: Guest) + + /** + * This method is invoked when the guest machine is stopped. + */ + fun onStop(guest: Guest) +} diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 31215e9a..9c879e5e 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -23,11 +23,9 @@ package org.opendc.compute.simulator import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -44,15 +42,20 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.HOST_ID +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock +import java.time.Duration import java.util.* import kotlin.coroutines.resume /** * Basic test-suite for the hypervisor. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { private lateinit var machineModel: MachineModel @@ -71,24 +74,29 @@ internal class SimHostTest { */ @Test fun testOvercommitted() = runBlockingSimulation { - var requestedWork = 0L - var grantedWork = 0L - var overcommittedWork = 0L + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) val virtDriver = SimHost( - uid = UUID.randomUUID(), + uid = hostId, name = "test", model = machineModel, meta = emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + meterProvider, SimFairShareHypervisorProvider() ) val duration = 5 * 60L @@ -130,26 +138,17 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - metricsByName["cpu.work.total"]?.let { - requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["cpu.work.granted"]?.let { - grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() + ComputeMetricExporter( + clock, + object : ComputeMonitor { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + stealTime += data.cpuStealTime } - metricsByName["cpu.work.overcommit"]?.let { - overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - return CompletableResultCode.ofSuccess() } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - }, - exportInterval = duration * 1000L + ), + exportInterval = Duration.ofSeconds(duration) ) coroutineScope { @@ -175,9 +174,9 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(4147200, requestedWork, "Requested work does not match") }, - { assertEquals(2107200, grantedWork, "Granted work does not match") }, - { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, + { assertEquals(659, activeTime, "Active time does not match") }, + { assertEquals(2342, idleTime, "Idle time does not match") }, + { assertEquals(638, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } @@ -187,27 +186,32 @@ internal class SimHostTest { */ @Test fun testFailure() = runBlockingSimulation { - var requestedWork = 0L - var grantedWork = 0L - var totalTime = 0L - var downTime = 0L - var guestTotalTime = 0L - var guestDownTime = 0L - + var activeTime = 0L + var idleTime = 0L + var uptime = 0L + var downtime = 0L + var guestUptime = 0L + var guestDowntime = 0L + + val hostId = UUID.randomUUID() + val hostResource = Resource.builder() + .put(HOST_ID, hostId.toString()) + .build() val meterProvider: MeterProvider = SdkMeterProvider .builder() + .setResource(hostResource) .setClock(clock.toOtelClock()) .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) val host = SimHost( - uid = UUID.randomUUID(), + uid = hostId, name = "test", model = machineModel, meta = emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + meterProvider, SimFairShareHypervisorProvider() ) val duration = 5 * 60L @@ -233,35 +237,23 @@ internal class SimHostTest { // Setup metric reader val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), - object : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - metricsByName["cpu.work.total"]?.let { - requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() + ComputeMetricExporter( + clock, + object : ComputeMonitor { + override fun record(data: HostData) { + activeTime += data.cpuActiveTime + idleTime += data.cpuIdleTime + uptime += data.uptime + downtime += data.downtime } - metricsByName["cpu.work.granted"]?.let { - grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() - } - metricsByName["host.time.total"]?.let { - totalTime = it.longSumData.points.first().value - } - metricsByName["host.time.down"]?.let { - downTime = it.longSumData.points.first().value - } - metricsByName["guest.time.total"]?.let { - guestTotalTime = it.longSumData.points.first().value - } - metricsByName["guest.time.error"]?.let { - guestDownTime = it.longSumData.points.first().value + + override fun record(data: ServerData) { + guestUptime += data.uptime + guestDowntime += data.downtime } - return CompletableResultCode.ofSuccess() } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - }, - exportInterval = duration * 1000L + ), + exportInterval = Duration.ofSeconds(duration) ) coroutineScope { @@ -289,12 +281,12 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(2226039, requestedWork, "Total time does not match") }, - { assertEquals(1086039, grantedWork, "Down time does not match") }, - { assertEquals(1200001, totalTime, "Total time does not match") }, - { assertEquals(1200001, guestTotalTime, "Guest total time does not match") }, - { assertEquals(5000, downTime, "Down time does not match") }, - { assertEquals(5000, guestDownTime, "Guest down time does not match") }, + { assertEquals(2661, idleTime, "Idle time does not match") }, + { assertEquals(339, activeTime, "Active time does not match") }, + { assertEquals(1195001, uptime, "Uptime does not match") }, + { assertEquals(5000, downtime, "Downtime does not match") }, + { assertEquals(1195000, guestUptime, "Guest uptime does not match") }, + { assertEquals(5000, guestDowntime, "Guest downtime does not match") }, ) } |
