From 6a1aea440c3066edc2ea6b79a65adb5313f4dd48 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 12 Oct 2021 13:25:41 +0200 Subject: feat(compute): Support filtering hosts based on CPU capacity This change allows users to create servers with a smaller CPU capacity than the host, by specifying the CPU capacity via metadata. This also allows filtering hosts based on their available CPU capacity. --- .../org/opendc/compute/service/driver/HostModel.kt | 9 ++- .../compute/service/internal/ComputeServiceImpl.kt | 2 +- .../opendc/compute/service/internal/HostView.kt | 2 +- .../compute/service/scheduler/filters/RamFilter.kt | 2 +- .../scheduler/filters/VCpuCapacityFilter.kt | 40 +++++++++++++ .../scheduler/weights/VCpuCapacityWeigher.kt | 40 +++++++++++++ .../opendc/compute/service/ComputeServiceTest.kt | 16 +++--- .../service/scheduler/FilterSchedulerTest.kt | 65 +++++++++++++++------- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 7 ++- .../compute/workload/ComputeWorkloadLoader.kt | 13 +++-- .../compute/workload/ComputeWorkloadRunner.kt | 3 +- .../org/opendc/compute/workload/VirtualMachine.kt | 4 +- 12 files changed, 160 insertions(+), 43 deletions(-) create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt create mode 100644 opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt index fc092a3f..f3b94e3d 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -25,7 +25,12 @@ package org.opendc.compute.service.driver /** * Describes the static machine properties of the host. * + * @property cpuCapacity The total CPU capacity of the host in MHz. * @property cpuCount The number of logical processing cores available for this host. - * @property memorySize The amount of memory available for this host in MB. + * @property memoryCapacity The amount of memory available for this host in MB. */ -public data class HostModel(public val cpuCount: Int, public val memorySize: Long) +public data class HostModel( + public val cpuCapacity: Double, + public val cpuCount: Int, + public val memoryCapacity: Long +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 57e70fcd..2a07a208 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -298,7 +298,7 @@ internal class ComputeServiceImpl( val hv = HostView(host) maxCores = max(maxCores, host.model.cpuCount) - maxMemory = max(maxMemory, host.model.memorySize) + maxMemory = max(maxMemory, host.model.memoryCapacity) hostToView[host] = hv if (host.state == HostState.UP) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index e2f33f11..0876209a 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -37,7 +37,7 @@ public class HostView(public val host: Host) { get() = host.uid public var instanceCount: Int = 0 - public var availableMemory: Long = host.model.memorySize + public var availableMemory: Long = host.model.memoryCapacity public var provisionedCores: Int = 0 override fun toString(): String = "HostView[host=$host]" diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt index a470a453..8a7a646c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt @@ -34,7 +34,7 @@ public class RamFilter(private val allocationRatio: Double) : HostFilter { override fun test(host: HostView, server: Server): Boolean { val requested = server.flavor.memorySize val available = host.availableMemory - val total = host.host.model.memorySize + val total = host.host.model.memoryCapacity // Do not allow an instance to overcommit against itself, only against // other instances. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt new file mode 100644 index 00000000..791710c8 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.filters + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [Server] and the available + * capacity on the host. + */ +public class VCpuCapacityFilter : HostFilter { + override fun test(host: HostView, server: Server): Boolean { + val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double + val hostModel = host.host.model + val availableCapacity = hostModel.cpuCapacity / hostModel.cpuCount + + return requiredCapacity == null || availableCapacity >= (requiredCapacity / server.flavor.cpuCount) + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt new file mode 100644 index 00000000..a86226e2 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.weights + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostWeigher] that weighs the hosts based on the difference required vCPU capacity and the available CPU capacity. + */ +public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher { + + override fun getWeight(host: HostView, server: Server): Double { + val model = host.host.model + val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double ?: 0.0 + return model.cpuCapacity / model.cpuCount - requiredCapacity / server.flavor.cpuCount + } + + override fun toString(): String = "VCpuWeigher" +} diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 564f9493..7b8d0fe2 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -125,7 +125,7 @@ internal class ComputeServiceTest { fun testAddHost() = scope.runBlockingSimulation { val host = mockk(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP assertEquals(0, service.hostCount) @@ -147,7 +147,7 @@ internal class ComputeServiceTest { fun testAddHostDouble() = scope.runBlockingSimulation { val host = mockk(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN assertEquals(0, service.hostCount) @@ -216,7 +216,7 @@ internal class ComputeServiceTest { fun testServerCannotFitOnHost() = scope.runBlockingSimulation { val host = mockk(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns false @@ -241,7 +241,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } every { host.canFit(any()) } returns false @@ -272,7 +272,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } every { host.canFit(any()) } returns false @@ -303,7 +303,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } @@ -326,7 +326,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } @@ -369,7 +369,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt index cafd4498..3f2ce43b 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt @@ -33,10 +33,7 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.driver.HostModel import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.internal.HostView -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.InstanceCountFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.filters.* import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher import org.opendc.compute.service.scheduler.weights.RamWeigher @@ -183,12 +180,12 @@ internal class FilterSchedulerTest { val hostA = mockk() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.availableMemory } returns 512 val hostB = mockk() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 2048 scheduler.addHost(hostA) @@ -210,7 +207,7 @@ internal class FilterSchedulerTest { val host = mockk() every { host.host.state } returns HostState.UP - every { host.host.model } returns HostModel(4, 2048) + every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.availableMemory } returns 2048 scheduler.addHost(host) @@ -231,12 +228,12 @@ internal class FilterSchedulerTest { val hostA = mockk() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.provisionedCores } returns 3 val hostB = mockk() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.provisionedCores } returns 0 scheduler.addHost(hostA) @@ -258,7 +255,7 @@ internal class FilterSchedulerTest { val host = mockk() every { host.host.state } returns HostState.UP - every { host.host.model } returns HostModel(4, 2048) + every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.provisionedCores } returns 0 scheduler.addHost(host) @@ -270,6 +267,34 @@ internal class FilterSchedulerTest { assertNull(scheduler.select(server)) } + @Test + fun testVCpuCapacityFilter() { + val scheduler = FilterScheduler( + filters = listOf(VCpuCapacityFilter()), + weighers = emptyList(), + ) + + val hostA = mockk() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(8 * 2600.0, 8, 2048) + every { hostA.availableMemory } returns 512 + scheduler.addHost(hostA) + + val hostB = mockk() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4 * 3200.0, 4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostB) + + val server = mockk() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + every { server.flavor.meta } returns mapOf("cpu-capacity" to 2 * 3200.0) + + assertEquals(hostB, scheduler.select(server)) + } + @Test fun testInstanceCountFilter() { val scheduler = FilterScheduler( @@ -279,12 +304,12 @@ internal class FilterSchedulerTest { val hostA = mockk() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.instanceCount } returns 2 val hostB = mockk() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.instanceCount } returns 0 scheduler.addHost(hostA) @@ -306,12 +331,12 @@ internal class FilterSchedulerTest { val hostA = mockk() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.availableMemory } returns 1024 val hostB = mockk() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 512 scheduler.addHost(hostA) @@ -333,12 +358,12 @@ internal class FilterSchedulerTest { val hostA = mockk() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(12, 2048) + every { hostA.host.model } returns HostModel(12 * 2600.0, 12, 2048) every { hostA.availableMemory } returns 1024 val hostB = mockk() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 512 scheduler.addHost(hostA) @@ -360,12 +385,12 @@ internal class FilterSchedulerTest { val hostA = mockk() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.provisionedCores } returns 2 val hostB = mockk() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.provisionedCores } returns 0 scheduler.addHost(hostA) @@ -387,12 +412,12 @@ internal class FilterSchedulerTest { val hostA = mockk() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.instanceCount } returns 2 val hostB = mockk() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.instanceCount } returns 0 scheduler.addHost(hostA) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index b9d02185..10faeb5b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -121,7 +121,7 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) + override val model: HostModel = HostModel(model.cpus.sumOf { it.frequency }, model.cpus.size, model.memory.sumOf { it.size }) /** * The [GuestListener] that listens for guest events. @@ -188,7 +188,7 @@ public class SimHost( } override fun canFit(server: Server): Boolean { - val sufficientMemory = model.memorySize >= server.flavor.memorySize + val sufficientMemory = model.memoryCapacity >= server.flavor.memorySize val enoughCpus = model.cpuCount >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) @@ -319,8 +319,9 @@ public class SimHost( */ private fun Flavor.toMachineModel(): MachineModel { val originalCpu = machine.model.cpus[0] + val cpuCapacity = (this.meta["cpu-capacity"] as? Double ?: Double.MAX_VALUE).coerceAtMost(originalCpu.frequency) val processingNode = originalCpu.node.copy(coreCount = cpuCount) - val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) return MachineModel(processingUnits, memoryUnits).optimize() diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 1a6624f7..f23becda 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -92,7 +92,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val idCol = reader.resolve(RESOURCE_ID) val startTimeCol = reader.resolve(RESOURCE_START_TIME) val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) - val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) var counter = 0 @@ -108,8 +109,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val submissionTime = reader.get(startTimeCol) as Instant val endTime = reader.get(stopTimeCol) as Instant - val maxCores = reader.getInt(coresCol) - val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB + val cpuCount = reader.getInt(cpuCountCol) + val cpuCapacity = reader.getDouble(cpuCapacityCol) + val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val builder = fragments.getValue(id) @@ -119,8 +121,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) { VirtualMachine( uid, id, - maxCores, - requiredMemory.roundToLong(), + cpuCount, + cpuCapacity, + memCapacity.roundToLong(), totalLoad, submissionTime, endTime, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt index 283f82fe..90ee56cb 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -128,7 +128,8 @@ public class ComputeWorkloadRunner( client.newFlavor( entry.name, entry.cpuCount, - entry.memCapacity + entry.memCapacity, + meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() ), meta = mapOf("workload" to workload) ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 5dd239f6..88e80719 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -31,8 +31,9 @@ import java.util.* * * @param uid The unique identifier of the virtual machine. * @param name The name of the virtual machine. + * @param cpuCapacity The required CPU capacity for the VM in MHz. * @param cpuCount The number of vCPUs in the VM. - * @param memCapacity The provisioned memory for the VM. + * @param memCapacity The provisioned memory for the VM in MB. * @param startTime The start time of the VM. * @param stopTime The stop time of the VM. * @param trace The trace that belong to this VM. @@ -41,6 +42,7 @@ public data class VirtualMachine( val uid: UUID, val name: String, val cpuCount: Int, + val cpuCapacity: Double, val memCapacity: Long, val totalLoad: Double, val startTime: Instant, -- cgit v1.2.3 From a1be58f1013697223a339a6a49302e1e42a6662d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 14 Oct 2021 17:00:52 +0200 Subject: perf(telemetry): Do not allocate lambda in fast-path This commit changes the lookup for host and server aggregators to use `getOrPut` instead of `computeIfAbsent`. The former will inline the lambda and not cause any memory allocation in the fast-path (e.g., the key exists), while the latter always allocates lambda for constructing the aggregator. --- .../kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index 738ec38b..d5257bbd 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -222,7 +222,7 @@ public class ComputeMetricAggregator { private fun getHost(hosts: MutableMap, resource: Resource): HostAggregator? { val id = resource.attributes[HOST_ID] return if (id != null) { - hosts.computeIfAbsent(id) { HostAggregator(resource) } + hosts.getOrPut(id) { HostAggregator(resource) } } else { null } @@ -234,7 +234,7 @@ public class ComputeMetricAggregator { private fun getServer(servers: MutableMap, point: PointData): ServerAggregator? { val id = point.attributes[ResourceAttributes.HOST_ID] return if (id != null) { - servers.computeIfAbsent(id) { ServerAggregator(point.attributes) } + servers.getOrPut(id) { ServerAggregator(point.attributes) } } else { null } -- cgit v1.2.3 From e76bebe9e81c3813422da6d67fbab7d9f471a317 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 15 Oct 2021 14:43:43 +0200 Subject: perf(compute): Redesign VM interference algorithm This change redesigns the virtual machine interference algorithm to have a fixed memory usage per `VmInterferenceModel` instance. Previously, for every interference domain, a copy of the model would be created, leading to OutOfMemory errors when running multiple experiments at the same time. --- .../workload/util/PerformanceInterferenceReader.kt | 68 ---- .../workload/util/VmInterferenceModelReader.kt | 128 +++++++ .../util/PerformanceInterferenceReaderTest.kt | 45 --- .../workload/util/VmInterferenceModelReaderTest.kt | 37 ++ .../org/opendc/experiments/capelin/Portfolio.kt | 8 +- .../experiments/capelin/CapelinIntegrationTest.kt | 9 +- .../opendc/simulator/compute/SimAbstractMachine.kt | 82 +++-- .../compute/kernel/SimAbstractHypervisor.kt | 30 +- .../kernel/interference/VmInterferenceDomain.kt | 22 +- .../kernel/interference/VmInterferenceGroup.kt | 44 --- .../kernel/interference/VmInterferenceModel.kt | 385 ++++++++++++++++----- .../compute/kernel/SimFairShareHypervisorTest.kt | 12 +- .../src/main/kotlin/org/opendc/web/runner/Main.kt | 9 +- 13 files changed, 577 insertions(+), 302 deletions(-) delete mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt delete mode 100644 opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt create mode 100644 opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt deleted file mode 100644 index 67f9626c..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup -import java.io.File -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -public class PerformanceInterferenceReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - init { - mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) - } - - /** - * Read the performance interface model from [file]. - */ - public fun read(file: File): List { - return mapper.readValue(file) - } - - /** - * Read the performance interface model from the input. - */ - public fun read(input: InputStream): List { - return mapper.readValue(input) - } - - private data class GroupMixin( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set, - ) -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt new file mode 100644 index 00000000..e0fa8904 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import java.io.File +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + */ +public class VmInterferenceModelReader { + /** + * The [ObjectMapper] to use. + */ + private val mapper = jacksonObjectMapper() + + /** + * Read the performance interface model from [file]. + */ + public fun read(file: File): VmInterferenceModel { + val builder = VmInterferenceModel.builder() + val parser = mapper.createParser(file) + parseGroups(parser, builder) + return builder.build() + } + + /** + * Read the performance interface model from the input. + */ + public fun read(input: InputStream): VmInterferenceModel { + val builder = VmInterferenceModel.builder() + val parser = mapper.createParser(input) + parseGroups(parser, builder) + return builder.build() + } + + /** + * Parse all groups in an interference JSON file. + */ + private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) { + parser.nextToken() + + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") + } + + while (parser.nextToken() != JsonToken.END_ARRAY) { + parseGroup(parser, builder) + } + } + + /** + * Parse a group an interference JSON file. + */ + private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) { + var targetLoad = Double.POSITIVE_INFINITY + var score = 1.0 + val members = mutableSetOf() + + if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "vms" -> parseGroupMembers(parser, members) + "minServerLoad" -> targetLoad = parser.doubleValue + "performanceScore" -> score = parser.doubleValue + } + } + + builder.addGroup(members, targetLoad, score) + } + + /** + * Parse the members of a group. + */ + private fun parseGroupMembers(parser: JsonParser, members: MutableSet) { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_ARRAY) { + if (parser.currentToken() != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected string value for group member") + } + + val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt") + members.add(member) + } + } + + private data class Group( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set, + ) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt deleted file mode 100644 index c79f0584..00000000 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll - -/** - * Test suite for the [PerformanceInterferenceReader] class. - */ -class PerformanceInterferenceReaderTest { - @Test - fun testSmoke() { - val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val result = PerformanceInterferenceReader().read(input) - - assertAll( - { assertEquals(2, result.size) }, - { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, - { assertEquals(0.0, result[0].targetLoad, 0.001) }, - { assertEquals(0.8830158730158756, result[0].score, 0.001) } - ) - } -} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt new file mode 100644 index 00000000..1c3e7149 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow + +/** + * Test suite for the [VmInterferenceModelReader] class. + */ +class VmInterferenceModelReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json")) + assertDoesNotThrow { VmInterferenceModelReader().read(input) } + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 4e855f82..53c9de11 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -30,14 +30,13 @@ import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader @@ -99,9 +98,8 @@ abstract class Portfolio(name: String) : Experiment(name) { val seeder = Random(repeat.toLong()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) - PerformanceInterferenceReader() + VmInterferenceModelReader() .read(File(config.getString("interference-model"))) - .let { VmInterferenceModel(it, Random(seeder.nextLong())) } else null @@ -116,7 +114,7 @@ abstract class Portfolio(name: String) : Experiment(name) { clock, computeScheduler, failureModel, - performanceInterferenceModel + performanceInterferenceModel?.withSeed(repeat.toLong()) ) val exporter = ParquetComputeMetricExporter( diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 94e92c1b..56ba9cfe 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,9 +34,8 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics @@ -177,9 +176,9 @@ class CapelinIntegrationTest { val workload = createTestWorkload(1.0, seed) val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = - PerformanceInterferenceReader() + VmInterferenceModelReader() .read(perfInterferenceInput) - .let { VmInterferenceModel(it, Random(seed.toLong())) } + .withSeed(seed.toLong()) val simulator = ComputeWorkloadRunner( coroutineContext, @@ -213,7 +212,7 @@ class CapelinIntegrationTest { { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(465088, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 60a10f20..5909d980 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -77,33 +77,41 @@ public abstract class SimAbstractMachine( private var isTerminated = false /** - * The continuation to resume when the virtual machine workload has finished. + * The current active [Context]. */ - private var cont: Continuation? = null + private var _ctx: Context? = null + + /** + * This method is invoked when the machine is started. + */ + protected open fun onStart(ctx: SimMachineContext) {} + + /** + * This method is invoked when the machine is stopped. + */ + protected open fun onStop(ctx: SimMachineContext) { + _ctx = null + } /** * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ override suspend fun run(workload: SimWorkload, meta: Map) { check(!isTerminated) { "Machine is terminated" } - check(cont == null) { "A machine cannot run concurrently" } - - val ctx = Context(meta) + check(_ctx == null) { "A machine cannot run concurrently" } return suspendCancellableCoroutine { cont -> - this.cont = cont + val ctx = Context(meta, cont) + _ctx = ctx // Cancel all cpus on cancellation - cont.invokeOnCancellation { - this.cont = null - engine.batch { - for (cpu in cpus) { - cpu.cancel() - } - } - } + cont.invokeOnCancellation { ctx.close() } - engine.batch { workload.onStart(ctx) } + engine.batch { + onStart(ctx) + + workload.onStart(ctx) + } } } @@ -113,34 +121,22 @@ public abstract class SimAbstractMachine( } isTerminated = true - cancel() + _ctx?.close() } override fun onConverge(now: Long, delta: Long) { parent?.onConverge(now, delta) } - /** - * Cancel the workload that is currently running on the machine. - */ - private fun cancel() { - engine.batch { - for (cpu in cpus) { - cpu.cancel() - } - } - - val cont = cont - if (cont != null) { - this.cont = null - cont.resume(Unit) - } - } - /** * The execution context in which the workload runs. */ - private inner class Context(override val meta: Map) : SimMachineContext { + private inner class Context(override val meta: Map, private val cont: Continuation) : SimMachineContext { + /** + * A flag to indicate that the context has been closed. + */ + private var isClosed = false + override val engine: FlowEngine get() = this@SimAbstractMachine.engine @@ -152,7 +148,21 @@ public abstract class SimAbstractMachine( override val storage: List = this@SimAbstractMachine.storage - override fun close() = cancel() + override fun close() { + if (isClosed) { + return + } + + isClosed = true + engine.batch { + for (cpu in cpus) { + cpu.cancel() + } + } + + onStop(this) + cont.resume(Unit) + } } /** @@ -166,7 +176,7 @@ public abstract class SimAbstractMachine( * The [SimNetworkAdapter] implementation for a machine. */ private class NetworkAdapterImpl( - private val engine: FlowEngine, + engine: FlowEngine, model: NetworkAdapter, index: Int ) : SimNetworkAdapter(), SimNetworkInterface { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index f6d8f628..90bf5e25 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -29,6 +29,7 @@ import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer import kotlin.math.roundToLong @@ -141,11 +142,14 @@ public abstract class SimAbstractHypervisor( * * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { + private inner class VirtualMachine( + model: MachineModel, + private val interferenceId: String? = null + ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { /** * The interference key of this virtual machine. */ - private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) } + private var interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. @@ -187,6 +191,24 @@ public abstract class SimAbstractHypervisor( override val cpuUsage: Double get() = cpus.sumOf(FlowConsumer::rate) + override fun onStart(ctx: SimMachineContext) { + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.join(interferenceKey) + } + + super.onStart(ctx) + } + + override fun onStop(ctx: SimMachineContext) { + super.onStop(ctx) + + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.leave(interferenceKey) + } + } + override fun close() { super.close() @@ -195,8 +217,10 @@ public abstract class SimAbstractHypervisor( } _vms.remove(this) + + val interferenceKey = interferenceKey if (interferenceKey != null) { - interferenceDomain?.leave(interferenceKey) + interferenceDomain?.removeKey(interferenceKey) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index b737d61a..09b03306 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -30,14 +30,30 @@ import org.opendc.simulator.flow.interference.InterferenceKey */ public interface VmInterferenceDomain : InterferenceDomain { /** - * Join this interference domain. + * Construct an [InterferenceKey] for the specified [id]. * * @param id The identifier of the virtual machine. + * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine + * does not participate in the domain. */ - public fun join(id: String): InterferenceKey + public fun createKey(id: String): InterferenceKey? /** - * Leave this interference domain. + * Remove the specified [key] from this domain. + */ + public fun removeKey(key: InterferenceKey) + + /** + * Mark the specified [key] as active in this interference domain. + * + * @param key The key to join the interference domain with. + */ + public fun join(key: InterferenceKey) + + /** + * Mark the specified [key] as inactive in this interference domain. + * + * @param key The key of the virtual machine that wants to leave the domain. */ public fun leave(key: InterferenceKey) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt deleted file mode 100644 index 708ddede..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel.interference - -/** - * A group of virtual machines that together can interfere when operating on the same resources, causing performance - * variability. - */ -public data class VmInterferenceGroup( - /** - * The minimum load of the host before the interference occurs. - */ - public val targetLoad: Double, - - /** - * A score in [0, 1] representing the performance variability as a result of resource interference. - */ - public val score: Double, - - /** - * The members of this interference group. - */ - public val members: Set -) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index b3d72507..977292be 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -28,143 +28,366 @@ import java.util.* /** * An interference model that models the resource interference between virtual machines on a host. * - * @param groups The groups of virtual machines that interfere with each other. - * @param random The [Random] instance to select the affected virtual machines. + * @param targets The target load of each group. + * @param scores The performance score of each group. + * @param members The members belonging to each group. + * @param membership The identifier of each key. + * @param size The number of groups. + * @param seed The seed to use for randomly selecting the virtual machines that are affected. */ -public class VmInterferenceModel( - private val groups: List, - private val random: Random = Random(0) +public class VmInterferenceModel private constructor( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map, + private val members: Array, + private val membership: Array, + private val size: Int, + seed: Long, ) { + /** + * A [SplittableRandom] used for selecting the virtual machines that are affected. + */ + private val random = SplittableRandom(seed) + /** * Construct a new [VmInterferenceDomain]. */ - public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain { + public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random) + + /** + * Create a copy of this model with a different seed. + */ + public fun withSeed(seed: Long): VmInterferenceModel { + return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed) + } + + public companion object { /** - * The stateful groups of this domain. + * Construct a [Builder] instance. */ - private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) } + @JvmStatic + public fun builder(): Builder = Builder() + } + /** + * Builder class for a [VmInterferenceModel] + */ + public class Builder internal constructor() { /** - * The set of keys active in this domain. + * The initial capacity of the builder. */ - private val keys = mutableSetOf() + private val INITIAL_CAPACITY = 256 - override fun join(id: String): InterferenceKey { - val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad }) - keys += key - return key - } + /** + * The target load of each group. + */ + private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } - override fun leave(key: InterferenceKey) { - if (key is InterferenceKeyImpl) { - keys -= key - key.leave() + /** + * The performance score of each group. + */ + private var _scores = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } + + /** + * The members of each group. + */ + private var _members = ArrayList>(INITIAL_CAPACITY) + + /** + * The mapping from member to group id. + */ + private val ids = TreeSet() + + /** + * The number of groups in the model. + */ + private var size = 0 + + /** + * Add the specified group to the model. + */ + public fun addGroup(members: Set, targetLoad: Double, score: Double): Builder { + val size = size + + if (size == _targets.size) { + grow() } + + _targets[size] = targetLoad + _scores[size] = score + _members.add(members) + ids.addAll(members) + + this.size++ + + return this } - override fun apply(key: InterferenceKey?, load: Double): Double { - if (key == null || key !is InterferenceKeyImpl) { - return 1.0 - } + /** + * Build the [VmInterferenceModel]. + */ + public fun build(seed: Long = 0): VmInterferenceModel { + val size = size + val targets = _targets + val scores = _scores + val members = _members - val ctx = key.findGroup(load) - val group = ctx?.group + val indices = Array(size) { it } + indices.sortWith( + Comparator { l, r -> + var cmp = targets[l].compareTo(targets[r]) // Order by target load + if (cmp != 0) { + return@Comparator cmp + } - // Apply performance penalty to (on average) only one of the VMs - return if (group != null && random.nextInt(group.members.size) == 0) { - group.score - } else { - 1.0 + cmp = scores[l].compareTo(scores[r]) // Higher penalty first (this means lower performance score first) + if (cmp != 0) + cmp + else + l.compareTo(r) + } + ) + + val newTargets = DoubleArray(size) + val newScores = DoubleArray(size) + val newMembers = arrayOfNulls(size) + + var nextId = 0 + val idMapping = ids.associateWith { nextId++ } + val membership = ids.associateWithTo(TreeMap()) { ArrayList() } + + for ((group, j) in indices.withIndex()) { + newTargets[group] = targets[j] + newScores[group] = scores[j] + val groupMembers = members[j] + val newGroupMembers = groupMembers.map { idMapping.getValue(it) }.toIntArray() + + newGroupMembers.sort() + newMembers[group] = newGroupMembers + + for (member in groupMembers) { + membership.getValue(member).add(group) + } } + + @Suppress("UNCHECKED_CAST") + return VmInterferenceModel( + newTargets, + newScores, + idMapping, + newMembers as Array, + membership.map { it.value.toIntArray() }.toTypedArray(), + size, + seed + ) } - override fun toString(): String = "VmInterferenceDomain" + /** + * Helper function to grow the capacity of the internal arrays. + */ + private fun grow() { + val oldSize = _targets.size + val newSize = oldSize + (oldSize shr 1) + + _targets = _targets.copyOf(newSize) + _scores = _scores.copyOf(newSize) + } } /** - * An interference key. - * - * @param id The identifier of the member. - * @param groups The groups to which the key belongs. + * Internal implementation of [VmInterferenceDomain]. */ - private inner class InterferenceKeyImpl(val id: String, private val groups: List) : InterferenceKey { - init { - for (group in groups) { - group.join(this) - } - } + private class InterferenceDomainImpl( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map, + private val members: Array, + private val membership: Array, + private val random: SplittableRandom + ) : VmInterferenceDomain { + /** + * Keys registered with this domain. + */ + private val keys = HashMap() /** - * Find the active group that applies for the interference member. + * The set of keys active in this domain. */ - fun findGroup(load: Double): GroupContext? { - // Find the first active group whose target load is lower than the current load - val index = groups.binarySearchBy(load) { it.group.targetLoad } - val target = if (index >= 0) index else -(index + 1) + private val activeKeys = ArrayList() - // Check whether there are active groups ahead of the index - for (i in target until groups.size) { - val group = groups[i] - if (group.group.targetLoad > load) { - break - } else if (group.isActive) { - return group + override fun createKey(id: String): InterferenceKey? { + val intId = idMapping[id] ?: return null + return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } + } + + override fun removeKey(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (activeKeys.remove(key)) { + computeActiveGroups(key.id) + } + + keys.remove(key.id) + } + + override fun join(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (key.acquire()) { + val pos = activeKeys.binarySearch(key) + if (pos < 0) { + activeKeys.add(-pos - 1, key) } + computeActiveGroups(key.id) } + } + + override fun leave(key: InterferenceKey) { + if (key is InterferenceKeyImpl && key.release()) { + activeKeys.remove(key) + computeActiveGroups(key.id) + } + } + + override fun apply(key: InterferenceKey?, load: Double): Double { + if (key == null || key !is InterferenceKeyImpl) { + return 1.0 + } + + val groups = key.groups + val groupSize = groups.size + + if (groupSize == 0) { + return 1.0 + } + + val targets = targets + val scores = scores + var low = 0 + var high = groups.size - 1 - // Check whether there are active groups before the index - for (i in (target - 1) downTo 0) { - val group = groups[i] - if (group.isActive) { - return group + var group = -1 + var score = 1.0 + + // Perform binary search over the groups based on target load + while (low <= high) { + val mid = low + high ushr 1 + val midGroup = groups[mid] + val target = targets[midGroup] + + if (target < load) { + low = mid + 1 + group = midGroup + score = scores[midGroup] + } else if (target > load) { + high = mid - 1 + } else { + group = midGroup + score = scores[midGroup] + break } } - return null + return if (group >= 0 && random.nextInt(members[group].size) == 0) { + score + } else { + 1.0 + } } + override fun toString(): String = "VmInterferenceDomain" + /** - * Leave all the groups. + * Queue of participants that will be removed or added to the active groups. */ - fun leave() { + private val _participants = ArrayDeque() + + /** + * (Re-)Compute the active groups. + */ + private fun computeActiveGroups(id: Int) { + val activeKeys = activeKeys + val groups = membership[id] + + if (activeKeys.isEmpty()) { + return + } + + val members = members + val participants = _participants + for (group in groups) { - group.leave(this) + val groupMembers = members[group] + + var i = 0 + var j = 0 + var intersection = 0 + + // Compute the intersection of the group members and the current active members + while (i < groupMembers.size && j < activeKeys.size) { + val l = groupMembers[i] + val rightEntry = activeKeys[j] + val r = rightEntry.id + + if (l < r) { + i++ + } else if (l > r) { + j++ + } else { + participants.add(rightEntry) + intersection++ + + i++ + j++ + } + } + + while (true) { + val participant = participants.poll() ?: break + val participantGroups = participant.groups + if (intersection <= 1) { + participantGroups.remove(group) + } else { + val pos = participantGroups.binarySearch(group) + if (pos < 0) { + participantGroups.add(-pos - 1, group) + } + } + } } } } /** - * A group context is used to track the active keys per interference group. + * An interference key. + * + * @param id The identifier of the member. */ - private inner class GroupContext(val group: VmInterferenceGroup) { + private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable { /** - * The active keys that are part of this group. + * The active groups to which the key belongs. */ - private val keys = mutableSetOf() + @JvmField val groups: MutableList = ArrayList() /** - * A flag to indicate that the group is active. + * The number of users of the interference key. */ - val isActive - get() = keys.size > 1 + private var refCount: Int = 0 /** - * Determine whether the specified [id] is part of this group. + * Join the domain. */ - operator fun contains(id: String): Boolean = id in group.members + fun acquire(): Boolean = refCount++ <= 0 /** - * Join this group with the specified [key]. + * Leave the domain. */ - fun join(key: InterferenceKeyImpl) { - keys += key - } + fun release(): Boolean = --refCount <= 0 - /** - * Leave this group with the specified [key]. - */ - fun leave(key: InterferenceKeyImpl) { - keys -= key - } + override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 6f32cf46..b7f5bf8e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -30,7 +30,6 @@ import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -187,12 +186,11 @@ internal class SimFairShareHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val groups = listOf( - VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")), - VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")), - VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) - ) - val interferenceModel = VmInterferenceModel(groups) + val interferenceModel = VmInterferenceModel.builder() + .addGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")) + .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")) + .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) + .build() val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 59308e11..a1bc869e 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -32,7 +32,7 @@ import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -131,7 +131,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.info { "Constructing performance interference model" } val workloadLoader = ComputeWorkloadLoader(tracePath) - val interferenceGroups = let { + val interferenceModel = let { val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json") val operational = scenario.operationalPhenomena val enabled = operational.performanceInterferenceEnabled @@ -140,15 +140,14 @@ class RunnerCli : CliktCommand(name = "runner") { return@let null } - PerformanceInterferenceReader().read(path.inputStream()) + VmInterferenceModelReader().read(path.inputStream()) } val targets = portfolio.targets val results = (0 until targets.repeatsPerScenario).map { repeat -> logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } - runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel) + runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel?.withSeed(repeat.toLong())) } } -- cgit v1.2.3 From f565afb1ef7b940804af62aa73b6859dcb78a847 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 13 Oct 2021 15:42:41 +0200 Subject: feat(telemetry): Report provisioning time of virtual machines This change adds support for collecting the provisioning time of virtual machines in addition to their boot time. --- .../compute/service/internal/ComputeServiceImpl.kt | 20 ++++++++++++++- .../compute/service/internal/InternalServer.kt | 9 +++++-- .../org/opendc/compute/simulator/internal/Guest.kt | 2 +- .../export/parquet/ParquetServerDataWriter.kt | 7 +++-- .../telemetry/compute/ComputeMetricAggregator.kt | 30 +++++++++++----------- .../opendc/telemetry/compute/table/ServerData.kt | 2 +- 6 files changed, 46 insertions(+), 24 deletions(-) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 2a07a208..292feabe 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -26,6 +26,7 @@ import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -173,6 +174,12 @@ internal class ComputeServiceImpl( result.observe(available, upState) result.observe(total - available, downState) } + + meter.gaugeBuilder("system.time.provision") + .setDescription("The most recent timestamp where the server entered a provisioned state") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectProvisionTime) } override fun newClient(): ComputeClient { @@ -324,8 +331,10 @@ internal class ComputeServiceImpl( internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } + val now = clock.millis() + val request = SchedulingRequest(server, now) - val request = SchedulingRequest(server, clock.millis()) + server.lastProvisioningTimestamp = now queue.add(request) _serversPending.add(1) requestSchedulingCycle() @@ -501,4 +510,13 @@ internal class ComputeServiceImpl( requestSchedulingCycle() } } + + /** + * Collect the timestamp when each server entered its provisioning state most recently. + */ + private fun collectProvisionTime(result: ObservableLongMeasurement) { + for ((_, server) in servers) { + result.observe(server.lastProvisioningTimestamp, server.attributes) + } + } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index 05a7e1bf..f1b92c66 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -55,7 +55,7 @@ internal class InternalServer( /** * The attributes of a server. */ - internal val attributes: Attributes = Attributes.builder() + @JvmField internal val attributes: Attributes = Attributes.builder() .put(ResourceAttributes.HOST_NAME, name) .put(ResourceAttributes.HOST_ID, uid.toString()) .put(ResourceAttributes.HOST_TYPE, flavor.name) @@ -70,7 +70,12 @@ internal class InternalServer( /** * The [Host] that has been assigned to host the server. */ - internal var host: Host? = null + @JvmField internal var host: Host? = null + + /** + * The most recent timestamp when the server entered a provisioning state. + */ + @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE /** * The current scheduling request. diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 5ea1860d..61b3214e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -248,7 +248,7 @@ internal class Guest( */ fun collectBootTime(result: ObservableLongMeasurement) { if (_bootTime != Long.MIN_VALUE) { - result.observe(_bootTime) + result.observe(_bootTime, attributes) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 0d11ec23..4ebf8c62 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -55,9 +55,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : builder["uptime"] = data.uptime builder["downtime"] = data.downtime - val bootTime = data.bootTime - builder["boot_time"] = bootTime?.toEpochMilli() - builder["scheduling_latency"] = data.schedulingLatency + builder["boot_time"] = data.bootTime?.toEpochMilli() + builder["provision_time"] = data.provisionTime?.toEpochMilli() builder["cpu_count"] = data.server.cpuCount builder["cpu_limit"] = data.cpuLimit @@ -81,8 +80,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .name("host_id").type(UUID_SCHEMA.optional()).noDefault() .requiredLong("uptime") .requiredLong("downtime") + .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredLong("scheduling_latency") .requiredInt("cpu_count") .requiredDouble("cpu_limit") .requiredLong("cpu_time_active") diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index d5257bbd..b293f7b5 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -30,7 +30,6 @@ import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.telemetry.compute.table.* import java.time.Instant -import kotlin.math.roundToLong /** * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData]. @@ -81,12 +80,6 @@ public class ComputeMetricAggregator { } } } - "scheduler.latency" -> { - for (point in metric.doubleHistogramData.points) { - val server = getServer(servers, point) ?: continue - server.schedulingLatency = (point.sum / point.count).roundToLong() - } - } // SimHost "system.guests" -> { @@ -190,13 +183,20 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { - server.bootTime = point.value + server.bootTime = Instant.ofEpochMilli(point.value) server.host = agg.host } else { - agg.bootTime = point.value + agg.bootTime = Instant.ofEpochMilli(point.value) } } } + "system.time.provision" -> { + for (point in metric.longGaugeData.points) { + val server = getServer(servers, point) ?: continue + server.recordTimestamp(point) + server.provisionTime = Instant.ofEpochMilli(point.value) + } + } } } } @@ -323,7 +323,7 @@ public class ComputeMetricAggregator { private var previousUptime = 0L @JvmField var downtime = 0L private var previousDowntime = 0L - @JvmField var bootTime = Long.MIN_VALUE + @JvmField var bootTime: Instant? = null /** * Finish the aggregation for this cycle. @@ -379,7 +379,7 @@ public class ComputeMetricAggregator { powerTotal - previousPowerTotal, uptime - previousUptime, downtime - previousDowntime, - if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null + bootTime ) } @@ -419,8 +419,8 @@ public class ComputeMetricAggregator { private var previousUptime = 0L @JvmField var downtime: Long = 0 private var previousDowntime = 0L - @JvmField var bootTime: Long = 0 - @JvmField var schedulingLatency = 0L + @JvmField var provisionTime: Instant? = null + @JvmField var bootTime: Instant? = null @JvmField var cpuLimit = 0.0 @JvmField var cpuActiveTime = 0L @JvmField var cpuIdleTime = 0L @@ -461,8 +461,8 @@ public class ComputeMetricAggregator { host, uptime - previousUptime, downtime - previousDowntime, - if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null, - schedulingLatency, + provisionTime, + bootTime, cpuLimit, cpuActiveTime - previousCpuActiveTime, cpuIdleTime - previousCpuIdleTime, diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt index c48bff3a..6fd2a81b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -33,8 +33,8 @@ public data class ServerData( val host: HostInfo?, val uptime: Long, val downtime: Long, + val provisionTime: Instant?, val bootTime: Instant?, - val schedulingLatency: Long, val cpuLimit: Double, val cpuActiveTime: Long, val cpuIdleTime: Long, -- cgit v1.2.3 From 17951889c6d805b907d936d54e7e66efb7376879 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 15 Oct 2021 15:32:11 +0200 Subject: perf(telemetry): Prevent allocations during collection cycle This change redesigns the ComputeMonitor interface to reduce the number of memory allocations necessary during a collection cycle. --- .../org/opendc/compute/simulator/SimHostTest.kt | 28 +- .../export/parquet/ParquetComputeMetricExporter.kt | 18 +- .../workload/export/parquet/ParquetDataWriter.kt | 23 +- .../export/parquet/ParquetHostDataWriter.kt | 8 +- .../export/parquet/ParquetServerDataWriter.kt | 8 +- .../export/parquet/ParquetServiceDataWriter.kt | 8 +- .../experiments/capelin/CapelinIntegrationTest.kt | 16 +- .../telemetry/compute/ComputeMetricAggregator.kt | 393 +++++++++++---------- .../org/opendc/telemetry/compute/ComputeMonitor.kt | 18 +- .../kotlin/org/opendc/telemetry/compute/Helpers.kt | 6 +- .../org/opendc/telemetry/compute/table/HostData.kt | 50 --- .../telemetry/compute/table/HostTableReader.kt | 125 +++++++ .../opendc/telemetry/compute/table/ServerData.kt | 43 --- .../telemetry/compute/table/ServerTableReader.kt | 90 +++++ .../opendc/telemetry/compute/table/ServiceData.kt | 7 + .../telemetry/compute/table/ServiceTableReader.kt | 70 ++++ .../opendc/web/runner/WebComputeMetricExporter.kt | 38 +- 17 files changed, 587 insertions(+), 362 deletions(-) delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a0ff9228..799a8cf0 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -46,8 +46,8 @@ import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.HOST_ID -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.time.Duration @@ -140,10 +140,10 @@ internal class SimHostTest { val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), object : ComputeMetricExporter() { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - stealTime += data.cpuStealTime + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + stealTime += reader.cpuStealTime } }, exportInterval = Duration.ofSeconds(duration) @@ -236,16 +236,16 @@ internal class SimHostTest { val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), object : ComputeMetricExporter() { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - uptime += data.uptime - downtime += data.downtime + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + uptime += reader.uptime + downtime += reader.downtime } - override fun record(data: ServerData) { - guestUptime += data.uptime - guestDowntime += data.downtime + override fun record(reader: ServerTableReader) { + guestUptime += reader.uptime + guestDowntime += reader.downtime } }, exportInterval = Duration.ofSeconds(duration) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt index ad182d67..a46885f4 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt @@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet import io.opentelemetry.sdk.common.CompletableResultCode import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader import java.io.File /** @@ -49,16 +49,16 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS bufferSize ) - override fun record(data: ServerData) { - serverWriter.write(data) + override fun record(reader: ServerTableReader) { + serverWriter.write(reader) } - override fun record(data: HostData) { - hostWriter.write(data) + override fun record(reader: HostTableReader) { + hostWriter.write(reader) } - override fun record(data: ServiceData) { - serviceWriter.write(data) + override fun record(reader: ServiceTableReader) { + serviceWriter.write(reader) } override fun shutdown(): CompletableResultCode { diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt index 4172d729..84387bbc 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -50,9 +50,9 @@ public abstract class ParquetDataWriter( private val logger = KotlinLogging.logger {} /** - * The queue of commands to process. + * The queue of records to process. */ - private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) + private val queue: BlockingQueue = ArrayBlockingQueue(bufferSize) /** * An exception to be propagated to the actual writer. @@ -72,20 +72,20 @@ public abstract class ParquetDataWriter( } val queue = queue - val buf = mutableListOf() + val buf = mutableListOf() var shouldStop = false try { while (!shouldStop) { try { - process(writer, queue.take()) + writer.write(queue.take()) } catch (e: InterruptedException) { shouldStop = true } if (queue.drainTo(buf) > 0) { for (data in buf) { - process(writer, data) + writer.write(data) } buf.clear() } @@ -119,7 +119,9 @@ public abstract class ParquetDataWriter( throw IllegalStateException("Writer thread failed", exception) } - queue.put(data) + val builder = GenericRecordBuilder(schema) + convert(builder, data) + queue.put(builder.build()) } /** @@ -133,13 +135,4 @@ public abstract class ParquetDataWriter( init { writerThread.start() } - - /** - * Process the specified [data] to be written to the Parquet file. - */ - private fun process(writer: ParquetWriter, data: T) { - val builder = GenericRecordBuilder(schema) - convert(builder, data) - writer.write(builder.build()) - } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 98a0739e..2b7cac8f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import org.opendc.trace.util.parquet.UUID_SCHEMA import org.opendc.trace.util.parquet.optional import java.io.File /** - * A Parquet event writer for [HostData]s. + * A Parquet event writer for [HostTableReader]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, SCHEMA, bufferSize) { + ParquetDataWriter(path, SCHEMA, bufferSize) { override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { return builder @@ -46,7 +46,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .build() } - override fun convert(builder: GenericRecordBuilder, data: HostData) { + override fun convert(builder: GenericRecordBuilder, data: HostTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["host_id"] = data.host.id diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 4ebf8c62..144b6624 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServerTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import org.opendc.trace.util.parquet.UUID_SCHEMA import org.opendc.trace.util.parquet.optional import java.io.File /** - * A Parquet event writer for [ServerData]s. + * A Parquet event writer for [ServerTableReader]s. */ public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, SCHEMA, bufferSize) { + ParquetDataWriter(path, SCHEMA, bufferSize) { override fun buildWriter(builder: AvroParquetWriter.Builder): ParquetWriter { return builder @@ -47,7 +47,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .build() } - override fun convert(builder: GenericRecordBuilder, data: ServerData) { + override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["server_id"] = data.server.id diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 47824b29..ec8a2b65 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -25,17 +25,17 @@ package org.opendc.compute.workload.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecordBuilder -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import java.io.File /** - * A Parquet event writer for [ServiceData]s. + * A Parquet event writer for [ServiceTableReader]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter(path, SCHEMA, bufferSize) { + ParquetDataWriter(path, SCHEMA, bufferSize) { - override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["hosts_up"] = data.hostsUp builder["hosts_down"] = data.hostsDown diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 56ba9cfe..f3a6ed1a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -39,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration @@ -284,13 +284,13 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L - override fun record(data: HostData) { - idleTime += data.cpuIdleTime - activeTime += data.cpuActiveTime - stealTime += data.cpuStealTime - lostTime += data.cpuLostTime - energyUsage += data.powerTotal - uptime += data.uptime + override fun record(reader: HostTableReader) { + idleTime += reader.cpuIdleTime + activeTime += reader.cpuActiveTime + stealTime += reader.cpuStealTime + lostTime += reader.cpuLostTime + energyUsage += reader.powerTotal + uptime += reader.uptime } } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index b293f7b5..418dc201 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -20,6 +20,8 @@ * SOFTWARE. */ +@file:Suppress("PropertyName") + package org.opendc.telemetry.compute import io.opentelemetry.api.common.AttributeKey @@ -32,7 +34,7 @@ import org.opendc.telemetry.compute.table.* import java.time.Instant /** - * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData]. + * Helper class responsible for aggregating [MetricData] into [ServiceTableReader], [HostTableReader] and [ServerTableReader]. */ public class ComputeMetricAggregator { private val _service = ServiceAggregator() @@ -58,25 +60,25 @@ public class ComputeMetricAggregator { service.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> service.hostsUp = point.value.toInt() - "down" -> service.hostsDown = point.value.toInt() + "up" -> service._hostsUp = point.value.toInt() + "down" -> service._hostsDown = point.value.toInt() } } } "scheduler.servers" -> { for (point in metric.longSumData.points) { when (point.attributes[STATE_KEY]) { - "pending" -> service.serversPending = point.value.toInt() - "active" -> service.serversActive = point.value.toInt() + "pending" -> service._serversPending = point.value.toInt() + "active" -> service._serversActive = point.value.toInt() } } } "scheduler.attempts" -> { for (point in metric.longSumData.points) { when (point.attributes[RESULT_KEY]) { - "success" -> service.attemptsSuccess = point.value.toInt() - "failure" -> service.attemptsFailure = point.value.toInt() - "error" -> service.attemptsError = point.value.toInt() + "success" -> service._attemptsSuccess = point.value.toInt() + "failure" -> service._attemptsFailure = point.value.toInt() + "error" -> service._attemptsError = point.value.toInt() } } } @@ -87,10 +89,10 @@ public class ComputeMetricAggregator { for (point in metric.longSumData.points) { when (point.attributes[STATE_KEY]) { - "terminated" -> agg.guestsTerminated = point.value.toInt() - "running" -> agg.guestsRunning = point.value.toInt() - "error" -> agg.guestsRunning = point.value.toInt() - "invalid" -> agg.guestsInvalid = point.value.toInt() + "terminated" -> agg._guestsTerminated = point.value.toInt() + "running" -> agg._guestsRunning = point.value.toInt() + "error" -> agg._guestsRunning = point.value.toInt() + "invalid" -> agg._guestsInvalid = point.value.toInt() } } } @@ -101,24 +103,24 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { - server.cpuLimit = point.value - server.host = agg.host + server._cpuLimit = point.value + server._host = agg.host } else { - agg.cpuLimit = point.value + agg._cpuLimit = point.value } } } "system.cpu.usage" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuUsage = metric.doubleGaugeData.points.first().value + agg._cpuUsage = metric.doubleGaugeData.points.first().value } "system.cpu.demand" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuDemand = metric.doubleGaugeData.points.first().value + agg._cpuDemand = metric.doubleGaugeData.points.first().value } "system.cpu.utilization" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuUtilization = metric.doubleGaugeData.points.first().value + agg._cpuUtilization = metric.doubleGaugeData.points.first().value } "system.cpu.time" -> { val agg = getHost(hosts, resource) ?: continue @@ -128,29 +130,29 @@ public class ComputeMetricAggregator { val state = point.attributes[STATE_KEY] if (server != null) { when (state) { - "active" -> server.cpuActiveTime = point.value - "idle" -> server.cpuIdleTime = point.value - "steal" -> server.cpuStealTime = point.value - "lost" -> server.cpuLostTime = point.value + "active" -> server._cpuActiveTime = point.value + "idle" -> server._cpuIdleTime = point.value + "steal" -> server._cpuStealTime = point.value + "lost" -> server._cpuLostTime = point.value } - server.host = agg.host + server._host = agg.host } else { when (state) { - "active" -> agg.cpuActiveTime = point.value - "idle" -> agg.cpuIdleTime = point.value - "steal" -> agg.cpuStealTime = point.value - "lost" -> agg.cpuLostTime = point.value + "active" -> agg._cpuActiveTime = point.value + "idle" -> agg._cpuIdleTime = point.value + "steal" -> agg._cpuStealTime = point.value + "lost" -> agg._cpuLostTime = point.value } } } } "system.power.usage" -> { val agg = getHost(hosts, resource) ?: continue - agg.powerUsage = metric.doubleGaugeData.points.first().value + agg._powerUsage = metric.doubleGaugeData.points.first().value } "system.power.total" -> { val agg = getHost(hosts, resource) ?: continue - agg.powerTotal = metric.doubleSumData.points.first().value + agg._powerTotal = metric.doubleSumData.points.first().value } "system.time" -> { val agg = getHost(hosts, resource) ?: continue @@ -162,16 +164,16 @@ public class ComputeMetricAggregator { server.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> server.uptime = point.value - "down" -> server.downtime = point.value + "up" -> server._uptime = point.value + "down" -> server._downtime = point.value } - server.host = agg.host + server._host = agg.host } else { agg.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> agg.uptime = point.value - "down" -> agg.downtime = point.value + "up" -> agg._uptime = point.value + "down" -> agg._downtime = point.value } } } @@ -183,10 +185,10 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { - server.bootTime = Instant.ofEpochMilli(point.value) - server.host = agg.host + server._bootTime = Instant.ofEpochMilli(point.value) + server._host = agg.host } else { - agg.bootTime = Instant.ofEpochMilli(point.value) + agg._bootTime = Instant.ofEpochMilli(point.value) } } } @@ -194,7 +196,7 @@ public class ComputeMetricAggregator { for (point in metric.longGaugeData.points) { val server = getServer(servers, point) ?: continue server.recordTimestamp(point) - server.provisionTime = Instant.ofEpochMilli(point.value) + server._provisionTime = Instant.ofEpochMilli(point.value) } } } @@ -205,14 +207,16 @@ public class ComputeMetricAggregator { * Collect the data via the [monitor]. */ public fun collect(monitor: ComputeMonitor) { - monitor.record(_service.collect()) + monitor.record(_service) for (host in _hosts.values) { - monitor.record(host.collect()) + monitor.record(host) + host.reset() } for (server in _servers.values) { - monitor.record(server.collect()) + monitor.record(server) + server.reset() } } @@ -243,50 +247,55 @@ public class ComputeMetricAggregator { /** * An aggregator for service metrics before they are reported. */ - internal class ServiceAggregator { - private var timestamp = Long.MIN_VALUE + internal class ServiceAggregator : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp - @JvmField var hostsUp = 0 - @JvmField var hostsDown = 0 + override val hostsUp: Int + get() = _hostsUp + @JvmField var _hostsUp = 0 - @JvmField var serversPending = 0 - @JvmField var serversActive = 0 + override val hostsDown: Int + get() = _hostsDown + @JvmField var _hostsDown = 0 - @JvmField var attemptsSuccess = 0 - @JvmField var attemptsFailure = 0 - @JvmField var attemptsError = 0 + override val serversPending: Int + get() = _serversPending + @JvmField var _serversPending = 0 - /** - * Finish the aggregation for this cycle. - */ - fun collect(): ServiceData { - val now = Instant.ofEpochMilli(timestamp) - return toServiceData(now) - } + override val serversActive: Int + get() = _serversActive + @JvmField var _serversActive = 0 - /** - * Convert the aggregator state to an immutable [ServiceData]. - */ - private fun toServiceData(now: Instant): ServiceData { - return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) - } + override val attemptsSuccess: Int + get() = _attemptsSuccess + @JvmField var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + @JvmField var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + @JvmField var _attemptsError = 0 /** * Record the timestamp of a [point] for this aggregator. */ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } /** * An aggregator for host metrics before they are reported. */ - internal class HostAggregator(resource: Resource) { + internal class HostAggregator(resource: Resource) : HostTableReader { /** * The static information about this host. */ - val host = HostInfo( + override val host = HostInfo( resource.attributes[HOST_ID]!!, resource.attributes[HOST_NAME] ?: "", resource.attributes[HOST_ARCH] ?: "", @@ -294,111 +303,127 @@ public class ComputeMetricAggregator { resource.attributes[HOST_MEM_CAPACITY] ?: 0, ) - private var timestamp = Long.MIN_VALUE + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + @JvmField var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + @JvmField var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + @JvmField var _guestsError = 0 - @JvmField var guestsTerminated = 0 - @JvmField var guestsRunning = 0 - @JvmField var guestsError = 0 - @JvmField var guestsInvalid = 0 + override val guestsInvalid: Int + get() = _guestsInvalid + @JvmField var _guestsInvalid = 0 - @JvmField var cpuLimit = 0.0 - @JvmField var cpuUsage = 0.0 - @JvmField var cpuDemand = 0.0 - @JvmField var cpuUtilization = 0.0 + override val cpuLimit: Double + get() = _cpuLimit + @JvmField var _cpuLimit = 0.0 - @JvmField var cpuActiveTime = 0L - @JvmField var cpuIdleTime = 0L - @JvmField var cpuStealTime = 0L - @JvmField var cpuLostTime = 0L + override val cpuUsage: Double + get() = _cpuUsage + @JvmField var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + @JvmField var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + @JvmField var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + @JvmField var _cpuActiveTime = 0L private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + @JvmField var _cpuIdleTime = 0L private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + @JvmField var _cpuStealTime = 0L private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + @JvmField var _cpuLostTime = 0L private var previousCpuLostTime = 0L - @JvmField var powerUsage = 0.0 - @JvmField var powerTotal = 0.0 + override val powerUsage: Double + get() = _powerUsage + @JvmField var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + @JvmField var _powerTotal = 0.0 private var previousPowerTotal = 0.0 - @JvmField var uptime = 0L + override val uptime: Long + get() = _uptime - previousUptime + @JvmField var _uptime = 0L private var previousUptime = 0L - @JvmField var downtime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + @JvmField var _downtime = 0L private var previousDowntime = 0L - @JvmField var bootTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + @JvmField var _bootTime: Instant? = null /** * Finish the aggregation for this cycle. */ - fun collect(): HostData { - val now = Instant.ofEpochMilli(timestamp) - val data = toHostData(now) - + fun reset() { // Reset intermediate state for next aggregation - previousCpuActiveTime = cpuActiveTime - previousCpuIdleTime = cpuIdleTime - previousCpuStealTime = cpuStealTime - previousCpuLostTime = cpuLostTime - previousPowerTotal = powerTotal - previousUptime = uptime - previousDowntime = downtime - - guestsTerminated = 0 - guestsRunning = 0 - guestsError = 0 - guestsInvalid = 0 - - cpuLimit = 0.0 - cpuUsage = 0.0 - cpuDemand = 0.0 - cpuUtilization = 0.0 - - powerUsage = 0.0 - - return data - } - - /** - * Convert the aggregator state to an immutable [HostData] instance. - */ - private fun toHostData(now: Instant): HostData { - return HostData( - now, - host, - guestsTerminated, - guestsRunning, - guestsError, - guestsInvalid, - cpuLimit, - cpuUsage, - cpuDemand, - cpuUtilization, - cpuActiveTime - previousCpuActiveTime, - cpuIdleTime - previousCpuIdleTime, - cpuStealTime - previousCpuStealTime, - cpuLostTime - previousCpuLostTime, - powerUsage, - powerTotal - previousPowerTotal, - uptime - previousUptime, - downtime - previousDowntime, - bootTime - ) + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 } /** * Record the timestamp of a [point] for this aggregator. */ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } /** * An aggregator for server metrics before they are reported. */ - internal class ServerAggregator(attributes: Attributes) { + internal class ServerAggregator(attributes: Attributes) : ServerTableReader { /** * The static information about this server. */ - val server = ServerInfo( + override val server = ServerInfo( attributes[ResourceAttributes.HOST_ID]!!, attributes[ResourceAttributes.HOST_NAME]!!, attributes[ResourceAttributes.HOST_TYPE]!!, @@ -412,70 +437,76 @@ public class ComputeMetricAggregator { /** * The [HostInfo] of the host on which the server is hosted. */ - @JvmField var host: HostInfo? = null + override val host: HostInfo? + get() = _host + @JvmField var _host: HostInfo? = null - private var timestamp = Long.MIN_VALUE - @JvmField var uptime: Long = 0 + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + @JvmField var _uptime: Long = 0 private var previousUptime = 0L - @JvmField var downtime: Long = 0 + + override val downtime: Long + get() = _downtime - previousDowntime + @JvmField var _downtime: Long = 0 private var previousDowntime = 0L - @JvmField var provisionTime: Instant? = null - @JvmField var bootTime: Instant? = null - @JvmField var cpuLimit = 0.0 - @JvmField var cpuActiveTime = 0L - @JvmField var cpuIdleTime = 0L - @JvmField var cpuStealTime = 0L - @JvmField var cpuLostTime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + @JvmField var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + @JvmField var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + @JvmField var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + @JvmField var _cpuActiveTime = 0L private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + @JvmField var _cpuIdleTime = 0L private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + @JvmField var _cpuStealTime = 0L private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + @JvmField var _cpuLostTime = 0L private var previousCpuLostTime = 0L /** * Finish the aggregation for this cycle. */ - fun collect(): ServerData { - val now = Instant.ofEpochMilli(timestamp) - val data = toServerData(now) - - previousUptime = uptime - previousDowntime = downtime + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime previousCpuActiveTime = cpuActiveTime previousCpuIdleTime = cpuIdleTime previousCpuStealTime = cpuStealTime previousCpuLostTime = cpuLostTime - host = null - cpuLimit = 0.0 - - return data - } - - /** - * Convert the aggregator state into an immutable [ServerData]. - */ - private fun toServerData(now: Instant): ServerData { - return ServerData( - now, - server, - host, - uptime - previousUptime, - downtime - previousDowntime, - provisionTime, - bootTime, - cpuLimit, - cpuActiveTime - previousCpuActiveTime, - cpuIdleTime - previousCpuIdleTime, - cpuStealTime - previousCpuStealTime, - cpuLostTime - previousCpuLostTime - ) + _host = null + _cpuLimit = 0.0 } /** * Record the timestamp of a [point] for this aggregator. */ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt index d51bcab4..64b5f337 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -22,26 +22,26 @@ package org.opendc.telemetry.compute -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader /** * A monitor that tracks the metrics and events of the OpenDC Compute service. */ public interface ComputeMonitor { /** - * Record the specified [data]. + * Record an entry with the specified [reader]. */ - public fun record(data: ServerData) {} + public fun record(reader: ServerTableReader) {} /** - * Record the specified [data]. + * Record an entry with the specified [reader]. */ - public fun record(data: HostData) {} + public fun record(reader: HostTableReader) {} /** - * Record the specified [data]. + * Record an entry with the specified [reader]. */ - public fun record(data: ServiceData) {} + public fun record(reader: ServiceTableReader) {} } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index ce89061b..41315b15 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -24,6 +24,8 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.telemetry.compute.table.toServiceData /** * Collect the metrics of the compute service. @@ -32,8 +34,8 @@ public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { lateinit var serviceData: ServiceData val agg = ComputeMetricAggregator() val monitor = object : ComputeMonitor { - override fun record(data: ServiceData) { - serviceData = data + override fun record(reader: ServiceTableReader) { + serviceData = reader.toServiceData() } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt deleted file mode 100644 index 8e787b97..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -import java.time.Instant - -/** - * A trace entry for a particular host. - */ -public data class HostData( - val timestamp: Instant, - val host: HostInfo, - val guestsTerminated: Int, - val guestsRunning: Int, - val guestsError: Int, - val guestsInvalid: Int, - val cpuLimit: Double, - val cpuUsage: Double, - val cpuDemand: Double, - val cpuUtilization: Double, - val cpuActiveTime: Long, - val cpuIdleTime: Long, - val cpuStealTime: Long, - val cpuLostTime: Long, - val powerUsage: Double, - val powerTotal: Double, - val uptime: Long, - val downtime: Long, - val bootTime: Instant? -) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt new file mode 100644 index 00000000..1e1ad94e --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [HostInfo] of the host to which the row belongs to. + */ + public val host: HostInfo + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Double + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Double + + /** + * The demand of all vCPUs of the guests (in MHz) + */ + public val cpuDemand: Double + + /** + * The CPU utilization of the host. + */ + public val cpuUtilization: Double + + /** + * The duration (in seconds) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power usage of the host in W. + */ + public val powerUsage: Double + + /** + * The total power consumption of the host since last time in J. + */ + public val powerTotal: Double + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted. + */ + public val bootTime: Instant? +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt deleted file mode 100644 index 6fd2a81b..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -import java.time.Instant - -/** - * A trace entry for a particular server. - */ -public data class ServerData( - val timestamp: Instant, - val server: ServerInfo, - val host: HostInfo?, - val uptime: Long, - val downtime: Long, - val provisionTime: Instant?, - val bootTime: Instant?, - val cpuLimit: Double, - val cpuActiveTime: Long, - val cpuIdleTime: Long, - val cpuStealTime: Long, - val cpuLostTime: Long, -) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt new file mode 100644 index 00000000..c23d1467 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a server trace entry. + */ +public interface ServerTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [ServerInfo] of the server to which the row belongs to. + */ + public val server: ServerInfo + + /** + * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the server was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the server booted. + */ + public val bootTime: Instant? + + /** + * The capacity of the CPUs of the servers (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the server. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the server. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index 6db1399d..39bf96f4 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -37,3 +37,10 @@ public data class ServiceData( val attemptsFailure: Int, val attemptsError: Int ) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt new file mode 100644 index 00000000..908f6748 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a service trace entry. + */ +public interface ServiceTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of servers that are pending to be scheduled. + */ + public val serversPending: Int + + /** + * The number of servers that are currently active. + */ + public val serversActive: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int + + /** + * The scheduling attempts that were unsuccessful due to scheduler error. + */ + public val attemptsError: Int +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt index 7913660d..d39a0c74 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt @@ -24,8 +24,8 @@ package org.opendc.web.runner import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader import kotlin.math.max import kotlin.math.roundToLong @@ -33,24 +33,24 @@ import kotlin.math.roundToLong * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. */ class WebComputeMetricExporter : ComputeMetricExporter() { - override fun record(data: HostData) { - val slices = data.downtime / SLICE_LENGTH + override fun record(reader: HostTableReader) { + val slices = reader.downtime / SLICE_LENGTH hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalActiveTime + data.cpuActiveTime, - hostAggregateMetrics.totalIdleTime + data.cpuIdleTime, - hostAggregateMetrics.totalStealTime + data.cpuStealTime, - hostAggregateMetrics.totalLostTime + data.cpuLostTime, - hostAggregateMetrics.totalPowerDraw + data.powerTotal, + hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, + hostAggregateMetrics.totalStealTime + reader.cpuStealTime, + hostAggregateMetrics.totalLostTime + reader.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + reader.powerTotal, hostAggregateMetrics.totalFailureSlices + slices, - hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices + hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices ) - hostMetrics.compute(data.host.id) { _, prev -> + hostMetrics.compute(reader.host.id) { _, prev -> HostMetrics( - data.cpuUsage + (prev?.cpuUsage ?: 0.0), - data.cpuDemand + (prev?.cpuDemand ?: 0.0), - data.guestsRunning + (prev?.instanceCount ?: 0), + reader.cpuUsage + (prev?.cpuUsage ?: 0.0), + reader.cpuDemand + (prev?.cpuDemand ?: 0.0), + reader.guestsRunning + (prev?.instanceCount ?: 0), 1 + (prev?.count ?: 0) ) } @@ -79,13 +79,13 @@ class WebComputeMetricExporter : ComputeMetricExporter() { private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() - override fun record(data: ServiceData) { + override fun record(reader: ServiceTableReader) { serviceMetrics = AggregateServiceMetrics( - max(data.attemptsSuccess, serviceMetrics.vmTotalCount), - max(data.serversPending, serviceMetrics.vmWaitingCount), - max(data.serversActive, serviceMetrics.vmActiveCount), + max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), + max(reader.serversPending, serviceMetrics.vmWaitingCount), + max(reader.serversActive, serviceMetrics.vmActiveCount), max(0, serviceMetrics.vmInactiveCount), - max(data.attemptsFailure, serviceMetrics.vmFailedCount), + max(reader.attemptsFailure, serviceMetrics.vmFailedCount), ) } -- cgit v1.2.3 From ba310a3545c9631e1e4ff61a0a1759228ec5cf63 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 16 Oct 2021 16:52:49 +0200 Subject: fix(simulator): Use correct flow input capacity for counters This change fixes an issue with the FlowMultiplexer implementation where the capacity of each flow input was equal to the capacity of all flow outputs. Now, the user can specify the capacity of the input, which will be used to correctly compute the active and idle time. --- .../compute/kernel/SimAbstractHypervisor.kt | 36 ++++++++++++---------- .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 10 +++++- .../flow/mux/ForwardingFlowMultiplexer.kt | 2 ++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 20 ++++++++++-- 4 files changed, 48 insertions(+), 20 deletions(-) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 90bf5e25..eda59d2d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -144,7 +144,7 @@ public abstract class SimAbstractHypervisor( */ private inner class VirtualMachine( model: MachineModel, - private val interferenceId: String? = null + interferenceId: String? = null ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { /** * The interference key of this virtual machine. @@ -154,24 +154,14 @@ public abstract class SimAbstractHypervisor( /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) } + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) } /** * The resource counters associated with the hypervisor. */ override val counters: SimHypervisorCounters get() = _counters - private val _counters = object : SimHypervisorCounters { - private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 - - override val cpuActiveTime: Long - get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() - override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() - override val cpuStealTime: Long - get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() - override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong() - } + private val _counters = VmCountersImpl(cpus) /** * The CPU capacity of the hypervisor in MHz. @@ -235,9 +225,7 @@ public abstract class SimAbstractHypervisor( ) : SimProcessingUnit, FlowConsumer by source { override var capacity: Double get() = source.capacity - set(_) { - // Ignore capacity changes - } + set(_) = TODO("Capacity changes on vCPU not supported") override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" @@ -311,4 +299,20 @@ public abstract class SimAbstractHypervisor( cpuTime[3] += (interferenceDelta * d).roundToLong() } } + + /** + * A [SimHypervisorCounters] implementation for a virtual machine. + */ + private class VmCountersImpl(private val cpus: List) : SimHypervisorCounters { + private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 + + override val cpuActiveTime: Long + get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() + override val cpuIdleTime: Long + get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() + override val cpuStealTime: Long + get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() + override val cpuLostTime: Long + get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 04ba7f21..a7877546 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -62,12 +62,20 @@ public interface FlowMultiplexer { public val counters: FlowCounters /** - * Create a new input on this multiplexer. + * Create a new input on this multiplexer with a coupled capacity. * * @param key The key of the interference member to which the input belongs. */ public fun newInput(key: InterferenceKey? = null): FlowConsumer + /** + * Create a new input on this multiplexer with the specified [capacity]. + * + * @param capacity The capacity of the input. + * @param key The key of the interference member to which the input belongs. + */ + public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer + /** * Remove [input] from this multiplexer. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 125d10fe..b68a8baa 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -78,6 +78,8 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return input } + override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key) + override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index a0fb8a4e..3d26efda 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -86,7 +86,15 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) + return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key) + } + + override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer { + return newInput(isCoupled = false, capacity, key) + } + + private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer { + val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity) _inputs.add(provider) return provider } @@ -206,7 +214,10 @@ public class MaxMinFlowMultiplexer( // Disable timers and convergence of the source if one of the output manages it input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput - input.capacity = capacity + + if (input.isCoupled) { + input.capacity = capacity + } trigger(_clock.millis()) } @@ -340,7 +351,9 @@ public class MaxMinFlowMultiplexer( capacity = newCapacity for (input in _activeInputs) { - input.capacity = newCapacity + if (input.isCoupled) { + input.capacity = newCapacity + } } // Sort outputs by their capacity @@ -495,6 +508,7 @@ public class MaxMinFlowMultiplexer( private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, @JvmField val key: InterferenceKey?, + @JvmField val isCoupled: Boolean, initialCapacity: Double, ) : FlowConsumer, FlowConsumerLogic, Comparable { /** -- cgit v1.2.3 From 86c65e875b7dde8872dc81a37aa9dca72eee7782 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 19 Oct 2021 17:27:01 +0200 Subject: refactor(simulator): Support running workloads without coroutines This change updates the SimMachine interface to drop the coroutine requirement for running a workload on a machines. Users can now asynchronously start a workload and receive notifications via the workload callbacks. Users still have the possibility to suspend execution during workload execution by using the new `runWorkload` method, which is implemented on top of the new `startWorkload` primitive. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 59 +++--- .../org/opendc/compute/simulator/internal/Guest.kt | 8 +- .../opendc/experiments/tf20/core/SimTFDevice.kt | 7 +- .../faas/service/deployer/FunctionInstance.kt | 4 +- .../opendc/faas/simulator/SimFunctionDeployer.kt | 5 +- .../opendc-simulator-compute/build.gradle.kts | 2 +- .../simulator/compute/SimMachineBenchmarks.kt | 32 ++-- .../org/opendc/simulator/compute/Coroutines.kt | 69 +++++++ .../opendc/simulator/compute/SimAbstractMachine.kt | 115 ++++++----- .../org/opendc/simulator/compute/SimMachine.kt | 17 +- .../compute/kernel/SimAbstractHypervisor.kt | 81 ++++++-- .../simulator/compute/kernel/SimHypervisor.kt | 9 +- .../simulator/compute/workload/SimFlopsWorkload.kt | 2 + .../compute/workload/SimRuntimeWorkload.kt | 2 + .../simulator/compute/workload/SimTraceWorkload.kt | 2 + .../simulator/compute/workload/SimWorkload.kt | 7 + .../compute/workload/SimWorkloadLifecycle.kt | 53 ++++-- .../org/opendc/simulator/compute/SimMachineTest.kt | 211 ++++++++------------- .../compute/kernel/SimFairShareHypervisorTest.kt | 45 ++--- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 63 +++--- .../compute/workload/SimTraceWorkloadTest.kt | 35 +--- 21 files changed, 478 insertions(+), 350 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 10faeb5b..908a58e9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -47,6 +47,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine import java.util.* import kotlin.coroutines.CoroutineContext @@ -136,11 +137,6 @@ public class SimHost( } } - /** - * The [Job] that represents the machine running the hypervisor. - */ - private var _job: Job? = null - init { launch() @@ -199,11 +195,12 @@ public class SimHost( val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + val machine = hypervisor.newMachine(key.flavor.toMachineModel(), key.name) val newGuest = Guest( scope.coroutineContext, clock, this, + hypervisor, mapper, guestListener, server, @@ -249,7 +246,7 @@ public class SimHost( override fun close() { reset() scope.cancel() - machine.close() + machine.cancel() } override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" @@ -275,27 +272,40 @@ public class SimHost( } } + /** + * The [Job] that represents the machine running the hypervisor. + */ + private var _ctx: SimMachineContext? = null + /** * Launch the hypervisor. */ private fun launch() { - check(_job == null) { "Concurrent hypervisor running" } + check(_ctx == null) { "Concurrent hypervisor running" } // Launch hypervisor onto machine - _job = scope.launch { - try { - _bootTime = clock.millis() - _state = HostState.UP - machine.run(hypervisor, emptyMap()) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - logger.error(cause) { "Host failed" } - throw cause - } finally { - _state = HostState.DOWN + _ctx = machine.startWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + _bootTime = clock.millis() + _state = HostState.UP + hypervisor.onStart(ctx) + } catch (cause: Throwable) { + _state = HostState.DOWN + _ctx = null + throw cause + } } - } + + override fun onStop(ctx: SimMachineContext) { + try { + hypervisor.onStop(ctx) + } finally { + _state = HostState.DOWN + _ctx = null + } + } + }) } /** @@ -305,12 +315,7 @@ public class SimHost( updateUptime() // Stop the hypervisor - val job = _job - if (job != null) { - job.cancel() - _job = null - } - + _ctx?.close() _state = HostState.DOWN } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 61b3214e..9f3122db 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -34,7 +34,9 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimVirtualMachine +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -46,6 +48,7 @@ internal class Guest( context: CoroutineContext, private val clock: Clock, val host: SimHost, + private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper, private val listener: GuestListener, val server: Server, @@ -114,8 +117,7 @@ internal class Guest( stop() state = ServerState.DELETED - - machine.close() + hypervisor.removeMachine(machine) scope.cancel() } @@ -191,7 +193,7 @@ internal class Guest( */ private suspend fun runMachine(workload: SimWorkload) { delay(1) // TODO Introduce model for boot time - machine.run(workload, mapOf("driver" to host, "server" to server)) + machine.runWorkload(workload, mapOf("driver" to host, "server" to server)) } /** diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index fb36d2c7..1752802f 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -34,6 +34,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* import java.time.Clock @@ -128,6 +129,8 @@ public class SimTFDevice( } } + override fun onStop(ctx: SimMachineContext) {} + override fun onStart(conn: FlowConnection, now: Long) { ctx = conn capacity = conn.capacity @@ -172,7 +175,7 @@ public class SimTFDevice( init { scope.launch { - machine.run(workload) + machine.runWorkload(workload) } } @@ -189,7 +192,7 @@ public class SimTFDevice( } override fun close() { - machine.close() + machine.cancel() scope.cancel() } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt index a8b04df4..77eadbbe 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt @@ -25,9 +25,9 @@ package org.opendc.faas.service.deployer import org.opendc.faas.service.FunctionObject /** - * A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions. + * A [FunctionInstance] is a self-contained worker—typically a container—capable of handling function executions. * - * Multiple, concurrent function instances can exists for a single function, for scalability purposes. + * Multiple, concurrent function instances can exist for a single function, for scalability purposes. */ public interface FunctionInstance : AutoCloseable { /** diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 020d75b5..68233c1a 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -36,6 +36,7 @@ import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.flow.FlowEngine import java.time.Clock import java.util.ArrayDeque @@ -114,7 +115,7 @@ public class SimFunctionDeployer( override fun close() { state = FunctionInstanceState.Deleted stop() - machine.close() + machine.cancel() } override fun toString(): String = "FunctionInstance[state=$state]" @@ -130,7 +131,7 @@ public class SimFunctionDeployer( launch { try { - machine.run(workload) + machine.runWorkload(workload) } finally { state = FunctionInstanceState.Deleted } diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index a2bb89c2..ca8b912a 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -35,7 +35,7 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcUtils) + implementation(libs.kotlin.logging) testImplementation(libs.slf4j.simple) } diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index cb52d24f..91e91f9d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -76,7 +76,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - return@runBlockingSimulation machine.run(SimTraceWorkload(trace)) + return@runBlockingSimulation machine.runWorkload(SimTraceWorkload(trace)) } } @@ -89,15 +89,15 @@ class SimMachineBenchmarks { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) try { - return@runBlockingSimulation vm.run(SimTraceWorkload(trace)) + return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace)) } finally { - vm.close() - machine.close() + vm.cancel() + machine.cancel() } } } @@ -111,15 +111,15 @@ class SimMachineBenchmarks { ) val hypervisor = SimFairShareHypervisor(engine, null, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) try { - return@runBlockingSimulation vm.run(SimTraceWorkload(trace)) + return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace)) } finally { - vm.close() - machine.close() + vm.cancel() + machine.cancel() } } } @@ -133,22 +133,22 @@ class SimMachineBenchmarks { ) val hypervisor = SimFairShareHypervisor(engine, null, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } coroutineScope { repeat(2) { - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) launch { try { - vm.run(SimTraceWorkload(trace)) + vm.runWorkload(SimTraceWorkload(trace)) } finally { - machine.close() + machine.cancel() } } } } - machine.close() + machine.cancel() } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt new file mode 100644 index 00000000..c23f48dc --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.simulator.compute.workload.SimWorkload +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * Run the specified [SimWorkload] on this machine and suspend execution util [workload] has finished. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. + */ +public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map = emptyMap()) { + return suspendCancellableCoroutine { cont -> + cont.invokeOnCancellation { this@runWorkload.cancel() } + + startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + workload.onStart(ctx) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + try { + workload.onStop(ctx) + + if (!cont.isCompleted) { + cont.resume(Unit) + } + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + }, + meta + ) + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 5909d980..6a4c594d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* +import mu.KotlinLogging import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.device.SimPeripheral import org.opendc.simulator.compute.model.MachineModel @@ -31,8 +32,6 @@ import org.opendc.simulator.compute.model.NetworkAdapter import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. @@ -71,56 +70,20 @@ public abstract class SimAbstractMachine( */ public override val peripherals: List = net.map { it as SimNetworkAdapter } - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - /** * The current active [Context]. */ private var _ctx: Context? = null - /** - * This method is invoked when the machine is started. - */ - protected open fun onStart(ctx: SimMachineContext) {} - - /** - * This method is invoked when the machine is stopped. - */ - protected open fun onStop(ctx: SimMachineContext) { - _ctx = null - } - - /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map) { - check(!isTerminated) { "Machine is terminated" } + override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { check(_ctx == null) { "A machine cannot run concurrently" } - return suspendCancellableCoroutine { cont -> - val ctx = Context(meta, cont) - _ctx = ctx - - // Cancel all cpus on cancellation - cont.invokeOnCancellation { ctx.close() } - - engine.batch { - onStart(ctx) - - workload.onStart(ctx) - } - } + val ctx = Context(workload, meta) + ctx.start() + return ctx } - override fun close() { - if (isTerminated) { - return - } - - isTerminated = true + override fun cancel() { _ctx?.close() } @@ -130,15 +93,33 @@ public abstract class SimAbstractMachine( /** * The execution context in which the workload runs. + * + * @param workload The workload that is running on the machine. + * @param meta The metadata passed to the workload. */ - private inner class Context(override val meta: Map, private val cont: Continuation) : SimMachineContext { + private inner class Context( + private val workload: SimWorkload, + override val meta: Map + ) : SimMachineContext { /** * A flag to indicate that the context has been closed. */ private var isClosed = false - override val engine: FlowEngine - get() = this@SimAbstractMachine.engine + override val engine: FlowEngine = this@SimAbstractMachine.engine + + /** + * Start this context. + */ + fun start() { + try { + _ctx = this + engine.batch { workload.onStart(this) } + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStart callback" } + close() + } + } override val cpus: List = this@SimAbstractMachine.cpus @@ -154,15 +135,43 @@ public abstract class SimAbstractMachine( } isClosed = true + assert(_ctx == this) { "Invariant violation: multiple contexts active for a single machine" } + _ctx = null + + // Cancel all the resources associated with the machine + doCancel() + + try { + workload.onStop(this) + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStop callback" } + } + } + + /** + * Run the stop procedures for the resources associated with the machine. + */ + private fun doCancel() { engine.batch { for (cpu in cpus) { cpu.cancel() } - } - onStop(this) - cont.resume(Unit) + memory.cancel() + + for (ifx in net) { + (ifx as NetworkAdapterImpl).disconnect() + } + + for (storage in storage) { + val impl = storage as StorageDeviceImpl + impl.read.cancel() + impl.write.cancel() + } + } } + + override fun toString(): String = "SimAbstractMachine.Context" } /** @@ -218,4 +227,12 @@ public abstract class SimAbstractMachine( override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]" } + + private companion object { + /** + * The logging instance associated with this class. + */ + @JvmStatic + private val logger = KotlinLogging.logger {} + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ab0b56ae..94581e89 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -29,7 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A generic machine that is able to run a [SimWorkload]. */ -public interface SimMachine : AutoCloseable { +public interface SimMachine { /** * The model of the machine containing its specifications. */ @@ -41,12 +41,19 @@ public interface SimMachine : AutoCloseable { public val peripherals: List /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * Start the specified [SimWorkload] on this machine. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. */ - public suspend fun run(workload: SimWorkload, meta: Map = emptyMap()) + public fun startWorkload(workload: SimWorkload, meta: Map = emptyMap()): SimMachineContext /** - * Terminate this machine. + * Cancel the workload that is currently running on this machine. + * + * If no workload is active, this operation is a no-op. */ - public override fun close() + public fun cancel() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index eda59d2d..07465126 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -28,6 +28,7 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer @@ -93,13 +94,20 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf() /* SimHypervisor */ - override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { + override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } val vm = VirtualMachine(model, interferenceId) _vms.add(vm) return vm } + override fun removeMachine(machine: SimVirtualMachine) { + if (_vms.remove(machine)) { + // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. + (machine as VirtualMachine).close() + } + } + /* SimWorkload */ override fun onStart(ctx: SimMachineContext) { context = ctx @@ -122,6 +130,8 @@ public abstract class SimAbstractHypervisor( } } + override fun onStop(ctx: SimMachineContext) {} + private var _cpuCount = 0 private var _cpuCapacity = 0.0 @@ -145,11 +155,16 @@ public abstract class SimAbstractHypervisor( private inner class VirtualMachine( model: MachineModel, interferenceId: String? = null - ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { + ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable { + /** + * A flag to indicate that the machine is closed. + */ + private var isClosed = false + /** * The interference key of this virtual machine. */ - private var interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } + private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. @@ -181,36 +196,60 @@ public abstract class SimAbstractHypervisor( override val cpuUsage: Double get() = cpus.sumOf(FlowConsumer::rate) - override fun onStart(ctx: SimMachineContext) { - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.join(interferenceKey) - } - - super.onStart(ctx) + override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { + check(!isClosed) { "Machine is closed" } + + return super.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + joinInterferenceDomain() + workload.onStart(ctx) + } catch (cause: Throwable) { + leaveInterferenceDomain() + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + leaveInterferenceDomain() + workload.onStop(ctx) + } + }, + meta + ) } - override fun onStop(ctx: SimMachineContext) { - super.onStop(ctx) - - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.leave(interferenceKey) + override fun close() { + if (isClosed) { + return } - } - override fun close() { - super.close() + isClosed = true + cancel() for (cpu in cpus) { cpu.close() } + } - _vms.remove(this) + /** + * Join the interference domain of the hypervisor. + */ + private fun joinInterferenceDomain() { + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.join(interferenceKey) + } + } + /** + * Leave the interference domain of the hypervisor. + */ + private fun leaveInterferenceDomain() { val interferenceKey = interferenceKey if (interferenceKey != null) { - interferenceDomain?.removeKey(interferenceKey) + interferenceDomain?.leave(interferenceKey) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 57d4cf20..a69f419f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -67,5 +67,12 @@ public interface SimHypervisor : SimWorkload { * @param model The machine to create. * @param interferenceId An identifier for the interference model. */ - public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + + /** + * Remove the specified [machine] from the hypervisor. + * + * @param machine The machine to remove. + */ + public fun removeMachine(machine: SimVirtualMachine) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 99f4a1e1..726d1f56 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -48,5 +48,7 @@ public class SimFlopsWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 2ef3bc43..8a3f5f84 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -48,5 +48,7 @@ public class SimRuntimeWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 53c98409..ce04a790 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -40,5 +40,7 @@ public class SimTraceWorkload(private val trace: SimTrace, private val offset: L } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimTraceWorkload" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index b80665fa..61c6e2ad 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -37,4 +37,11 @@ public interface SimWorkload { * @param ctx The execution context in which the machine runs. */ public fun onStart(ctx: SimMachineContext) + + /** + * This method is invoked when the workload is stopped. + * + * @param ctx The execution context in which the machine runs. + */ + public fun onStop(ctx: SimMachineContext) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index cc4f1f6a..742470a1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -33,31 +33,50 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { /** * The resource consumers which represent the lifecycle of the workload. */ - private val waiting = mutableSetOf() + private val waiting = HashSet() /** - * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. + * Wait for the specified [source] to complete before ending the lifecycle of the workload. */ - public fun waitFor(consumer: FlowSource): FlowSource { - waiting.add(consumer) - return object : FlowSource by consumer { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - try { - consumer.onStop(conn, now, delta) - } finally { - complete(consumer) - } - } - override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]" - } + public fun waitFor(source: FlowSource): FlowSource { + val wrapper = Wrapper(source) + waiting.add(wrapper) + return wrapper } /** - * Complete the specified [FlowSource]. + * Complete the specified [Wrapper]. */ - private fun complete(consumer: FlowSource) { - if (waiting.remove(consumer) && waiting.isEmpty()) { + private fun complete(wrapper: Wrapper) { + if (waiting.remove(wrapper) && waiting.isEmpty()) { ctx.close() } } + + /** + * A [FlowSource] that wraps [delegate] and informs [SimWorkloadLifecycle] that is has completed. + */ + private inner class Wrapper(private val delegate: FlowSource) : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + delegate.onStart(conn, now) + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + delegate.onConverge(conn, now, delta) + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate.onStop(conn, now, delta) + } finally { + complete(this) + } + } + + override fun toString(): String = "SimWorkloadLifecycle.Wrapper[delegate=$delegate]" + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 0bb24ed8..644eb497 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -65,14 +65,10 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) - // Two cores execute 1000 MFlOps per second (1000 ms) - assertEquals(1000, clock.millis()) - } finally { - machine.close() - } + // Two cores execute 1000 MFlOps per second (1000 ms) + assertEquals(1000, clock.millis()) } @Test @@ -88,14 +84,10 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) - // Two sockets with two cores execute 2000 MFlOps per second (500 ms) - assertEquals(500, clock.millis()) - } finally { - machine.close() - } + // Two sockets with two cores execute 2000 MFlOps per second (500 ms) + assertEquals(500, clock.millis()) } @Test @@ -109,16 +101,12 @@ class SimMachineTest { val source = SimPowerSource(engine, capacity = 1000.0) source.connect(machine.psu) - try { - coroutineScope { - launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - assertAll( - { assertEquals(100.0, machine.psu.powerDraw) }, - { assertEquals(100.0, source.powerDraw) } - ) - } - } finally { - machine.close() + coroutineScope { + launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } + assertAll( + { assertEquals(100.0, machine.psu.powerDraw) }, + { assertEquals(100.0, source.powerDraw) } + ) } } @@ -130,22 +118,20 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val cpu = ctx.cpus[0] - - cpu.capacity = cpu.model.frequency + 1000.0 - assertEquals(cpu.model.frequency, cpu.capacity) - cpu.capacity = -1.0 - assertEquals(0.0, cpu.capacity) - - ctx.close() - } - }) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val cpu = ctx.cpus[0] + + cpu.capacity = cpu.model.frequency + 1000.0 + assertEquals(cpu.model.frequency, cpu.capacity) + cpu.capacity = -1.0 + assertEquals(0.0, cpu.capacity) + + ctx.close() + } + + override fun onStop(ctx: SimMachineContext) {} + }) } @Test @@ -156,16 +142,14 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - assertEquals(32_000 * 4.0, ctx.memory.capacity) - ctx.close() - } - }) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + assertEquals(32_000 * 4.0, ctx.memory.capacity) + ctx.close() + } + + override fun onStop(ctx: SimMachineContext) {} + }) } @Test @@ -176,18 +160,16 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -202,19 +184,17 @@ class SimMachineTest { val adapter = (machine.peripherals[0] as SimNetworkAdapter) adapter.connect(SimNetworkSink(engine, adapter.bandwidth)) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val iface = ctx.net[0] - iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val iface = ctx.net[0] + iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -226,19 +206,17 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val disk = ctx.storage[0] - disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val disk = ctx.storage[0] + disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -250,19 +228,17 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val disk = ctx.storage[0] - disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val disk = ctx.storage[0] + disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -275,13 +251,11 @@ class SimMachineTest { try { coroutineScope { - launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } + launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } cancel() } } catch (_: CancellationException) { // Ignore - } finally { - machine.close() } assertEquals(0, clock.millis()) @@ -295,31 +269,14 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - coroutineScope { - launch { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - } + coroutineScope { + launch { + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) + } - assertThrows { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - } + assertThrows { + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } - } finally { - machine.close() } } - - @Test - fun testClose() = runBlockingSimulation { - val machine = SimBareMetalMachine( - FlowEngine(coroutineContext, clock), - machineModel, - SimplePowerDriver(ConstantPowerModel(0.0)) - ) - - machine.close() - assertDoesNotThrow { machine.close() } - assertThrows { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index b7f5bf8e..91855e8d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -37,6 +37,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -80,16 +81,16 @@ internal class SimFairShareHypervisorTest { val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) println("Hypervisor finished") } yield() - val vm = hypervisor.createMachine(model) - vm.run(workloadA) + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA) yield() - machine.close() + machine.cancel() assertAll( { assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") }, @@ -131,22 +132,22 @@ internal class SimFairShareHypervisorTest { val hypervisor = SimFairShareHypervisor(platform, null, null, null) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } yield() coroutineScope { launch { - val vm = hypervisor.createMachine(model) - vm.run(workloadA) - vm.close() + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA) + hypervisor.removeMachine(vm) } - val vm = hypervisor.createMachine(model) - vm.run(workloadB) - vm.close() + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadB) + hypervisor.removeMachine(vm) } yield() - machine.close() + machine.cancel() yield() assertAll( @@ -171,11 +172,11 @@ internal class SimFairShareHypervisorTest { assertDoesNotThrow { launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } } - machine.close() + machine.cancel() } @Test @@ -219,20 +220,20 @@ internal class SimFairShareHypervisorTest { ) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } coroutineScope { launch { - val vm = hypervisor.createMachine(model, "a") - vm.run(workloadA) - vm.close() + val vm = hypervisor.newMachine(model, "a") + vm.runWorkload(workloadA) + hypervisor.removeMachine(vm) } - val vm = hypervisor.createMachine(model, "b") - vm.run(workloadB) - vm.close() + val vm = hypervisor.newMachine(model, "b") + vm.runWorkload(workloadB) + hypervisor.removeMachine(vm) } - machine.close() + machine.cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 02d308ff..823a0ae3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -36,6 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -76,13 +77,13 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } - val vm = hypervisor.createMachine(machineModel) - vm.run(workloadA) + launch { machine.runWorkload(hypervisor) } + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workloadA) yield() - vm.close() - machine.close() + hypervisor.removeMachine(vm) + machine.cancel() assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } @@ -98,12 +99,13 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(workload) - vm.close() - machine.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workload) + hypervisor.removeMachine(vm) + + machine.cancel() assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -121,11 +123,11 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(workload) - machine.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workload) + machine.cancel() assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -142,19 +144,20 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(SimRuntimeWorkload(duration)) - vm.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(SimRuntimeWorkload(duration)) + hypervisor.removeMachine(vm) yield() - val vm2 = hypervisor.createMachine(machineModel) - vm2.run(SimRuntimeWorkload(duration)) - vm2.close() - machine.close() + val vm2 = hypervisor.newMachine(machineModel) + vm2.runWorkload(SimRuntimeWorkload(duration)) + hypervisor.removeMachine(vm2) + + machine.cancel() assertEquals(duration * 2, clock.millis()) { "Took enough time" } } @@ -168,17 +171,17 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - hypervisor.createMachine(machineModel) + hypervisor.newMachine(machineModel) assertAll( { assertFalse(hypervisor.canFit(machineModel)) }, - { assertThrows { hypervisor.createMachine(machineModel) } } + { assertThrows { hypervisor.newMachine(machineModel) } } ) - machine.close() + machine.cancel() } /** @@ -192,16 +195,16 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - hypervisor.createMachine(machineModel).close() + hypervisor.removeMachine(hypervisor.newMachine(machineModel)) assertAll( { assertTrue(hypervisor.canFit(machineModel)) }, - { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } + { assertDoesNotThrow { hypervisor.newMachine(machineModel) } } ) - machine.close() + machine.cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index 574860e8..aa91984a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -30,6 +30,7 @@ import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.model.* import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -67,13 +68,9 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } @Test @@ -94,13 +91,9 @@ class SimTraceWorkloadTest { offset = 1000 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(5000, clock.millis()) - } finally { - machine.close() - } + assertEquals(5000, clock.millis()) } @Test @@ -121,14 +114,10 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - delay(1000L) - machine.run(workload) + delay(1000L) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } @Test @@ -149,12 +138,8 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } } -- cgit v1.2.3