diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-07 16:04:46 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-07 16:04:46 +0200 |
| commit | 3eda751b725448139217dc1929dca1fc354e2a4e (patch) | |
| tree | 11d933753c515140a6ae846fe96448ad64b165aa /opendc-compute | |
| parent | eb4de7f832c6d26725e0d7c29644c704ea82604e (diff) | |
| parent | 18ff316a6b6ab984ebf8283ea48ed98ec69d8295 (diff) | |
merge: Prepare for risk analysis experiments
This pull request adds the necessary code in preparation for the risk analysis experiments:
- Track provisioning time
- Track host up/down time
- Track guest up/down time
- Support overcommitted memory
- Do not fail inactive guests
- Mark unschedulable server as terminated
- Make ExperimentMonitor optional for trace processing
- Report up/downtime metrics in experiment monitor
- Move metric collection outside Capelin code
- Resolve kotlin-reflect incompatibility
- Restructure input reading classes
**Breaking API Changes**
- `ExperimentMonitor` replaced in favour of `ComputeMonitor`
Diffstat (limited to 'opendc-compute')
6 files changed, 160 insertions, 26 deletions
diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index e0e48b0f..33cafc45 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index d7a7e8f8..f1c055d4 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,7 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -160,6 +162,15 @@ internal class ComputeServiceImpl( .build() /** + * The response time of the service. + */ + private val _schedulerDuration = meter.histogramBuilder("scheduler.duration") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") + .build() + + /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ private var timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) @@ -325,7 +336,7 @@ internal class ComputeServiceImpl( internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } - val request = SchedulingRequest(server) + val request = SchedulingRequest(server, clock.millis()) queue.add(request) _submittedServers.add(1) _waitingServers.add(1) @@ -368,6 +379,7 @@ internal class ComputeServiceImpl( * Run a single scheduling iteration. */ private fun doSchedule() { + val now = clock.millis() while (queue.isNotEmpty()) { val request = queue.peek() @@ -390,7 +402,7 @@ internal class ComputeServiceImpl( logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") - server.state = ServerState.ERROR + server.state = ServerState.TERMINATED continue } else { break @@ -402,6 +414,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _waitingServers.add(-1) + _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString())) logger.info { "Assigned server $server to host $host." } @@ -430,7 +443,7 @@ internal class ComputeServiceImpl( /** * A request to schedule an [InternalServer] onto one of the [Host]s. */ - internal data class SchedulingRequest(val server: InternalServer) { + internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) { /** * A flag to indicate that the request is cancelled. */ diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index c6c01ea2..d036ec00 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -170,7 +170,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -183,7 +183,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -196,7 +196,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 20ea8d20..28fd8217 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -102,7 +102,7 @@ class InternalServerTest { val image = mockk<InternalImage>() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) } + every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } server.start() @@ -160,7 +160,7 @@ class InternalServerTest { val flavor = mockk<InternalFlavor>() val image = mockk<InternalImage>() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request @@ -223,7 +223,7 @@ class InternalServerTest { val flavor = mockk<InternalFlavor>() val image = mockk<InternalImage>() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 20e5a9db..213d20ee 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,6 +22,7 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes @@ -59,7 +60,7 @@ public class SimHost( override val meta: Map<String, Any>, context: CoroutineContext, interpreter: SimResourceInterpreter, - meter: Meter, + private val meter: Meter, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), @@ -72,6 +73,11 @@ public class SimHost( override val scope: CoroutineScope = CoroutineScope(context + Job()) /** + * The clock instance used by the host. + */ + private val clock = interpreter.clock + + /** * The logger instance of this server. */ private val logger = KotlinLogging.logger {} @@ -82,11 +88,6 @@ public class SimHost( private val listeners = mutableListOf<HostListener>() /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = model.memory.sumOf { it.size } - - /** * The machine to run on. */ public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, powerDriver) @@ -115,6 +116,8 @@ public class SimHost( _cpuDemand.record(cpuDemand) _cpuUsage.record(cpuUsage) _powerUsage.record(machine.powerDraw) + + reportTime() } } ) @@ -221,6 +224,33 @@ public class SimHost( .build() .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + /** + * The amount of time in the system. + */ + private val _totalTime = meter.counterBuilder("host.time.total") + .setDescription("The amount of time in the system") + .setUnit("ms") + .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + + /** + * The uptime of the host. + */ + private val _upTime = meter.counterBuilder("host.time.up") + .setDescription("The uptime of the host") + .setUnit("ms") + .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + + /** + * The downtime of the host. + */ + private val _downTime = meter.counterBuilder("host.time.down") + .setDescription("The downtime of the host") + .setUnit("ms") + .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + init { // Launch hypervisor onto machine scope.launch { @@ -238,25 +268,44 @@ public class SimHost( } } + private var _lastReport = clock.millis() + + private fun reportTime() { + if (!scope.isActive) + return + + val now = clock.millis() + val duration = now - _lastReport + + _totalTime.add(duration) + when (_state) { + HostState.UP -> _upTime.add(duration) + HostState.DOWN -> _downTime.add(duration) + } + + // Track time of guests + for (guest in guests.values) { + guest.reportTime() + } + + _lastReport = now + } + override fun canFit(server: Server): Boolean { - val sufficientMemory = availableMemory > server.flavor.memorySize - val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount + val sufficientMemory = model.memorySize >= server.flavor.memorySize + val enoughCpus = model.cpuCount >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) return sufficientMemory && enoughCpus && canFit } override suspend fun spawn(server: Server, start: Boolean) { - // Return if the server already exists on this host - if (server in this) { - return + val guest = guests.computeIfAbsent(server) { key -> + require(canFit(key)) { "Server does not fit" } + _guests.add(1) + Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name)) } - require(canFit(server)) { "Server does not fit" } - val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel(), server.name)) - guests[server] = guest - _guests.add(1) - if (start) { guest.start() } @@ -291,6 +340,7 @@ public class SimHost( } override fun close() { + reportTime() scope.cancel() machine.close() } @@ -320,6 +370,7 @@ public class SimHost( } override suspend fun fail() { + reportTime() _state = HostState.DOWN for (guest in guests.values) { guest.fail() @@ -327,6 +378,7 @@ public class SimHost( } override suspend fun recover() { + reportTime() _state = HostState.UP for (guest in guests.values) { guest.start() @@ -339,6 +391,33 @@ public class SimHost( private inner class Guest(val server: Server, val machine: SimMachine) { var state: ServerState = ServerState.TERMINATED + /** + * The amount of time in the system. + */ + private val _totalTime = meter.counterBuilder("guest.time.total") + .setDescription("The amount of time in the system") + .setUnit("ms") + .build() + .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + + /** + * The uptime of the guest. + */ + private val _runningTime = meter.counterBuilder("guest.time.running") + .setDescription("The uptime of the guest") + .setUnit("ms") + .build() + .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + + /** + * The time the guest is in an error state. + */ + private val _errorTime = meter.counterBuilder("guest.time.error") + .setDescription("The time the guest is in an error state") + .setUnit("ms") + .build() + .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + suspend fun start() { when (state) { ServerState.TERMINATED, ServerState.ERROR -> { @@ -373,6 +452,9 @@ public class SimHost( } suspend fun fail() { + if (state != ServerState.RUNNING) { + return + } stop() state = ServerState.ERROR } @@ -414,8 +496,26 @@ public class SimHost( else ServerState.ERROR - availableMemory += server.flavor.memorySize onGuestStop(this) } + + private var _lastReport = clock.millis() + + fun reportTime() { + if (state == ServerState.DELETED) + return + + val now = clock.millis() + val duration = now - _lastReport + + _totalTime.add(duration) + when (state) { + ServerState.RUNNING -> _runningTime.add(duration) + ServerState.ERROR -> _errorTime.add(duration) + else -> {} + } + + _lastReport = now + } } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 1ba3a9a1..31215e9a 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -189,6 +189,10 @@ internal class SimHostTest { fun testFailure() = runBlockingSimulation { var requestedWork = 0L var grantedWork = 0L + var totalTime = 0L + var downTime = 0L + var guestTotalTime = 0L + var guestDownTime = 0L val meterProvider: MeterProvider = SdkMeterProvider .builder() @@ -238,6 +242,18 @@ internal class SimHostTest { metricsByName["cpu.work.granted"]?.let { grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() } + metricsByName["host.time.total"]?.let { + totalTime = it.longSumData.points.first().value + } + metricsByName["host.time.down"]?.let { + downTime = it.longSumData.points.first().value + } + metricsByName["guest.time.total"]?.let { + guestTotalTime = it.longSumData.points.first().value + } + metricsByName["guest.time.error"]?.let { + guestDownTime = it.longSumData.points.first().value + } return CompletableResultCode.ofSuccess() } @@ -275,6 +291,10 @@ internal class SimHostTest { assertAll( { assertEquals(2226039, requestedWork, "Total time does not match") }, { assertEquals(1086039, grantedWork, "Down time does not match") }, + { assertEquals(1200001, totalTime, "Total time does not match") }, + { assertEquals(1200001, guestTotalTime, "Guest total time does not match") }, + { assertEquals(5000, downTime, "Down time does not match") }, + { assertEquals(5000, guestDownTime, "Guest down time does not match") }, ) } |
