From 2dd9cc6ded8c47046f4faa1b2dd6aed1085d6644 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Aug 2021 17:38:53 +0200 Subject: feat(compute): Track provisioning response time This change adds a metric for the provisioning time of virtual machines by the compute service. --- opendc-compute/opendc-compute-service/build.gradle.kts | 1 + .../compute/service/internal/ComputeServiceImpl.kt | 17 +++++++++++++++-- .../org/opendc/compute/service/InternalServerTest.kt | 6 +++--- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index e0e48b0f..33cafc45 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index d7a7e8f8..8c043142 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,7 +22,9 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -159,6 +161,15 @@ internal class ComputeServiceImpl( .setUnit("1") .build() + /** + * The response time of the service. + */ + private val _schedulerDuration = meter.histogramBuilder("scheduler.duration") + .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") + .ofLongs() + .setUnit("ms") + .build() + /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ @@ -325,7 +336,7 @@ internal class ComputeServiceImpl( internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } - val request = SchedulingRequest(server) + val request = SchedulingRequest(server, clock.millis()) queue.add(request) _submittedServers.add(1) _waitingServers.add(1) @@ -368,6 +379,7 @@ internal class ComputeServiceImpl( * Run a single scheduling iteration. */ private fun doSchedule() { + val now = clock.millis() while (queue.isNotEmpty()) { val request = queue.peek() @@ -402,6 +414,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _waitingServers.add(-1) + _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString())) logger.info { "Assigned server $server to host $host." } @@ -430,7 +443,7 @@ internal class ComputeServiceImpl( /** * A request to schedule an [InternalServer] onto one of the [Host]s. */ - internal data class SchedulingRequest(val server: InternalServer) { + internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) { /** * A flag to indicate that the request is cancelled. */ diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt index 20ea8d20..28fd8217 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -102,7 +102,7 @@ class InternalServerTest { val image = mockk() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) } + every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) } server.start() @@ -160,7 +160,7 @@ class InternalServerTest { val flavor = mockk() val image = mockk() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request @@ -223,7 +223,7 @@ class InternalServerTest { val flavor = mockk() val image = mockk() val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) - val request = ComputeServiceImpl.SchedulingRequest(server) + val request = ComputeServiceImpl.SchedulingRequest(server, 0) every { service.schedule(any()) } returns request -- cgit v1.2.3 From 9236b3cfb7be1e9d44fe60cbdd699c19c70f6411 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Aug 2021 19:22:34 +0200 Subject: feat(compute): Track host up/down time This change adds new metrics for tracking the up and downtime of hosts due to failures. In addition, this change adds a test to verify whether the metrics are collected correctly. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 55 ++++++++++++++++++++++ .../org/opendc/compute/simulator/SimHostTest.kt | 10 ++++ 2 files changed, 65 insertions(+) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 20e5a9db..e12bd37b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -71,6 +71,11 @@ public class SimHost( */ override val scope: CoroutineScope = CoroutineScope(context + Job()) + /** + * The clock instance used by the host. + */ + private val clock = interpreter.clock + /** * The logger instance of this server. */ @@ -115,6 +120,8 @@ public class SimHost( _cpuDemand.record(cpuDemand) _cpuUsage.record(cpuUsage) _powerUsage.record(machine.powerDraw) + + reportTime() } } ) @@ -221,6 +228,33 @@ public class SimHost( .build() .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + /** + * The amount of time in the system. + */ + private val _totalTime = meter.counterBuilder("host.time.total") + .setDescription("The amount of time in the system") + .setUnit("ms") + .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + + /** + * The uptime of the host. + */ + private val _upTime = meter.counterBuilder("host.time.up") + .setDescription("The uptime of the host") + .setUnit("ms") + .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + + /** + * The downtime of the host. + */ + private val _downTime = meter.counterBuilder("host.time.down") + .setDescription("The downtime of the host") + .setUnit("ms") + .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) + init { // Launch hypervisor onto machine scope.launch { @@ -238,6 +272,24 @@ public class SimHost( } } + private var _lastReport = clock.millis() + + private fun reportTime() { + if (!scope.isActive) + return + + val now = clock.millis() + val duration = now - _lastReport + + _totalTime.add(duration) + when (_state) { + HostState.UP -> _upTime.add(duration) + HostState.DOWN -> _downTime.add(duration) + } + + _lastReport = now + } + override fun canFit(server: Server): Boolean { val sufficientMemory = availableMemory > server.flavor.memorySize val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount @@ -291,6 +343,7 @@ public class SimHost( } override fun close() { + reportTime() scope.cancel() machine.close() } @@ -320,6 +373,7 @@ public class SimHost( } override suspend fun fail() { + reportTime() _state = HostState.DOWN for (guest in guests.values) { guest.fail() @@ -327,6 +381,7 @@ public class SimHost( } override suspend fun recover() { + reportTime() _state = HostState.UP for (guest in guests.values) { guest.start() diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 1ba3a9a1..0a2ced7b 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -189,6 +189,8 @@ internal class SimHostTest { fun testFailure() = runBlockingSimulation { var requestedWork = 0L var grantedWork = 0L + var totalTime = 0L + var downTime = 0L val meterProvider: MeterProvider = SdkMeterProvider .builder() @@ -238,6 +240,12 @@ internal class SimHostTest { metricsByName["cpu.work.granted"]?.let { grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() } + metricsByName["host.time.total"]?.let { + totalTime = it.longSumData.points.first().value + } + metricsByName["host.time.down"]?.let { + downTime = it.longSumData.points.first().value + } return CompletableResultCode.ofSuccess() } @@ -275,6 +283,8 @@ internal class SimHostTest { assertAll( { assertEquals(2226039, requestedWork, "Total time does not match") }, { assertEquals(1086039, grantedWork, "Down time does not match") }, + { assertEquals(1200001, totalTime, "Total time does not match") }, + { assertEquals(5000, downTime, "Down time does not match") }, ) } -- cgit v1.2.3 From d097d65851619483a85ce16ab56f61a726bbe756 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 24 Aug 2021 16:55:42 +0200 Subject: fix(compute): Support overcommitted memory in SimHost This change enables host to overcommit their memory when testing whether new servers can fit on the host. --- .../src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index e12bd37b..76cc7dfe 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -86,11 +86,6 @@ public class SimHost( */ private val listeners = mutableListOf() - /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = model.memory.sumOf { it.size } - /** * The machine to run on. */ @@ -291,7 +286,7 @@ public class SimHost( } override fun canFit(server: Server): Boolean { - val sufficientMemory = availableMemory > server.flavor.memorySize + val sufficientMemory = machine.model.memory.size >= server.flavor.memorySize val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) @@ -469,7 +464,6 @@ public class SimHost( else ServerState.ERROR - availableMemory += server.flavor.memorySize onGuestStop(this) } } -- cgit v1.2.3 From 83d7e45a6b749df0c208e56885402c66e54c4b23 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 24 Aug 2021 17:06:35 +0200 Subject: fix(compute): Start host even if it already exists on host --- .../src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 76cc7dfe..4526537d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -294,16 +294,12 @@ public class SimHost( } override suspend fun spawn(server: Server, start: Boolean) { - // Return if the server already exists on this host - if (server in this) { - return + val guest = guests.computeIfAbsent(server) { key -> + require(canFit(key)) { "Server does not fit" } + _guests.add(1) + Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name)) } - require(canFit(server)) { "Server does not fit" } - val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel(), server.name)) - guests[server] = guest - _guests.add(1) - if (start) { guest.start() } -- cgit v1.2.3 From 2c507c6ca4b7d809b410d351f0c1c5c3ddf7bb5c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 14:38:08 +0200 Subject: feat(compute): Track guest up/down time This change updates the SimHost implementation to track the up and downtime of hypervisor guests. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 54 +++++++++++++++++++++- .../org/opendc/compute/simulator/SimHostTest.kt | 10 ++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 4526537d..9a1f05fc 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,6 +22,7 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes @@ -59,7 +60,7 @@ public class SimHost( override val meta: Map, context: CoroutineContext, interpreter: SimResourceInterpreter, - meter: Meter, + private val meter: Meter, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), @@ -282,6 +283,11 @@ public class SimHost( HostState.DOWN -> _downTime.add(duration) } + // Track time of guests + for (guest in guests.values) { + guest.reportTime() + } + _lastReport = now } @@ -385,6 +391,33 @@ public class SimHost( private inner class Guest(val server: Server, val machine: SimMachine) { var state: ServerState = ServerState.TERMINATED + /** + * The amount of time in the system. + */ + private val _totalTime = meter.counterBuilder("guest.time.total") + .setDescription("The amount of time in the system") + .setUnit("ms") + .build() + .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + + /** + * The uptime of the guest. + */ + private val _runningTime = meter.counterBuilder("guest.time.running") + .setDescription("The uptime of the guest") + .setUnit("ms") + .build() + .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + + /** + * The time the guest is in an error state. + */ + private val _errorTime = meter.counterBuilder("guest.time.error") + .setDescription("The time the guest is in an error state") + .setUnit("ms") + .build() + .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString())) + suspend fun start() { when (state) { ServerState.TERMINATED, ServerState.ERROR -> { @@ -462,5 +495,24 @@ public class SimHost( onGuestStop(this) } + + private var _lastReport = clock.millis() + + fun reportTime() { + if (state == ServerState.DELETED) + return + + val now = clock.millis() + val duration = now - _lastReport + + _totalTime.add(duration) + when (state) { + ServerState.RUNNING -> _runningTime.add(duration) + ServerState.ERROR -> _errorTime.add(duration) + else -> {} + } + + _lastReport = now + } } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 0a2ced7b..31215e9a 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -191,6 +191,8 @@ internal class SimHostTest { var grantedWork = 0L var totalTime = 0L var downTime = 0L + var guestTotalTime = 0L + var guestDownTime = 0L val meterProvider: MeterProvider = SdkMeterProvider .builder() @@ -246,6 +248,12 @@ internal class SimHostTest { metricsByName["host.time.down"]?.let { downTime = it.longSumData.points.first().value } + metricsByName["guest.time.total"]?.let { + guestTotalTime = it.longSumData.points.first().value + } + metricsByName["guest.time.error"]?.let { + guestDownTime = it.longSumData.points.first().value + } return CompletableResultCode.ofSuccess() } @@ -284,7 +292,9 @@ internal class SimHostTest { { assertEquals(2226039, requestedWork, "Total time does not match") }, { assertEquals(1086039, grantedWork, "Down time does not match") }, { assertEquals(1200001, totalTime, "Total time does not match") }, + { assertEquals(1200001, guestTotalTime, "Guest total time does not match") }, { assertEquals(5000, downTime, "Down time does not match") }, + { assertEquals(5000, guestDownTime, "Guest down time does not match") }, ) } -- cgit v1.2.3 From b6a8c642b598bfb2eaaea2a8a7e6ad6702d349b6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 26 Aug 2021 10:29:41 +0200 Subject: fix(compute): Use correct memory size for host memory This change fixes an issue where all servers could not be scheduled due to the memory size of the host being computed incorrectly. --- .../src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 9a1f05fc..a4d24740 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -292,8 +292,8 @@ public class SimHost( } override fun canFit(server: Server): Boolean { - val sufficientMemory = machine.model.memory.size >= server.flavor.memorySize - val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount + val sufficientMemory = model.memorySize >= server.flavor.memorySize + val enoughCpus = model.cpuCount >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) return sufficientMemory && enoughCpus && canFit -- cgit v1.2.3 From 561f07bd6547c8273627e9c860324ab892ba5fa7 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 26 Aug 2021 10:30:53 +0200 Subject: fix(compute): Do not allow failure of inactive guests This change fixes an issue in SimHost where guests that where inactive were also failed, causing an IllegalStateException. --- .../src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index a4d24740..213d20ee 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -452,6 +452,9 @@ public class SimHost( } suspend fun fail() { + if (state != ServerState.RUNNING) { + return + } stop() state = ServerState.ERROR } -- cgit v1.2.3 From 1e8c8ebd2b537d3795022c3222d3e37b6e61e624 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 26 Aug 2021 10:32:07 +0200 Subject: fix(compute): Mark unschedulable server as terminated This change updates the compute service to mark servers that cannot be scheduled as terminated instead of error. Error is instead reserved for cases where the server is in an error state while running. --- .../org/opendc/compute/service/internal/ComputeServiceImpl.kt | 2 +- .../test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 8c043142..f1c055d4 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -402,7 +402,7 @@ internal class ComputeServiceImpl( logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") - server.state = ServerState.ERROR + server.state = ServerState.TERMINATED continue } else { break diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index c6c01ea2..d036ec00 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -170,7 +170,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -183,7 +183,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test @@ -196,7 +196,7 @@ internal class ComputeServiceTest { server.start() delay(5L * 60 * 1000) server.refresh() - assertEquals(ServerState.ERROR, server.state) + assertEquals(ServerState.TERMINATED, server.state) } @Test -- cgit v1.2.3 From 289cd3b6bc9d86b017dbb1ce6c50d346841a0ee6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 26 Aug 2021 10:33:42 +0200 Subject: refactor(capelin): Make ExperimentMonitor optional for trace processing --- .../main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 46e11056..9d23a5dd 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -255,7 +255,7 @@ suspend fun processTrace( reader: TraceReader, scheduler: ComputeService, chan: Channel, - monitor: ExperimentMonitor + monitor: ExperimentMonitor? = null, ) { val client = scheduler.newClient() val image = client.newImage("vm-image") @@ -289,7 +289,7 @@ suspend fun processTrace( suspendCancellableCoroutine { cont -> server.watch(object : ServerWatcher { override fun onStateChanged(server: Server, newState: ServerState) { - monitor.reportVmStateChange(clock.millis(), server, newState) + monitor?.reportVmStateChange(clock.millis(), server, newState) if (newState == ServerState.TERMINATED) { cont.resume(Unit) -- cgit v1.2.3 From befec2f1ddf3a6e6d15d9d1b9fd1ecbbc4f38960 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 26 Aug 2021 10:34:18 +0200 Subject: feat(capelin): Report up/downtime metrics in experiment monitor --- .../capelin/monitor/ExperimentMetricExporter.kt | 40 ++++++++++------------ .../capelin/monitor/ExperimentMonitor.kt | 2 ++ .../capelin/monitor/ParquetExperimentMonitor.kt | 2 ++ .../experiments/capelin/CapelinIntegrationTest.kt | 2 ++ .../org/opendc/web/runner/WebExperimentMonitor.kt | 2 ++ 5 files changed, 26 insertions(+), 22 deletions(-) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index 42b7cbb8..79be9ac4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -54,7 +54,6 @@ public class ExperimentMetricExporter( private fun reportHostMetrics(metrics: Collection) { val hostMetrics = mutableMapOf() - hosts.mapValuesTo(hostMetrics) { HostMetrics() } for (metric in metrics) { when (metric.name) { @@ -66,12 +65,15 @@ public class ExperimentMetricExporter( "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } + "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v } + "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v } } } for ((id, hostMetric) in hostMetrics) { val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) - val host = hosts.getValue(id) + val host = hosts[id] ?: continue + monitor.reportHostData( clock.millis(), hostMetric.totalWork - lastHostMetric.totalWork, @@ -82,6 +84,8 @@ public class ExperimentMetricExporter( hostMetric.cpuDemand, hostMetric.powerDraw, hostMetric.instanceCount, + hostMetric.uptime - lastHostMetric.uptime, + hostMetric.downtime - lastHostMetric.downtime, host ) } @@ -92,38 +96,28 @@ public class ExperimentMetricExporter( private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { val points = data.doubleSummaryData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] - val hostMetric = hostMetrics[uid] - - if (hostMetric != null) { - // Take the average of the summary - val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 - block(hostMetric, avg) - } + val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } + val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 + block(hostMetric, avg) } } private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Long) -> Unit) { val points = data?.longSumData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] - val hostMetric = hostMetrics[uid] - - if (hostMetric != null) { - block(hostMetric, point.value) - } + val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } + block(hostMetric, point.value) } } private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { val points = data?.doubleSumData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] - val hostMetric = hostMetrics[uid] - - if (hostMetric != null) { - block(hostMetric, point.value) - } + val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } + block(hostMetric, point.value) } } @@ -151,6 +145,8 @@ public class ExperimentMetricExporter( var cpuDemand: Double = 0.0 var instanceCount: Int = 0 var powerDraw: Double = 0.0 + var uptime: Long = 0 + var downtime: Long = 0 } override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 9a4aec35..dc28b816 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -54,6 +54,8 @@ public interface ExperimentMonitor : AutoCloseable { cpuDemand: Double, powerDraw: Double, instanceCount: Int, + uptime: Long, + downtime: Long, host: Host ) {} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index 83351c41..c49499da 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -67,6 +67,8 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuDemand: Double, powerDraw: Double, instanceCount: Int, + uptime: Long, + downtime: Long, host: Host ) { hostWriter.write( diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 2934bbe6..24d8f768 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -300,6 +300,8 @@ class CapelinIntegrationTest { cpuDemand: Double, powerDraw: Double, instanceCount: Int, + uptime: Long, + downtime: Long, host: Host, ) { this.totalWork += totalWork.toLong() diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt index 82e2a334..281c8dbb 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt @@ -53,6 +53,8 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuDemand: Double, powerDraw: Double, instanceCount: Int, + uptime: Long, + downtime: Long, host: Host, ) { processHostEvent( -- cgit v1.2.3 From aaedd4f3eed83d0c3ebc829fec08a1749a2bfba4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 27 Aug 2021 16:41:55 +0200 Subject: refactor(capelin): Move metric collection outside Capelin code This change moves the metric collection outside the Capelin codebase in a separate module so other modules can also benefit from the compute metric collection code. --- .../opendc-experiments-capelin/build.gradle.kts | 1 + .../experiments/capelin/ExperimentHelpers.kt | 98 +---------- .../org/opendc/experiments/capelin/Portfolio.kt | 19 +- .../capelin/export/parquet/ParquetDataWriter.kt | 127 ++++++++++++++ .../capelin/export/parquet/ParquetExportMonitor.kt | 55 ++++++ .../export/parquet/ParquetHostDataWriter.kt | 73 ++++++++ .../export/parquet/ParquetServiceDataWriter.kt | 65 +++++++ .../capelin/monitor/ExperimentMetricExporter.kt | 155 ----------------- .../capelin/monitor/ExperimentMonitor.kt | 75 -------- .../capelin/monitor/ParquetExperimentMonitor.kt | 120 ------------- .../opendc/experiments/capelin/telemetry/Event.kt | 35 ---- .../experiments/capelin/telemetry/HostEvent.kt | 43 ----- .../capelin/telemetry/ProvisionerEvent.kt | 39 ----- .../experiments/capelin/telemetry/RunEvent.kt | 34 ---- .../experiments/capelin/telemetry/VmEvent.kt | 41 ----- .../telemetry/parquet/ParquetEventWriter.kt | 126 -------------- .../telemetry/parquet/ParquetHostEventWriter.kt | 81 --------- .../parquet/ParquetProvisionerEventWriter.kt | 65 ------- .../telemetry/parquet/ParquetRunEventWriter.kt | 72 -------- .../experiments/capelin/CapelinIntegrationTest.kt | 92 +++++----- .../opendc-experiments-energy21/build.gradle.kts | 1 + .../experiments/energy21/EnergyExperiment.kt | 18 +- .../opendc-telemetry-compute/build.gradle.kts | 37 ++++ .../telemetry/compute/ComputeMetricExporter.kt | 149 ++++++++++++++++ .../org/opendc/telemetry/compute/ComputeMonitor.kt | 61 +++++++ .../kotlin/org/opendc/telemetry/compute/Helpers.kt | 111 ++++++++++++ .../org/opendc/telemetry/compute/table/HostData.kt | 43 +++++ .../opendc/telemetry/compute/table/ServerData.kt | 35 ++++ .../opendc/telemetry/compute/table/ServiceData.kt | 37 ++++ opendc-web/opendc-web-runner/build.gradle.kts | 1 + .../src/main/kotlin/org/opendc/web/runner/Main.kt | 20 ++- .../org/opendc/web/runner/ScenarioManager.kt | 10 +- .../org/opendc/web/runner/WebComputeMonitor.kt | 145 ++++++++++++++++ .../org/opendc/web/runner/WebExperimentMonitor.kt | 191 --------------------- settings.gradle.kts | 1 + 35 files changed, 1039 insertions(+), 1237 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/build.gradle.kts create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt create mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt delete mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 65cebe1b..1a4caf91 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -38,6 +38,7 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 9d23a5dd..0230409e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -24,16 +24,10 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.ReplayScheduler @@ -46,8 +40,6 @@ import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.EnvironmentReader -import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter -import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel @@ -57,7 +49,7 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock import kotlin.coroutines.resume @@ -65,11 +57,6 @@ import kotlin.math.ln import kotlin.math.max import kotlin.random.Random -/** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - /** * Construct the failure domain for the experiments. */ @@ -168,85 +155,6 @@ suspend fun withComputeService( } } -/** - * Attach the specified monitor to the VM provisioner. - */ -suspend fun withMonitor( - monitor: ExperimentMonitor, - clock: Clock, - metricProducer: MetricProducer, - scheduler: ComputeService, - block: suspend CoroutineScope.() -> Unit -): Unit = coroutineScope { - // Monitor host events - for (host in scheduler.hosts) { - monitor.reportHostStateChange(clock.millis(), host, HostState.UP) - host.addListener(object : HostListener { - override fun onStateChanged(host: Host, newState: HostState) { - monitor.reportHostStateChange(clock.millis(), host, newState) - } - }) - } - - val reader = CoroutineMetricReader( - this, - listOf(metricProducer), - ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }), - exportInterval = 5L * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */ - ) - - try { - block(this) - } finally { - reader.close() - monitor.close() - } -} - -class ComputeMetrics { - var submittedVms: Int = 0 - var queuedVms: Int = 0 - var runningVms: Int = 0 - var unscheduledVms: Int = 0 - var finishedVms: Int = 0 - var hosts: Int = 0 - var availableHosts = 0 -} - -/** - * Collect the metrics of the compute service. - */ -fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { - return extractComputeMetrics(metricProducer.collectAllMetrics()) -} - -/** - * Extract an [ComputeMetrics] object from the specified list of metric data. - */ -internal fun extractComputeMetrics(metrics: Collection): ComputeMetrics { - val res = ComputeMetrics() - for (metric in metrics) { - val points = metric.longSumData.points - - if (points.isEmpty()) { - continue - } - - val value = points.first().value.toInt() - when (metric.name) { - "servers.submitted" -> res.submittedVms = value - "servers.waiting" -> res.queuedVms = value - "servers.unscheduled" -> res.unscheduledVms = value - "servers.active" -> res.runningVms = value - "servers.finished" -> res.finishedVms = value - "hosts.total" -> res.hosts = value - "hosts.available" -> res.availableHosts = value - } - } - - return res -} - /** * Process the trace. */ @@ -255,7 +163,7 @@ suspend fun processTrace( reader: TraceReader, scheduler: ComputeService, chan: Channel, - monitor: ExperimentMonitor? = null, + monitor: ComputeMonitor? = null, ) { val client = scheduler.newClient() val image = client.newImage("vm-image") @@ -289,7 +197,7 @@ suspend fun processTrace( suspendCancellableCoroutine { cont -> server.watch(object : ServerWatcher { override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.reportVmStateChange(clock.millis(), server, newState) + monitor?.onStateChange(clock.millis(), server, newState) if (newState == ServerState.TERMINATED) { cont.resume(Unit) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index ee832af8..d3bba182 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -29,11 +29,11 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import mu.KotlinLogging import org.opendc.experiments.capelin.env.ClusterEnvironmentReader +import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader @@ -41,6 +41,8 @@ import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.io.FileInputStream import java.util.* @@ -127,7 +129,7 @@ abstract class Portfolio(name: String) : Experiment(name) { val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) - val monitor = ParquetExperimentMonitor( + val monitor = ParquetExportMonitor( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 @@ -148,7 +150,7 @@ abstract class Portfolio(name: String) : Experiment(name) { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, trace, @@ -159,9 +161,16 @@ abstract class Portfolio(name: String) : Experiment(name) { } failureDomain?.cancel() + monitor.close() } - val monitorResults = collectMetrics(meterProvider as MetricProducer) - logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + logger.debug { + "Finish " + + "SUBMIT=${monitorResults.instanceCount} " + + "FAIL=${monitorResults.failedInstanceCount} " + + "QUEUE=${monitorResults.queuedInstanceCount} " + + "RUNNING=${monitorResults.activeHostCount}" + } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt new file mode 100644 index 00000000..c5cb80e2 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import mu.KotlinLogging +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.util.parquet.LocalOutputFile +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread + +/** + * A writer that writes data in Parquet format. + */ +public open class ParquetDataWriter( + private val path: File, + private val schema: Schema, + private val converter: (T, GenericData.Record) -> Unit, + private val bufferSize: Int = 4096 +) : Runnable, Closeable { + /** + * The logging instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** + * The writer to write the Parquet file. + */ + private val writer = AvroParquetWriter.builder(LocalOutputFile(path)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() + + /** + * The queue of commands to process. + */ + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + + /** + * The thread that is responsible for writing the Parquet records. + */ + private val writerThread = thread(start = false, name = this.toString()) { run() } + + /** + * Write the specified metrics to the database. + */ + public fun write(event: T) { + queue.put(Action.Write(event)) + } + + /** + * Signal the writer to stop. + */ + public override fun close() { + queue.put(Action.Stop) + writerThread.join() + } + + init { + writerThread.start() + } + + /** + * Start the writer thread. + */ + override fun run() { + try { + loop@ while (true) { + val action = queue.take() + when (action) { + is Action.Stop -> break@loop + is Action.Write<*> -> { + val record = GenericData.Record(schema) + @Suppress("UNCHECKED_CAST") + converter(action.data as T, record) + writer.write(record) + } + } + } + } catch (e: Throwable) { + logger.error("Writer failed", e) + } finally { + writer.close() + } + } + + public sealed class Action { + /** + * A poison pill that will stop the writer thread. + */ + public object Stop : Action() + + /** + * Write the specified metrics to the database. + */ + public data class Write(val data: T) : Action() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt new file mode 100644 index 00000000..79b84e9d --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServiceData +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(data: HostData) { + hostWriter.write(data) + } + + override fun record(data: ServiceData) { + serviceWriter.write(data) + } + + override fun close() { + hostWriter.close() + serviceWriter.close() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt new file mode 100644 index 00000000..8912c12e --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.telemetry.compute.table.HostData +import java.io.File + +/** + * A Parquet event writer for [HostData]s. + */ +public class ParquetHostDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "host-writer" + + public companion object { + private val convert: (HostData, GenericData.Record) -> Unit = { data, record -> + record.put("host_id", data.host.name) + record.put("state", data.host.state.name) + record.put("timestamp", data.timestamp) + record.put("total_work", data.totalWork) + record.put("granted_work", data.grantedWork) + record.put("overcommitted_work", data.overcommittedWork) + record.put("interfered_work", data.interferedWork) + record.put("cpu_usage", data.cpuUsage) + record.put("cpu_demand", data.cpuDemand) + record.put("power_draw", data.powerDraw) + record.put("instance_count", data.instanceCount) + record.put("cores", data.host.model.cpuCount) + } + + private val schema: Schema = SchemaBuilder + .record("host") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type().longType().noDefault() + .name("host_id").type().stringType().noDefault() + .name("state").type().stringType().noDefault() + .name("requested_work").type().longType().noDefault() + .name("granted_work").type().longType().noDefault() + .name("overcommitted_work").type().longType().noDefault() + .name("interfered_work").type().longType().noDefault() + .name("cpu_usage").type().doubleType().noDefault() + .name("cpu_demand").type().doubleType().noDefault() + .name("power_draw").type().doubleType().noDefault() + .name("instance_count").type().intType().noDefault() + .name("cores").type().intType().noDefault() + .endRecord() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt new file mode 100644 index 00000000..36d630f3 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.export.parquet + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.opendc.telemetry.compute.table.ServiceData +import java.io.File + +/** + * A Parquet event writer for [ServiceData]s. + */ +public class ParquetServiceDataWriter(path: File, bufferSize: Int) : + ParquetDataWriter(path, schema, convert, bufferSize) { + + override fun toString(): String = "service-writer" + + public companion object { + private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record -> + record.put("timestamp", data.timestamp) + record.put("host_total_count", data.hostCount) + record.put("host_available_count", data.activeHostCount) + record.put("instance_total_count", data.instanceCount) + record.put("instance_active_count", data.runningInstanceCount) + record.put("instance_inactive_count", data.finishedInstanceCount) + record.put("instance_waiting_count", data.queuedInstanceCount) + record.put("instance_failed_count", data.failedInstanceCount) + } + + private val schema: Schema = SchemaBuilder + .record("service") + .namespace("org.opendc.telemetry.compute") + .fields() + .name("timestamp").type().longType().noDefault() + .name("host_total_count").type().intType().noDefault() + .name("host_available_count").type().intType().noDefault() + .name("instance_total_count").type().intType().noDefault() + .name("instance_active_count").type().intType().noDefault() + .name("instance_inactive_count").type().intType().noDefault() + .name("instance_waiting_count").type().intType().noDefault() + .name("instance_failed_count").type().intType().noDefault() + .endRecord() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt deleted file mode 100644 index 79be9ac4..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.driver.Host -import org.opendc.experiments.capelin.extractComputeMetrics -import java.time.Clock - -/** - * A [MetricExporter] that exports the metrics to the [ExperimentMonitor]. - */ -public class ExperimentMetricExporter( - private val monitor: ExperimentMonitor, - private val clock: Clock, - private val hosts: Map -) : MetricExporter { - - override fun export(metrics: Collection): CompletableResultCode { - return try { - reportHostMetrics(metrics) - reportProvisionerMetrics(metrics) - CompletableResultCode.ofSuccess() - } catch (e: Throwable) { - CompletableResultCode.ofFailure() - } - } - - private var lastHostMetrics: Map = emptyMap() - private val hostMetricsSingleton = HostMetrics() - - private fun reportHostMetrics(metrics: Collection) { - val hostMetrics = mutableMapOf() - - for (metric in metrics) { - when (metric.name) { - "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } - "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } - "power.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.powerDraw = v } - "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } - "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } - "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } - "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } - "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } - "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v } - "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v } - } - } - - for ((id, hostMetric) in hostMetrics) { - val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) - val host = hosts[id] ?: continue - - monitor.reportHostData( - clock.millis(), - hostMetric.totalWork - lastHostMetric.totalWork, - hostMetric.grantedWork - lastHostMetric.grantedWork, - hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, - hostMetric.interferedWork - lastHostMetric.interferedWork, - hostMetric.cpuUsage, - hostMetric.cpuDemand, - hostMetric.powerDraw, - hostMetric.instanceCount, - hostMetric.uptime - lastHostMetric.uptime, - hostMetric.downtime - lastHostMetric.downtime, - host - ) - } - - lastHostMetrics = hostMetrics - } - - private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { - val points = data.doubleSummaryData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } - val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 - block(hostMetric, avg) - } - } - - private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Long) -> Unit) { - val points = data?.longSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } - block(hostMetric, point.value) - } - } - - private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { - val points = data?.doubleSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HostMetrics() } - block(hostMetric, point.value) - } - } - - private fun reportProvisionerMetrics(metrics: Collection) { - val res = extractComputeMetrics(metrics) - - monitor.reportServiceData( - clock.millis(), - res.hosts, - res.availableHosts, - res.submittedVms, - res.runningVms, - res.finishedVms, - res.queuedVms, - res.unscheduledVms - ) - } - - private class HostMetrics { - var totalWork: Double = 0.0 - var grantedWork: Double = 0.0 - var overcommittedWork: Double = 0.0 - var interferedWork: Double = 0.0 - var cpuUsage: Double = 0.0 - var cpuDemand: Double = 0.0 - var instanceCount: Int = 0 - var powerDraw: Double = 0.0 - var uptime: Long = 0 - var downtime: Long = 0 - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt deleted file mode 100644 index dc28b816..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState - -/** - * A monitor watches the events of an experiment. - */ -public interface ExperimentMonitor : AutoCloseable { - /** - * This method is invoked when the state of a VM changes. - */ - public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - /** - * This method is invoked when the state of a host changes. - */ - public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {} - - /** - * This method is invoked for a host for each slice that is finishes. - */ - public fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host - ) {} - - /** - * This method is invoked for reporting service data. - */ - public fun reportServiceData( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) {} -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt deleted file mode 100644 index c49499da..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import mu.KotlinLogging -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState -import org.opendc.experiments.capelin.telemetry.HostEvent -import org.opendc.experiments.capelin.telemetry.ProvisionerEvent -import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter -import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter -import java.io.File - -/** - * The logger instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * An [ExperimentMonitor] that logs the events to a Parquet file. - */ -public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { - private val hostWriter = ParquetHostEventWriter( - File(base, "host-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - private val provisionerWriter = ParquetProvisionerEventWriter( - File(base, "provisioner-metrics/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host - ) { - hostWriter.write( - HostEvent( - time, - 5 * 60 * 1000L, - host, - instanceCount, - totalWork.toLong(), - grantedWork.toLong(), - overcommittedWork.toLong(), - interferedWork.toLong(), - cpuUsage, - cpuDemand, - powerDraw, - host.model.cpuCount - ) - ) - } - - override fun reportServiceData( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) { - provisionerWriter.write( - ProvisionerEvent( - time, - totalHostCount, - availableHostCount, - totalVmCount, - activeVmCount, - inactiveVmCount, - waitingVmCount, - failedVmCount - ) - ) - } - - override fun close() { - hostWriter.close() - provisionerWriter.close() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt deleted file mode 100644 index c29e116e..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -/** - * An event that occurs within the system. - */ -public abstract class Event(public val name: String) { - /** - * The time of occurrence of this event. - */ - public abstract val timestamp: Long -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt deleted file mode 100644 index 899fc9b1..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.compute.service.driver.Host - -/** - * A periodic report of the host machine metrics. - */ -public data class HostEvent( - override val timestamp: Long, - public val duration: Long, - public val host: Host, - public val vmCount: Int, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val powerDraw: Double, - public val cores: Int -) : Event("host-metrics") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt deleted file mode 100644 index 539c9bc9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -/** - * A periodic report of the provisioner's metrics. - */ -public data class ProvisionerEvent( - override val timestamp: Long, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int -) : Event("provisioner-metrics") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt deleted file mode 100644 index 6c8fc941..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.experiments.capelin.Portfolio - -/** - * A periodic report of the host machine metrics. - */ -public data class RunEvent( - val portfolio: Portfolio, - val repeat: Int, - override val timestamp: Long -) : Event("run") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt deleted file mode 100644 index 7631f55f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.compute.api.Server - -/** - * A periodic report of a virtual machine's metrics. - */ -public data class VmEvent( - override val timestamp: Long, - public val duration: Long, - public val vm: Server, - public val host: Server, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double -) : Event("vm-metrics") diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt deleted file mode 100644 index 897a6692..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.capelin.telemetry.Event -import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.Closeable -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -/** - * The logging instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * A writer that writes events in Parquet format. - */ -public open class ParquetEventWriter( - private val path: File, - private val schema: Schema, - private val converter: (T, GenericData.Record) -> Unit, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { - /** - * The writer to write the Parquet file. - */ - private val writer = AvroParquetWriter.builder(LocalOutputFile(path)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) - - /** - * The thread that is responsible for writing the Parquet records. - */ - private val writerThread = thread(start = false, name = "parquet-writer") { run() } - - /** - * Write the specified metrics to the database. - */ - public fun write(event: T) { - queue.put(Action.Write(event)) - } - - /** - * Signal the writer to stop. - */ - public override fun close() { - queue.put(Action.Stop) - writerThread.join() - } - - init { - writerThread.start() - } - - /** - * Start the writer thread. - */ - override fun run() { - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - converter(action.event as T, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - public sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - public object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - public data class Write(val event: T) : Action() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt deleted file mode 100644 index c8fe1cb2..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.HostEvent -import java.io.File - -/** - * A Parquet event writer for [HostEvent]s. - */ -public class ParquetHostEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "host-writer" - - public companion object { - private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> - // record.put("portfolio_id", event.run.parent.parent.id) - // record.put("scenario_id", event.run.parent.id) - // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) - record.put("timestamp", event.timestamp) - record.put("duration", event.duration) - record.put("vm_count", event.vmCount) - record.put("requested_burst", event.requestedBurst) - record.put("granted_burst", event.grantedBurst) - record.put("overcommissioned_burst", event.overcommissionedBurst) - record.put("interfered_burst", event.interferedBurst) - record.put("cpu_usage", event.cpuUsage) - record.put("cpu_demand", event.cpuDemand) - record.put("power_draw", event.powerDraw) - record.put("cores", event.cores) - } - - private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - // .name("portfolio_id").type().intType().noDefault() - // .name("scenario_id").type().intType().noDefault() - // .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("vm_count").type().intType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("cores").type().intType().noDefault() - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt deleted file mode 100644 index 8feff8d9..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.ProvisionerEvent -import java.io.File - -/** - * A Parquet event writer for [ProvisionerEvent]s. - */ -public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "provisioner-writer" - - public companion object { - private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> - record.put("timestamp", event.timestamp) - record.put("host_total_count", event.totalHostCount) - record.put("host_available_count", event.availableHostCount) - record.put("vm_total_count", event.totalVmCount) - record.put("vm_active_count", event.activeVmCount) - record.put("vm_inactive_count", event.inactiveVmCount) - record.put("vm_waiting_count", event.waitingVmCount) - record.put("vm_failed_count", event.failedVmCount) - } - - private val schema: Schema = SchemaBuilder - .record("provisioner_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("vm_total_count").type().intType().noDefault() - .name("vm_active_count").type().intType().noDefault() - .name("vm_inactive_count").type().intType().noDefault() - .name("vm_waiting_count").type().intType().noDefault() - .name("vm_failed_count").type().intType().noDefault() - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt deleted file mode 100644 index 946410eb..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.RunEvent -import java.io.File - -/** - * A Parquet event writer for [RunEvent]s. - */ -public class ParquetRunEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter(path, schema, convert, bufferSize) { - - override fun toString(): String = "run-writer" - - public companion object { - private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> - val portfolio = event.portfolio - record.put("portfolio_name", portfolio.name) - record.put("scenario_id", portfolio.id) - record.put("run_id", event.repeat) - record.put("topology", portfolio.topology.name) - record.put("workload_name", portfolio.workload.name) - record.put("workload_fraction", portfolio.workload.fraction) - record.put("workload_sampler", portfolio.workload.samplingStrategy) - record.put("allocation_policy", portfolio.allocationPolicy) - record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency) - record.put("interference", portfolio.operationalPhenomena.hasInterference) - record.put("seed", event.repeat) - } - - private val schema: Schema = SchemaBuilder - .record("runs") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("portfolio_name").type().stringType().noDefault() - .name("scenario_id").type().intType().noDefault() - .name("run_id").type().intType().noDefault() - .name("topology").type().stringType().noDefault() - .name("workload_name").type().stringType().noDefault() - .name("workload_fraction").type().doubleType().noDefault() - .name("workload_sampler").type().stringType().noDefault() - .name("allocation_policy").type().stringType().noDefault() - .name("failure_frequency").type().doubleType().noDefault() - .name("interference").type().booleanType().noDefault() - .name("seed").type().intType().noDefault() - .endRecord() - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 24d8f768..abd8efeb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter @@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader @@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.util.* @@ -80,7 +82,6 @@ class CapelinIntegrationTest { ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var monitorResults: ComputeMetrics val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> @@ -98,7 +99,7 @@ class CapelinIntegrationTest { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -111,15 +112,21 @@ class CapelinIntegrationTest { failureDomain?.cancel() } - monitorResults = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, - { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, - { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, + { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, + { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, + { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, @@ -145,7 +152,7 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -156,8 +163,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -189,7 +202,7 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -200,8 +213,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -239,7 +258,7 @@ class CapelinIntegrationTest { chan ) - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -252,8 +271,14 @@ class CapelinIntegrationTest { failureDomain.cancel() } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -283,34 +308,19 @@ class CapelinIntegrationTest { return ClusterEnvironmentReader(stream) } - class TestExperimentReporter : ExperimentMonitor { + class TestExperimentReporter : ComputeMonitor { var totalWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L var totalInterferedWork = 0L var totalPowerDraw = 0.0 - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host, - ) { - this.totalWork += totalWork.toLong() - totalGrantedWork += grantedWork.toLong() - totalOvercommittedWork += overcommittedWork.toLong() - totalInterferedWork += interferedWork.toLong() - totalPowerDraw += powerDraw + override fun record(data: HostData) { + this.totalWork += data.totalWork.toLong() + totalGrantedWork += data.grantedWork.toLong() + totalOvercommittedWork += data.overcommittedWork.toLong() + totalInterferedWork += data.interferedWork.toLong() + totalPowerDraw += data.powerDraw } - - override fun close() {} } } diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts index 7d34d098..40ac2967 100644 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcExperiments.opendcExperimentsCapelin) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.kotlin.logging) implementation(libs.config) diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index e64e20a2..02aaab3c 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -37,7 +37,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* -import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor +import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf @@ -50,6 +50,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.time.Clock import java.util.* @@ -87,11 +89,11 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { ) val meterProvider: MeterProvider = createMeterProvider(clock) - val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) + val monitor = ParquetExportMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, trace, @@ -102,12 +104,12 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { } } - val monitorResults = collectMetrics(meterProvider as MetricProducer) + val monitorResults = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) logger.debug { - "Finish SUBMIT=${monitorResults.submittedVms} " + - "FAIL=${monitorResults.unscheduledVms} " + - "QUEUE=${monitorResults.queuedVms} " + - "RUNNING=${monitorResults.runningVms}" + "Finish SUBMIT=${monitorResults.instanceCount} " + + "FAIL=${monitorResults.failedInstanceCount} " + + "QUEUE=${monitorResults.queuedInstanceCount} " + + "RUNNING=${monitorResults.runningInstanceCount}" } } diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts new file mode 100644 index 00000000..6a3de9bc --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Telemetry for OpenDC Compute" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTelemetry.opendcTelemetrySdk) + + implementation(projects.opendcCompute.opendcComputeSimulator) + implementation(libs.opentelemetry.semconv) + implementation(libs.kotlin.logging) +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt new file mode 100644 index 00000000..95e7ff9e --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import org.opendc.compute.service.driver.Host +import org.opendc.telemetry.compute.table.HostData +import java.time.Clock + +/** + * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. + */ +public class ComputeMetricExporter( + private val clock: Clock, + private val hosts: Map, + private val monitor: ComputeMonitor +) : MetricExporter { + + override fun export(metrics: Collection): CompletableResultCode { + return try { + reportHostMetrics(metrics) + reportServiceMetrics(metrics) + CompletableResultCode.ofSuccess() + } catch (e: Throwable) { + CompletableResultCode.ofFailure() + } + } + + private var lastHostMetrics: Map = emptyMap() + private val hostMetricsSingleton = HBuffer() + + private fun reportHostMetrics(metrics: Collection) { + val hostMetrics = mutableMapOf() + + for (metric in metrics) { + when (metric.name) { + "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } + "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } + "power.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.powerDraw = v } + "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } + "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } + "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } + "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } + "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } + "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v } + "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v } + } + } + + for ((id, hostMetric) in hostMetrics) { + val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) + val host = hosts[id] ?: continue + + monitor.record( + HostData( + clock.millis(), + host, + hostMetric.totalWork - lastHostMetric.totalWork, + hostMetric.grantedWork - lastHostMetric.grantedWork, + hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, + hostMetric.interferedWork - lastHostMetric.interferedWork, + hostMetric.cpuUsage, + hostMetric.cpuDemand, + hostMetric.instanceCount, + hostMetric.powerDraw, + hostMetric.uptime - lastHostMetric.uptime, + hostMetric.downtime - lastHostMetric.downtime, + ) + ) + } + + lastHostMetrics = hostMetrics + } + + private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap, block: (HBuffer, Double) -> Unit) { + val points = data.doubleSummaryData?.points ?: emptyList() + for (point in points) { + val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } + val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 + block(hostMetric, avg) + } + } + + private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HBuffer, Long) -> Unit) { + val points = data?.longSumData?.points ?: emptyList() + for (point in points) { + val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } + block(hostMetric, point.value) + } + } + + private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HBuffer, Double) -> Unit) { + val points = data?.doubleSumData?.points ?: emptyList() + for (point in points) { + val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue + val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } + block(hostMetric, point.value) + } + } + + /** + * A buffer for host metrics before they are reported. + */ + private class HBuffer { + var totalWork: Double = 0.0 + var grantedWork: Double = 0.0 + var overcommittedWork: Double = 0.0 + var interferedWork: Double = 0.0 + var cpuUsage: Double = 0.0 + var cpuDemand: Double = 0.0 + var instanceCount: Int = 0 + var powerDraw: Double = 0.0 + var uptime: Long = 0 + var downtime: Long = 0 + } + + private fun reportServiceMetrics(metrics: Collection) { + monitor.record(extractServiceMetrics(clock.millis(), metrics)) + } + + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt new file mode 100644 index 00000000..ec303b37 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostState +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServiceData + +/** + * A monitor that tracks the metrics and events of the OpenDC Compute service. + */ +public interface ComputeMonitor { + /** + * This method is invoked when the state of a [Server] changes. + */ + public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {} + + /** + * This method is invoked when the state of a [Host] changes. + */ + public fun onStateChange(time: Long, host: Host, newState: HostState) {} + + /** + * Record the specified [data]. + */ + public fun record(data: ServerData) {} + + /** + * Record the specified [data]. + */ + public fun record(data: HostData) {} + + /** + * Record the specified [data]. + */ + public fun record(data: ServiceData) {} +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt new file mode 100644 index 00000000..d3d983b9 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostState +import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import java.time.Clock + +/** + * Attach the specified monitor to the OpenDC Compute service. + */ +public suspend fun withMonitor( + scheduler: ComputeService, + clock: Clock, + metricProducer: MetricProducer, + monitor: ComputeMonitor, + exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */ + block: suspend CoroutineScope.() -> Unit +): Unit = coroutineScope { + // Monitor host events + for (host in scheduler.hosts) { + monitor.onStateChange(clock.millis(), host, HostState.UP) + host.addListener(object : HostListener { + override fun onStateChanged(host: Host, newState: HostState) { + monitor.onStateChange(clock.millis(), host, newState) + } + }) + } + + val reader = CoroutineMetricReader( + this, + listOf(metricProducer), + ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor), + exportInterval + ) + + try { + block(this) + } finally { + reader.close() + } +} + +/** + * Collect the metrics of the compute service. + */ +public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData { + return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) +} + +/** + * Extract a [ServiceData] object from the specified list of metric data. + */ +public fun extractServiceMetrics(timestamp: Long, metrics: Collection): ServiceData { + var submittedVms = 0 + var queuedVms = 0 + var unscheduledVms = 0 + var runningVms = 0 + var finishedVms = 0 + var hosts = 0 + var availableHosts = 0 + + for (metric in metrics) { + val points = metric.longSumData.points + + if (points.isEmpty()) { + continue + } + + val value = points.first().value.toInt() + when (metric.name) { + "servers.submitted" -> submittedVms = value + "servers.waiting" -> queuedVms = value + "servers.unscheduled" -> unscheduledVms = value + "servers.active" -> runningVms = value + "servers.finished" -> finishedVms = value + "hosts.total" -> hosts = value + "hosts.available" -> availableHosts = value + } + } + + return ServiceData(timestamp, hosts, availableHosts, submittedVms, runningVms, finishedVms, queuedVms, unscheduledVms) +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt new file mode 100644 index 00000000..8e6c34d0 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import org.opendc.compute.service.driver.Host + +/** + * A trace entry for a particular host. + */ +public data class HostData( + public val timestamp: Long, + public val host: Host, + public val totalWork: Double, + public val grantedWork: Double, + public val overcommittedWork: Double, + public val interferedWork: Double, + public val cpuUsage: Double, + public val cpuDemand: Double, + public val instanceCount: Int, + public val powerDraw: Double, + public val uptime: Long, + public val downtime: Long, +) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt new file mode 100644 index 00000000..2a9fa8a6 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import org.opendc.compute.api.Server + +/** + * A trace entry for a particular server. + */ +public data class ServerData( + public val timestamp: Long, + public val server: Server, + public val uptime: Long, + public val downtime: Long, +) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt new file mode 100644 index 00000000..f6ff5db5 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * A trace entry for the compute service. + */ +public data class ServiceData( + public val timestamp: Long, + public val hostCount: Int, + public val activeHostCount: Int, + public val instanceCount: Int, + public val runningInstanceCount: Int, + public val finishedInstanceCount: Int, + public val queuedInstanceCount: Int, + public val failedInstanceCount: Int +) diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index ec4a4673..99051e8e 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -39,6 +39,7 @@ dependencies { implementation(projects.opendcExperiments.opendcExperimentsCapelin) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.kotlin.logging) implementation(libs.clikt) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 53d50357..65527141 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -47,6 +47,8 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.withMonitor import org.opendc.telemetry.sdk.toOtelClock import org.opendc.web.client.ApiClient import org.opendc.web.client.AuthConfiguration @@ -131,7 +133,7 @@ class RunnerCli : CliktCommand(name = "runner") { /** * Run a single scenario. */ - private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List { + private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List { val id = scenario.id logger.info { "Constructing performance interference model" } @@ -176,8 +178,8 @@ class RunnerCli : CliktCommand(name = "runner") { environment: EnvironmentReader, traceReader: RawParquetTraceReader, interferenceModel: VmInterferenceModel? - ): WebExperimentMonitor.Result { - val monitor = WebExperimentMonitor() + ): WebComputeMonitor.Result { + val monitor = WebComputeMonitor() try { runBlockingSimulation { @@ -220,7 +222,7 @@ class RunnerCli : CliktCommand(name = "runner") { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, trace, @@ -233,8 +235,14 @@ class RunnerCli : CliktCommand(name = "runner") { failureDomain?.cancel() } - val monitorResults = collectMetrics(metricProducer) - logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + val monitorResults = collectServiceMetrics(clock.millis(), metricProducer) + logger.debug { + "Finish " + + "SUBMIT=${monitorResults.instanceCount} " + + "FAIL=${monitorResults.failedInstanceCount} " + + "QUEUE=${monitorResults.queuedInstanceCount} " + + "RUNNING=${monitorResults.runningInstanceCount}" + } } } catch (cause: Throwable) { logger.warn(cause) { "Experiment failed" } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt index 4044cec9..e0e3488f 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt @@ -61,14 +61,14 @@ public class ScenarioManager(private val client: ApiClient) { /** * Persist the specified results. */ - public suspend fun finish(id: String, results: List) { + public suspend fun finish(id: String, results: List) { client.updateJob( id, SimulationState.FINISHED, mapOf( - "total_requested_burst" to results.map { it.totalRequestedBurst }, - "total_granted_burst" to results.map { it.totalGrantedBurst }, - "total_overcommitted_burst" to results.map { it.totalOvercommittedBurst }, - "total_interfered_burst" to results.map { it.totalInterferedBurst }, + "total_requested_burst" to results.map { it.totalWork }, + "total_granted_burst" to results.map { it.totalGrantedWork }, + "total_overcommitted_burst" to results.map { it.totalOvercommittedWork }, + "total_interfered_burst" to results.map { it.totalInterferedWork }, "mean_cpu_usage" to results.map { it.meanCpuUsage }, "mean_cpu_demand" to results.map { it.meanCpuDemand }, "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt new file mode 100644 index 00000000..c8e58dde --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner + +import mu.KotlinLogging +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostState +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServiceData +import kotlin.math.max + +/** + * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. + */ +public class WebComputeMonitor : ComputeMonitor { + private val logger = KotlinLogging.logger {} + + override fun onStateChange(time: Long, host: Host, newState: HostState) { + logger.debug { "Host ${host.uid} changed state $newState [$time]" } + } + + override fun record(data: HostData) { + val duration = 5 * 60 * 1000L + val slices = duration / SLICE_LENGTH + + hostAggregateMetrics = AggregateHostMetrics( + hostAggregateMetrics.totalWork + data.totalWork, + hostAggregateMetrics.totalGrantedWork + data.grantedWork, + hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork, + hostAggregateMetrics.totalInterferedWork + data.overcommittedWork, + hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600, + hostAggregateMetrics.totalFailureSlices + if (data.host.state != HostState.UP) slices else 0, + hostAggregateMetrics.totalFailureVmSlices + if (data.host.state != HostState.UP) data.instanceCount * slices else 0 + ) + + hostMetrics.compute(data.host) { _, prev -> + HostMetrics( + (data.cpuUsage.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (data.cpuDemand.takeIf { data.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + data.instanceCount + (prev?.instanceCount ?: 0), + 1 + (prev?.count ?: 0) + ) + } + } + + private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() + private val hostMetrics: MutableMap = mutableMapOf() + private val SLICE_LENGTH: Long = 5 * 60 * 1000 + + data class AggregateHostMetrics( + val totalWork: Double = 0.0, + val totalGrantedWork: Double = 0.0, + val totalOvercommittedWork: Double = 0.0, + val totalInterferedWork: Double = 0.0, + val totalPowerDraw: Double = 0.0, + val totalFailureSlices: Long = 0, + val totalFailureVmSlices: Long = 0, + ) + + data class HostMetrics( + val cpuUsage: Double, + val cpuDemand: Double, + val instanceCount: Long, + val count: Long + ) + + private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() + + override fun record(data: ServiceData) { + serviceMetrics = AggregateServiceMetrics( + max(data.instanceCount, serviceMetrics.vmTotalCount), + max(data.queuedInstanceCount, serviceMetrics.vmWaitingCount), + max(data.runningInstanceCount, serviceMetrics.vmActiveCount), + max(data.finishedInstanceCount, serviceMetrics.vmInactiveCount), + max(data.failedInstanceCount, serviceMetrics.vmFailedCount), + ) + } + + public data class AggregateServiceMetrics( + val vmTotalCount: Int = 0, + val vmWaitingCount: Int = 0, + val vmActiveCount: Int = 0, + val vmInactiveCount: Int = 0, + val vmFailedCount: Int = 0 + ) + + public fun getResult(): Result { + return Result( + hostAggregateMetrics.totalWork, + hostAggregateMetrics.totalGrantedWork, + hostAggregateMetrics.totalOvercommittedWork, + hostAggregateMetrics.totalInterferedWork, + hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), + hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, + hostAggregateMetrics.totalPowerDraw, + hostAggregateMetrics.totalFailureSlices, + hostAggregateMetrics.totalFailureVmSlices, + serviceMetrics.vmTotalCount, + serviceMetrics.vmWaitingCount, + serviceMetrics.vmInactiveCount, + serviceMetrics.vmFailedCount, + ) + } + + data class Result( + val totalWork: Double, + val totalGrantedWork: Double, + val totalOvercommittedWork: Double, + val totalInterferedWork: Double, + val meanCpuUsage: Double, + val meanCpuDemand: Double, + val meanNumDeployedImages: Double, + val maxNumDeployedImages: Double, + val totalPowerDraw: Double, + val totalFailureSlices: Long, + val totalFailureVmSlices: Long, + val totalVmsSubmitted: Int, + val totalVmsQueued: Int, + val totalVmsFinished: Int, + val totalVmsFailed: Int + ) +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt deleted file mode 100644 index 281c8dbb..00000000 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.web.runner - -import mu.KotlinLogging -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState -import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.telemetry.HostEvent -import kotlin.math.max - -/** - * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat. - */ -public class WebExperimentMonitor : ExperimentMonitor { - private val logger = KotlinLogging.logger {} - - override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - uptime: Long, - downtime: Long, - host: Host, - ) { - processHostEvent( - HostEvent( - time, - 5 * 60 * 1000L, - host, - instanceCount, - totalWork.toLong(), - grantedWork.toLong(), - overcommittedWork.toLong(), - interferedWork.toLong(), - cpuUsage, - cpuDemand, - powerDraw, - host.model.cpuCount - ) - ) - } - - private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap = mutableMapOf() - - private fun processHostEvent(event: HostEvent) { - val slices = event.duration / SLICE_LENGTH - - hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalRequestedBurst + event.requestedBurst, - hostAggregateMetrics.totalGrantedBurst + event.grantedBurst, - hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, - hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, - hostAggregateMetrics.totalPowerDraw + (event.duration * event.powerDraw) / 3600, - hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0, - hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0 - ) - - hostMetrics.compute(event.host) { _, prev -> - HostMetrics( - (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), - event.vmCount + (prev?.vmCount ?: 0), - 1 + (prev?.count ?: 0) - ) - } - } - - private val SLICE_LENGTH: Long = 5 * 60 * 1000 - - public data class AggregateHostMetrics( - val totalRequestedBurst: Long = 0, - val totalGrantedBurst: Long = 0, - val totalOvercommittedBurst: Long = 0, - val totalInterferedBurst: Long = 0, - val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Long = 0, - val totalFailureVmSlices: Long = 0, - ) - - public data class HostMetrics( - val cpuUsage: Double, - val cpuDemand: Double, - val vmCount: Long, - val count: Long - ) - - private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - - override fun reportServiceData( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) { - provisionerMetrics = AggregateProvisionerMetrics( - max(totalVmCount, provisionerMetrics.vmTotalCount), - max(waitingVmCount, provisionerMetrics.vmWaitingCount), - max(activeVmCount, provisionerMetrics.vmActiveCount), - max(inactiveVmCount, provisionerMetrics.vmInactiveCount), - max(failedVmCount, provisionerMetrics.vmFailedCount), - ) - } - - public data class AggregateProvisionerMetrics( - val vmTotalCount: Int = 0, - val vmWaitingCount: Int = 0, - val vmActiveCount: Int = 0, - val vmInactiveCount: Int = 0, - val vmFailedCount: Int = 0 - ) - - override fun close() {} - - public fun getResult(): Result { - return Result( - hostAggregateMetrics.totalRequestedBurst, - hostAggregateMetrics.totalGrantedBurst, - hostAggregateMetrics.totalOvercommittedBurst, - hostAggregateMetrics.totalInterferedBurst, - hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), - hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), - hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(), - hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, - hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices, - hostAggregateMetrics.totalFailureVmSlices, - provisionerMetrics.vmTotalCount, - provisionerMetrics.vmWaitingCount, - provisionerMetrics.vmInactiveCount, - provisionerMetrics.vmFailedCount, - ) - } - - public data class Result( - public val totalRequestedBurst: Long, - public val totalGrantedBurst: Long, - public val totalOvercommittedBurst: Long, - public val totalInterferedBurst: Long, - public val meanCpuUsage: Double, - public val meanCpuDemand: Double, - public val meanNumDeployedImages: Double, - public val maxNumDeployedImages: Double, - public val totalPowerDraw: Double, - public val totalFailureSlices: Long, - public val totalFailureVmSlices: Long, - public val totalVmsSubmitted: Int, - public val totalVmsQueued: Int, - public val totalVmsFinished: Int, - public val totalVmsFailed: Int - ) -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 5cae0a31..cee8887b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -45,6 +45,7 @@ include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") include(":opendc-telemetry:opendc-telemetry-api") include(":opendc-telemetry:opendc-telemetry-sdk") +include(":opendc-telemetry:opendc-telemetry-compute") include(":opendc-trace:opendc-trace-api") include(":opendc-trace:opendc-trace-gwf") include(":opendc-trace:opendc-trace-swf") -- cgit v1.2.3 From 05aeb87093903df3fa00e05353a5f135293fca7e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 2 Sep 2021 13:34:35 +0200 Subject: build(web): Resolve kotlin-reflect incompatibility This change addresses an incompatibility issue with the kotlin-reflect transitive dependency in the opendc-web-runner module. --- opendc-web/opendc-web-runner/build.gradle.kts | 1 + 1 file changed, 1 insertion(+) diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index 99051e8e..bfbb1687 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -48,6 +48,7 @@ dependencies { implementation(libs.ktor.client.auth) implementation(libs.ktor.client.jackson) implementation(libs.jackson.datatype.jsr310) + implementation(kotlin("reflect")) runtimeOnly(libs.log4j.slf4j) -- cgit v1.2.3 From 18ff316a6b6ab984ebf8283ea48ed98ec69d8295 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 2 Sep 2021 13:20:05 +0200 Subject: refactor(capelin): Restructure input reading classes --- .../opendc-experiments-capelin/build.gradle.kts | 5 ++--- .../org/opendc/experiments/capelin/Portfolio.kt | 5 +++-- .../capelin/trace/PerformanceInterferenceReader.kt | 21 ++++++++------------- .../capelin/trace/StreamingParquetTraceReader.kt | 4 ++-- .../experiments/capelin/trace/VmPlacementReader.kt | 19 +++++++------------ .../capelin/trace/sv/SvResourceStateTable.kt | 6 +++--- .../capelin/trace/sv/SvResourceStateTableReader.kt | 4 ++-- .../experiments/capelin/CapelinIntegrationTest.kt | 4 +++- .../trace/PerformanceInterferenceReaderTest.kt | 4 +--- .../src/main/kotlin/org/opendc/web/runner/Main.kt | 3 +-- 10 files changed, 32 insertions(+), 43 deletions(-) diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 1a4caf91..b2330af0 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -45,9 +45,8 @@ dependencies { implementation(libs.config) implementation(libs.progressbar) implementation(libs.clikt) - implementation(libs.jackson.module.kotlin) { - exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") - } + implementation(libs.jackson.module.kotlin) + implementation(kotlin("reflect")) implementation(libs.parquet) testImplementation(libs.log4j.slf4j) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index d3bba182..4db04591 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -122,8 +122,9 @@ abstract class Portfolio(name: String) : Experiment(name) { } val performanceInterferenceModel = if (operationalPhenomena.hasInterference) - PerformanceInterferenceReader(FileInputStream(config.getString("interference-model"))) - .use { VmInterferenceModel(it.read(), Random(seeder.nextLong())) } + PerformanceInterferenceReader() + .read(FileInputStream(config.getString("interference-model"))) + .let { VmInterferenceModel(it, Random(seeder.nextLong())) } else null diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt index a19f5699..9549af42 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt @@ -31,14 +31,13 @@ import java.io.InputStream /** * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. */ -class PerformanceInterferenceReader( - private val input: InputStream, - private val mapper: ObjectMapper = jacksonObjectMapper() -) : AutoCloseable { +class PerformanceInterferenceReader { + /** + * The [ObjectMapper] to use. + */ + private val mapper = jacksonObjectMapper() + init { mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) } @@ -46,12 +45,8 @@ class PerformanceInterferenceReader( /** * Read the performance interface model from the input. */ - fun read(): List { - return mapper.readValue(input) - } - - override fun close() { - input.close() + fun read(input: InputStream): List { + return input.use { mapper.readValue(input) } } private data class GroupMixin( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index 61e4cab5..ed82217d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -41,8 +41,6 @@ import java.util.UUID import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread -private val logger = KotlinLogging.logger {} - /** * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. * @@ -50,6 +48,8 @@ private val logger = KotlinLogging.logger {} * @param selectedVms The list of VMs to read from the trace. */ class StreamingParquetTraceReader(traceFile: File, selectedVms: List = emptyList()) : TraceReader { + private val logger = KotlinLogging.logger {} + /** * The internal iterator to use for this reader. */ diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt index 7a1683f0..b55bd577 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt @@ -29,24 +29,19 @@ import java.io.InputStream /** * A parser for the JSON VM placement data files used for the TPDS article on Capelin. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. */ -public class VmPlacementReader( - private val input: InputStream, - private val mapper: ObjectMapper = jacksonObjectMapper() -) : AutoCloseable { +class VmPlacementReader { + /** + * The [ObjectMapper] to parse the placement. + */ + private val mapper = jacksonObjectMapper() + /** * Read the VM placements from the input. */ - public fun read(): Map { + fun read(input: InputStream): Map { return mapper.readValue>(input) .mapKeys { "vm__workload__${it.key}.txt" } .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 } - - override fun close() { - input.close() - } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt index 71c9d52e..24abb109 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt @@ -118,9 +118,9 @@ internal class SvResourceStateTable(path: Path) : Table { private fun nextDelegate(): TableReader? { return if (it.hasNext()) { - val (partition, path) = it.next() + val (_, path) = it.next() val reader = path.bufferedReader() - return SvResourceStateTableReader(partition, reader) + return SvResourceStateTableReader(reader) } else { null } @@ -133,7 +133,7 @@ internal class SvResourceStateTable(path: Path) : Table { override fun newReader(partition: String): TableReader { val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } val reader = path.bufferedReader() - return SvResourceStateTableReader(partition, reader) + return SvResourceStateTableReader(reader) } override fun toString(): String = "SvResourceStateTable" diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt index adcdb2ea..1a556f8d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt @@ -29,7 +29,7 @@ import java.time.Instant /** * A [TableReader] for the Bitbrains resource state table. */ -internal class SvResourceStateTableReader(partition: String, private val reader: BufferedReader) : TableReader { +internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { /** * The current parser state. */ @@ -57,7 +57,7 @@ internal class SvResourceStateTableReader(partition: String, private val reader: val length = line.length var col = 0 - var start = 0 + var start: Int var end = 0 while (end < length) { diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index abd8efeb..aed9a4bb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -197,7 +197,9 @@ class CapelinIntegrationTest { val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = - PerformanceInterferenceReader(perfInterferenceInput).use { VmInterferenceModel(it.read(), Random(seed.toLong())) } + PerformanceInterferenceReader() + .read(perfInterferenceInput) + .let { VmInterferenceModel(it, Random(seed.toLong())) } val meterProvider = createMeterProvider(clock) diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt index 9b1513dc..fbc39b87 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt @@ -33,9 +33,7 @@ class PerformanceInterferenceReaderTest { @Test fun testSmoke() { val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val reader = PerformanceInterferenceReader(input) - - val result = reader.use { reader.read() } + val result = PerformanceInterferenceReader().read(input) assertAll( { assertEquals(2, result.size) }, diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 65527141..5d481270 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -32,7 +32,6 @@ import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import mu.KotlinLogging -import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.env.MachineDef @@ -152,7 +151,7 @@ class RunnerCli : CliktCommand(name = "runner") { return@let null } - PerformanceInterferenceReader(path.inputStream()).use { reader -> reader.read() } + PerformanceInterferenceReader().read(path.inputStream()) } val targets = portfolio.targets -- cgit v1.2.3