From 6a2a5423479696e8dc28885be27cc3e3252f28b0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 30 Dec 2020 14:03:12 +0100 Subject: simulator: Add generic framework for resource consumption modeling This change adds a generic framework for modeling resource consumptions and adapts opendc-simulator-compute to model machines and VMs on top of this framework. This framework anticipates the addition of additional resource types such as memory, disk and network to the OpenDC codebase. --- .../src/main/kotlin/jacoco-conventions.gradle.kts | 26 -- .../kotlin/kotlin-library-conventions.gradle.kts | 1 + .../kotlin/org/opendc/compute/simulator/SimHost.kt | 4 +- .../org/opendc/compute/simulator/SimHostTest.kt | 12 +- .../environment/sc18/Sc18EnvironmentReader.kt | 16 +- .../sc20/Sc20ClusterEnvironmentReader.kt | 12 +- .../environment/sc20/Sc20EnvironmentReader.kt | 16 +- .../kotlin/org/opendc/runner/web/TopologyParser.kt | 12 +- .../opendc-simulator-compute/build.gradle.kts | 1 + .../simulator/compute/SimBareMetalMachine.kt | 276 ++++---------------- .../simulator/compute/SimExecutionContext.kt | 55 ---- .../simulator/compute/SimFairShareHypervisor.kt | 263 +++++++------------ .../opendc/simulator/compute/SimMachineContext.kt | 62 +++++ .../opendc/simulator/compute/SimMachineModel.kt | 8 +- .../simulator/compute/SimSpaceSharedHypervisor.kt | 199 +++++++------- .../opendc/simulator/compute/model/MemoryUnit.kt | 38 --- .../simulator/compute/model/ProcessingNode.kt | 38 --- .../simulator/compute/model/ProcessingUnit.kt | 36 --- .../simulator/compute/model/SimMemoryUnit.kt | 43 ++++ .../simulator/compute/model/SimProcessingNode.kt | 38 +++ .../simulator/compute/model/SimProcessingUnit.kt | 41 +++ .../simulator/compute/workload/SimFlopsWorkload.kt | 46 ++-- .../compute/workload/SimResourceCommand.kt | 52 ---- .../compute/workload/SimRuntimeWorkload.kt | 34 ++- .../simulator/compute/workload/SimTraceWorkload.kt | 52 ++-- .../simulator/compute/workload/SimWorkload.kt | 27 +- .../opendc/simulator/compute/SimHypervisorTest.kt | 12 +- .../org/opendc/simulator/compute/SimMachineTest.kt | 86 +------ .../compute/SimSpaceSharedHypervisorTest.kt | 63 ++++- .../opendc-simulator-resources/build.gradle.kts | 37 +++ .../resources/SimAbstractResourceContext.kt | 255 ++++++++++++++++++ .../org/opendc/simulator/resources/SimResource.kt | 33 +++ .../simulator/resources/SimResourceCommand.kt | 52 ++++ .../simulator/resources/SimResourceConsumer.kt | 45 ++++ .../simulator/resources/SimResourceContext.kt | 46 ++++ .../simulator/resources/SimResourceProvider.kt | 45 ++++ .../simulator/resources/SimResourceSource.kt | 133 ++++++++++ .../simulator/resources/SimResourceCommandTest.kt | 74 ++++++ .../simulator/resources/SimResourceSourceTest.kt | 285 +++++++++++++++++++++ .../main/kotlin/org/opendc/utils/TimerScheduler.kt | 2 +- simulator/settings.gradle.kts | 1 + 41 files changed, 1633 insertions(+), 944 deletions(-) delete mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt delete mode 100644 simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt create mode 100644 simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt (limited to 'simulator') diff --git a/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts index 544e34bf..e0bc2ce4 100644 --- a/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts +++ b/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts @@ -1,29 +1,3 @@ -import org.gradle.kotlin.dsl.`java-library` -import org.gradle.kotlin.dsl.jacoco -import org.gradle.kotlin.dsl.kotlin - -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - /* * Copyright (c) 2021 AtLarge Research * diff --git a/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts index 8d6420be..ab13215b 100644 --- a/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts +++ b/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts @@ -46,5 +46,6 @@ kotlin { tasks.withType().configureEach { kotlinOptions.jvmTarget = Versions.jvmTarget.toString() + kotlinOptions.useIR = true kotlinOptions.freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn" } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 19fa3e97..0da81152 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -35,7 +35,7 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.simulator.compute.* import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.SimMemoryUnit import org.opendc.simulator.failures.FailureDomain import org.opendc.utils.flow.EventFlow import java.time.Clock @@ -216,7 +216,7 @@ public class SimHost( val originalCpu = machine.model.cpus[0] val processingNode = originalCpu.node.copy(coreCount = cpuCount) val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } - val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + val memoryUnits = listOf(SimMemoryUnit("Generic", "Generic", 3200.0, memorySize)) return SimMachineModel(processingUnits, memoryUnits) } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index e1a1d87e..a45ab9fc 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -40,9 +40,9 @@ import org.opendc.compute.api.ServerWatcher import org.opendc.compute.service.driver.HostEvent import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import java.time.Clock @@ -62,11 +62,11 @@ internal class SimHostTest { scope = TestCoroutineScope() clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 3da8d0b3..85a2e413 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import java.io.InputStream import java.util.* @@ -61,12 +61,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja val cores = machine.cpus.flatMap { id -> when (id) { 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } + val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) } } 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } + val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) } } else -> throw IllegalArgumentException("The cpu id $id is not recognized") } @@ -75,7 +75,7 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja UUID(0L, counter++.toLong()), "node-$counter", emptyMap(), - SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), + SimMachineModel(cores, listOf(SimMemoryUnit("", "", 2300.0, 16000))), ConstantPowerModel(0.0) ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 9a06a40f..094bc975 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -26,9 +26,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import java.io.File import java.io.FileInputStream import java.io.InputStream @@ -88,8 +88,8 @@ public class Sc20ClusterEnvironmentReader( memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L coresPerHost = values[coresPerHostCol].trim().toInt() - val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) - val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + val unknownProcessingNode = SimProcessingNode("unknown", "unknown", "unknown", coresPerHost) + val unknownMemoryUnit = SimMemoryUnit("unknown", "unknown", -1.0, memoryPerHost) repeat(numberOfHosts) { nodes.add( @@ -99,7 +99,7 @@ public class Sc20ClusterEnvironmentReader( mapOf("cluster" to clusterId), SimMachineModel( List(coresPerHost) { coreId -> - ProcessingUnit(unknownProcessingNode, coreId, speed) + SimProcessingUnit(unknownProcessingNode, coreId, speed) }, listOf(unknownMemoryUnit) ), diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index effd0286..87a49f49 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import java.io.InputStream import java.util.* @@ -60,19 +60,19 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja val cores = machine.cpus.flatMap { id -> when (id) { 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } + val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) } } 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } + val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) } } else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } val memories = machine.memories.map { id -> when (id) { - 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) + 1 -> SimMemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index e7e99a3d..0ff40a28 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -34,9 +34,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import java.util.* /** @@ -56,13 +56,13 @@ public class TopologyParser(private val collection: MongoCollection, p val cores = cpu.getInteger("numberOfCores") val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() // TODO Remove hardcoding of vendor - val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) + val node = SimProcessingNode("Intel", "amd64", cpu.getString("name"), cores) List(cores) { coreId -> - ProcessingUnit(node, coreId, speed) + SimProcessingUnit(node, coreId, speed) } } val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> - MemoryUnit( + SimMemoryUnit( "Samsung", memory.getString("name"), memory.get("speedMbPerS", Number::class.java).toDouble(), diff --git a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts index 19af6fe8..66d7d9e5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -30,5 +30,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-simulator:opendc-simulator-core")) + api(project(":opendc-simulator:opendc-simulator-resources")) implementation(project(":opendc-utils")) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index f74c5697..b1d1c0b7 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -25,16 +25,16 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.* import org.opendc.utils.TimerScheduler import java.time.Clock import java.util.* import kotlin.coroutines.* -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** * A simulated bare-metal machine that is able to run a single workload. @@ -52,261 +52,73 @@ public class SimBareMetalMachine( private val clock: Clock, override val model: SimMachineModel ) : SimMachine { - /** - * A [StateFlow] representing the CPU usage of the simulated machine. - */ + private val _usage = MutableStateFlow(0.0) override val usage: StateFlow - get() = usageState + get() = _usage /** * A flag to indicate that the machine is terminated. */ private var isTerminated = false - /** - * The [MutableStateFlow] containing the load of the server. - */ - private val usageState = MutableStateFlow(0.0) - - /** - * The current active workload. - */ - private var cont: Continuation? = null - - /** - * The active CPUs of this machine. - */ - private var cpus: List = emptyList() - /** * The [TimerScheduler] to use for scheduling the interrupts. */ - private val scheduler = TimerScheduler(coroutineScope, clock) + private val scheduler = TimerScheduler(coroutineScope, clock) /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * The execution context in which the workload runs. */ - override suspend fun run(workload: SimWorkload, meta: Map) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimBareMetalMachine.model - - override val clock: Clock - get() = this@SimBareMetalMachine.clock - - override val meta: Map - get() = meta - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - workload.onStart(ctx) + private inner class Context(val map: Map>, + override val meta: Map) : SimMachineContext { + override val clock: Clock + get() = this@SimBareMetalMachine.clock - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.cpus = model.cpus.map { Cpu(ctx, it, workload) } + override val cpus: List = model.cpus - for (cpu in cpus) { - cpu.start() - } - } - } + override val memory: List = model.memory - /** - * Terminate the specified bare-metal machine. - */ - override fun close() { - isTerminated = true - } - - /** - * Update the usage of the machine. - */ - private fun updateUsage() { - usageState.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency } - } - - /** - * This method is invoked when one of the CPUs has exited. - */ - private fun onCpuExit(cpu: Int) { - // Check whether all other CPUs have finished - if (cpus.all { it.hasExited }) { - val cont = cont - this.cont = null - cont?.resume(Unit) + override fun interrupt(resource: SimResource) { + val context = map[resource] + checkNotNull(context) { "Invalid resource" } + context.interrupt() } } /** - * This method is invoked when one of the CPUs failed. - */ - private fun onCpuFailure(e: Throwable) { - // Make sure no other tasks will be resumed. - scheduler.cancelAll() - - // In case the flush fails with an exception, immediately propagate to caller, cancelling all other - // tasks. - val cont = cont - this.cont = null - cont?.resumeWithException(e) - } - - /** - * A physical CPU of the machine. + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) { - /** - * The current command. - */ - private var currentCommand: CommandWrapper? = null - - /** - * The actual processing speed. - */ - var speed: Double = 0.0 - set(value) { - field = value - updateUsage() - } - - /** - * A flag to indicate that the CPU is currently processing a command. - */ - var isIntermediate: Boolean = false - - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - /** - * Process the specified [SimResourceCommand] for this CPU. - */ - fun process(command: SimResourceCommand) { - val timestamp = clock.millis() - - val task = when (command) { - is SimResourceCommand.Idle -> { - speed = 0.0 - - val deadline = command.deadline - - require(deadline >= timestamp) { "Deadline already passed" } - - if (deadline != Long.MAX_VALUE) { - scheduler.startSingleTimerTo(this, deadline) { flush() } - } else { - null - } - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline - - require(deadline >= timestamp) { "Deadline already passed" } - - speed = min(model.frequency, limit) - - // The required duration to process all the work - val finishedAt = timestamp + ceil(work / speed * 1000).toLong() - - scheduler.startSingleTimerTo(this, min(finishedAt, deadline)) { flush() } - } - is SimResourceCommand.Exit -> { - speed = 0.0 - hasExited = true + override suspend fun run(workload: SimWorkload, meta: Map): Unit = coroutineScope { + require(!isTerminated) { "Machine is terminated" } + val map = mutableMapOf>() + val ctx = Context(map, meta) + val sources = model.cpus.map { SimResourceSource(it, clock, scheduler) } + val totalCapacity = model.cpus.sumByDouble { it.frequency } - onCpuExit(model.id) + workload.onStart(ctx) - null + for (source in sources) { + val consumer = workload.getConsumer(ctx, source.resource) + val job = source.speed + .onEach { + _usage.value = sources.sumByDouble { it.speed.value } / totalCapacity } - } - - assert(currentCommand == null) { "Concurrent access to current command" } - currentCommand = CommandWrapper(timestamp, command) - } + .launchIn(this) - /** - * Request the workload for more work. - */ - private fun next(remainingWork: Double) { - process(workload.onNext(ctx, model.id, remainingWork)) - } - - /** - * Start the CPU. - */ - fun start() { - try { - isIntermediate = true - - process(workload.onStart(ctx, model.id)) - } catch (e: Throwable) { - onCpuFailure(e) - } finally { - isIntermediate = false - } - } - - /** - * Flush the work performed by the CPU. - */ - fun flush() { - try { - val (timestamp, command) = currentCommand ?: return - - isIntermediate = true - currentCommand = null - - // Cancel the running task and flush the progress - scheduler.cancel(this) - - when (command) { - is SimResourceCommand.Idle -> next(remainingWork = 0.0) - is SimResourceCommand.Consume -> { - val duration = clock.millis() - timestamp - val remainingWork = if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, command.work - processed) - } else { - 0.0 - } - - next(remainingWork) + launch { + source.consume(object : SimResourceConsumer by consumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + map[ctx.resource] = ctx + return consumer.onStart(ctx) } - SimResourceCommand.Exit -> throw IllegalStateException() - } - } catch (e: Throwable) { - onCpuFailure(e) - } finally { - isIntermediate = false + }) + job.cancel() } } - - /** - * Interrupt the CPU. - */ - fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isIntermediate) { - return - } - - flush() - } } - /** - * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. - */ - private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) + override fun close() { + isTerminated = true + scheduler.close() + } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt deleted file mode 100644 index 657dac66..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute - -import java.time.Clock - -/** - * A simulated execution context in which a bootable image runs. This interface represents the - * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on - * which the image runs. - */ -public interface SimExecutionContext { - /** - * The virtual clock tracking simulation time. - */ - public val clock: Clock - - /** - * The machine model of the machine that is running the image. - */ - public val machine: SimMachineModel - - /** - * The metadata associated with the context. - */ - public val meta: Map - - /** - * Ask the host machine to interrupt the specified vCPU. - * - * @param cpu The id of the vCPU to interrupt. - * @throws IllegalArgumentException if the identifier points to a non-existing vCPU. - */ - public fun interrupt(cpu: Int) -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index bf6d8a5e..12b3b428 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -26,10 +26,11 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.compute.workload.SimWorkloadBarrier +import org.opendc.simulator.resources.* import java.time.Clock import kotlin.coroutines.Continuation import kotlin.coroutines.resume @@ -44,22 +45,22 @@ import kotlin.math.min * * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor { +public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer { - override fun onStart(ctx: SimExecutionContext) { - val model = ctx.machine + override fun onStart(ctx: SimMachineContext) { this.ctx = ctx - this.commands = Array(model.cpus.size) { SimResourceCommand.Idle() } - this.pCpus = model.cpus.indices.sortedBy { model.cpus[it].frequency }.toIntArray() - this.maxUsage = model.cpus.sumByDouble { it.frequency } - this.barrier = SimWorkloadBarrier(model.cpus.size) + this.commands = Array(ctx.cpus.size) { SimResourceCommand.Idle() } + this.pCpus = ctx.cpus.indices.sortedBy { ctx.cpus[it].frequency }.toIntArray() + this.maxUsage = ctx.cpus.sumByDouble { it.frequency } + this.barrier = SimWorkloadBarrier(ctx.cpus.size) } - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return commands[cpu] + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { + return this } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + val cpu = ctx.resource.id totalRemainingWork += remainingWork val isLast = barrier.enter() @@ -82,6 +83,10 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener } } + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return commands[ctx.resource.id] + } + override fun canFit(model: SimMachineModel): Boolean = true override fun createMachine( @@ -92,7 +97,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener /** * The execution context in which the hypervisor runs. */ - private lateinit var ctx: SimExecutionContext + private lateinit var ctx: SimMachineContext /** * The commands to submit to the underlying host. @@ -199,7 +204,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener val vcpu = vcpuIterator.next() val availableShare = availableSpeed / remaining-- - when (val command = vcpu.command) { + when (val command = vcpu.activeCommand) { is SimResourceCommand.Idle -> { // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) @@ -246,7 +251,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener // Divide the requests over the available capacity of the pCPUs fairly for (i in pCpus) { - val maxCpuUsage = ctx.machine.cpus[i].frequency + val maxCpuUsage = ctx.cpus[i].frequency val fraction = maxCpuUsage / maxUsage val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction) val grantedWork = duration * grantedSpeed @@ -275,7 +280,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener private fun flushGuests() { // Flush all the vCPUs work for (vcpu in vcpus) { - vcpu.flush(interrupt = false) + vcpu.flush(isIntermediate = true) } // Report metrics @@ -299,9 +304,9 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener /** * Interrupt all host CPUs. */ - private fun SimExecutionContext.interruptAll() { - for (i in machine.cpus.indices) { - interrupt(i) + private fun SimMachineContext.interruptAll() { + for (cpu in ctx.cpus) { + interrupt(cpu) } } @@ -336,33 +341,38 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener private var cpus: List = emptyList() /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * The execution context in which the workload runs. */ - override suspend fun run(workload: SimWorkload, meta: Map) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model + inner class Context(override val meta: Map) : SimMachineContext { + override val cpus: List + get() = model.cpus - override val clock: Clock - get() = this@SimFairShareHypervisor.ctx.clock + override val memory: List + get() = model.memory - override val meta: Map - get() = meta + override val clock: Clock + get() = this@SimFairShareHypervisor.ctx.clock - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } + override fun interrupt(resource: SimResource) { + TODO() } + } + + lateinit var ctx: SimMachineContext + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload, meta: Map) { + require(!isTerminated) { "Machine is terminated" } + require(cont == null) { "Run should not be called concurrently" } + ctx = Context(meta) workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) } + this.cpus = model.cpus.map { VCpu(this, it, workload.getConsumer(ctx, it), ctx.clock) } for (cpu in cpus) { // Register vCPU to scheduler @@ -387,13 +397,13 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener * Update the usage of the VM. */ fun updateUsage() { - usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.model.frequency } + usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.resource.frequency } } /** * This method is invoked when one of the CPUs has exited. */ - fun onCpuExit(cpu: Int) { + fun onCpuExit() { // Check whether all other CPUs have finished if (cpus.all { it.hasExited }) { val cont = cont @@ -419,19 +429,14 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener */ private inner class VCpu( val vm: SimVm, - val ctx: SimExecutionContext, - val model: ProcessingUnit, - val workload: SimWorkload - ) : Comparable { + resource: SimProcessingUnit, + consumer: SimResourceConsumer, + clock: Clock + ) : SimAbstractResourceContext(resource, clock, consumer), Comparable { /** - * The latest command processed by the CPU. + * The current command that is processed by the vCPU. */ - var command: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The latest timestamp at which the vCPU was flushed. - */ - var latestFlush: Long = 0 + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() /** * The processing speed that is allowed by the model constraints. @@ -447,149 +452,75 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener vm.updateUsage() } - /** - * A flag to indicate that the CPU is currently processing a command. - */ - var isIntermediate: Boolean = false - /** * A flag to indicate that the CPU has exited. */ - val hasExited: Boolean - get() = command is SimResourceCommand.Exit - - /** - * Process the specified [SimResourceCommand] for this CPU. - */ - fun process(command: SimResourceCommand) { - // Assign command as the most recent executed command - this.command = command - - when (command) { - is SimResourceCommand.Idle -> { - require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" } + var hasExited: Boolean = false - allowedSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" } + override fun onIdle(deadline: Long) { + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } - allowedSpeed = min(model.frequency, command.limit) - } - is SimResourceCommand.Exit -> { - allowedSpeed = 0.0 - actualSpeed = 0.0 + override fun onConsume(work: Double, limit: Double, deadline: Long) { + allowedSpeed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } - vm.onCpuExit(model.id) - } - } + override fun onFinish() { + hasExited = true + activeCommand = SimResourceCommand.Exit + vm.onCpuExit() } - /** - * Start the CPU. - */ - fun start() { - try { - isIntermediate = true - latestFlush = ctx.clock.millis() - - process(workload.onStart(ctx, model.id)) - } catch (e: Throwable) { - fail(e) - } finally { - isIntermediate = false - } + override fun onFailure(cause: Throwable) { + hasExited = true + activeCommand = SimResourceCommand.Exit + vm.onCpuFailure(cause) } - /** - * Flush the work performed by the CPU. - */ - fun flush(interrupt: Boolean) { - val now = ctx.clock.millis() + override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + // Apply performance interference model + val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0 - // Fast path: if the CPU was already flushed at at the current instant, no need to flush the progress. - if (latestFlush >= now) { - return + // Compute the remaining amount of work + val remainingWork = if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 } - try { - isIntermediate = true - when (val command = command) { - is SimResourceCommand.Idle -> { - // Act like nothing has happened in case the vCPU did not reach its deadline or was not - // interrupted by the user. - if (interrupt || command.deadline <= now) { - process(workload.onNext(ctx, model.id, 0.0)) - } - } - is SimResourceCommand.Consume -> { - // Apply performance interference model - val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0 - - // Compute the remaining amount of work - val remainingWork = if (command.work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - totalInterferedWork += interferedWork - - max(0.0, command.work - processed) - } else { - 0.0 - } - - // Act like nothing has happened in case the vCPU did not finish yet or was not interrupted by - // the user. - if (interrupt || remainingWork == 0.0 || command.deadline <= now) { - if (!interrupt) { - totalOvercommittedWork += remainingWork - } - - process(workload.onNext(ctx, model.id, remainingWork)) - } else { - process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) - } - } - SimResourceCommand.Exit -> - throw IllegalStateException() - } - } catch (e: Throwable) { - fail(e) - } finally { - latestFlush = now - isIntermediate = false + if (!isInterrupted) { + totalOvercommittedWork += remainingWork } + + return remainingWork } - /** - * Interrupt the CPU. - */ - fun interrupt() { + override fun interrupt() { // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead // to infinite recursion. - if (isIntermediate) { + if (isProcessing) { return } - flush(interrupt = true) + super.interrupt() // Force the scheduler to re-schedule shouldSchedule() } - /** - * Fail the CPU. - */ - fun fail(e: Throwable) { - command = SimResourceCommand.Exit - vm.onCpuFailure(e) - } - override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt new file mode 100644 index 00000000..5c67b990 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResource +import java.time.Clock + +/** + * A simulated execution context in which a bootable image runs. This interface represents the + * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on + * which the image runs. + */ +public interface SimMachineContext { + /** + * The virtual clock tracking simulation time. + */ + public val clock: Clock + + /** + * The metadata associated with the context. + */ + public val meta: Map + + /** + * The CPUs available on the machine. + */ + public val cpus: List + + /** + * The memory available on the machine + */ + public val memory: List + + /** + * Interrupt the specified [resource]. + * + * @throws IllegalArgumentException if the resource does not belong to this execution context. + */ + public fun interrupt(resource: SimResource) +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt index c2988b11..d6bf0e99 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit /** * A description of the physical or virtual machine on which a bootable image runs. @@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.ProcessingUnit * @property cpus The list of processing units available to the image. * @property memory The list of memory units available to the image. */ -public data class SimMachineModel(public val cpus: List, public val memory: List) +public data class SimMachineModel(public val cpus: List, public val memory: List) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 778b68ca..751873a5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -26,26 +26,26 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.* import java.time.Clock import java.util.ArrayDeque import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException -import kotlin.math.min /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. * * @param listener The hypervisor listener to use. */ -public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor { +public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer { /** * The execution context in which the hypervisor runs. */ - private lateinit var ctx: SimExecutionContext + private lateinit var ctx: SimMachineContext /** * The mapping from pCPU to vCPU. @@ -67,18 +67,36 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen return SimVm(model, performanceInterferenceModel) } - override fun onStart(ctx: SimExecutionContext) { + override fun onStart(ctx: SimMachineContext) { this.ctx = ctx - this.vcpus = arrayOfNulls(ctx.machine.cpus.size) - this.availableCpus.addAll(ctx.machine.cpus.indices) + this.vcpus = arrayOfNulls(ctx.cpus.size) + this.availableCpus.addAll(ctx.cpus.indices) } - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return onNext(ctx, cpu, 0.0) + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { + return this } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return vcpus[cpu]?.next(0.0) ?: SimResourceCommand.Idle() + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return onNext(ctx, 0.0) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + val vcpu = vcpus[ctx.resource.id] ?: return SimResourceCommand.Idle() + + if (vcpu.isStarted) { + vcpu.remainingWork = remainingWork + vcpu.flush() + } else { + vcpu.isStarted = true + vcpu.start() + } + + if (vcpu.hasExited && vcpu != vcpus[ctx.resource.id]) { + return onNext(ctx, remainingWork) + } + + return vcpu.activeCommand } /** @@ -117,36 +135,46 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen private var cpus: List = emptyList() /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * The execution context in which the workload runs. */ - override suspend fun run(workload: SimWorkload, meta: Map) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model + inner class Context(override val meta: Map) : SimMachineContext { + override val cpus: List + get() = model.cpus - override val clock: Clock - get() = this@SimSpaceSharedHypervisor.ctx.clock + override val memory: List + get() = model.memory - override val meta: Map - get() = meta + override val clock: Clock + get() = this@SimSpaceSharedHypervisor.ctx.clock - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } + override fun interrupt(resource: SimResource) { + TODO() } + } + + lateinit var ctx: SimMachineContext + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload, meta: Map) { + require(!isTerminated) { "Machine is terminated" } + require(cont == null) { "Run should not be called concurrently" } + + ctx = Context(meta) workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) } + try { + this.cpus = model.cpus.map { model -> VCpu(this, model, workload.getConsumer(ctx, model), ctx.clock) } - for (cpu in cpus) { - cpu.start() + for ((index, pCPU) in pCPUs.withIndex()) { + vcpus[pCPU] = cpus[index] + this@SimSpaceSharedHypervisor.ctx.interrupt(this@SimSpaceSharedHypervisor.ctx.cpus[pCPU]) + } + } catch (e: Throwable) { + cont.resumeWithException(e) } } } @@ -157,19 +185,23 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen vcpus[pCPU] = null availableCpus.add(pCPU) } + + val cont = cont + this.cont = null + cont?.resume(Unit) } /** * Update the usage of the VM. */ fun updateUsage() { - usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency } + usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.resource.frequency } } /** * This method is invoked when one of the CPUs has exited. */ - fun onCpuExit(cpu: Int) { + fun onCpuExit() { // Check whether all other CPUs have finished if (cpus.all { it.hasExited }) { val cont = cont @@ -193,7 +225,22 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { + private inner class VCpu( + val vm: SimVm, + resource: SimProcessingUnit, + consumer: SimResourceConsumer, + clock: Clock + ) : SimAbstractResourceContext(resource, clock, consumer) { + /** + * Indicates that the vCPU was started. + */ + var isStarted: Boolean = false + + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + /** * The processing speed of the vCPU. */ @@ -204,81 +251,41 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen } /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - /** - * A flag to indicate that the CPU was started. + * The amount of work remaining from the previous consumption. */ - var hasStarted: Boolean = false + var remainingWork: Double = 0.0 /** - * Process the specified [SimResourceCommand] for this CPU. + * A flag to indicate that the CPU has exited. */ - fun process(command: SimResourceCommand): SimResourceCommand { - return when (command) { - is SimResourceCommand.Idle -> { - speed = 0.0 - command - } - is SimResourceCommand.Consume -> { - speed = min(model.frequency, command.limit) - command - } - is SimResourceCommand.Exit -> { - speed = 0.0 - hasExited = true - - vm.onCpuExit(model.id) - - SimResourceCommand.Idle() - } - } - } + var hasExited: Boolean = false - /** - * Start the CPU. - */ - fun start() { - vcpus[pCPU] = this - interrupt() + override fun onIdle(deadline: Long) { + speed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) } - /** - * Request the workload for more work. - */ - fun next(remainingWork: Double): SimResourceCommand { - return try { - val command = - if (hasStarted) { - workload.onNext(ctx, model.id, remainingWork) - } else { - hasStarted = true - workload.onStart(ctx, model.id) - } - process(command) - } catch (e: Throwable) { - fail(e) - } + override fun onConsume(work: Double, limit: Double, deadline: Long) { + speed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, speed, deadline) } - /** - * Interrupt the CPU. - */ - fun interrupt() { - this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU) + override fun onFinish() { + speed = 0.0 + hasExited = true + activeCommand = SimResourceCommand.Idle() + vm.onCpuExit() } - /** - * Fail the CPU. - */ - fun fail(e: Throwable): SimResourceCommand { + override fun onFailure(cause: Throwable) { + speed = 0.0 hasExited = true + activeCommand = SimResourceCommand.Idle() + vm.onCpuFailure(cause) + } - vm.onCpuFailure(e) - - return SimResourceCommand.Idle() + override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + return remainingWork } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt deleted file mode 100644 index bcbde5b1..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.model - -/** - * A memory unit of a compute resource, either virtual or physical. - * - * @property vendor The vendor string of the memory. - * @property modelName The name of the memory model. - * @property speed The access speed of the memory in MHz. - * @property size The size of the memory unit in MBs. - */ -public data class MemoryUnit( - public val vendor: String, - public val modelName: String, - public val speed: Double, - public val size: Long -) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt deleted file mode 100644 index 58ed816c..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.model - -/** - * A processing node/package/socket containing possibly several CPU cores. - * - * @property vendor The vendor string of the processor node. - * @property modelName The name of the processor node. - * @property arch The micro-architecture of the processor node. - * @property coreCount The number of logical CPUs in the processor node. - */ -public data class ProcessingNode( - public val vendor: String, - public val arch: String, - public val modelName: String, - public val coreCount: Int -) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt deleted file mode 100644 index 415e95e6..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.model - -/** - * A single logical compute unit of processor node, either virtual or physical. - * - * @property node The processing node containing the CPU core. - * @property id The identifier of the CPU core within the processing node. - * @property frequency The clock rate of the CPU in MHz. - */ -public data class ProcessingUnit( - public val node: ProcessingNode, - public val id: Int, - public val frequency: Double -) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt new file mode 100644 index 00000000..49745868 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +import org.opendc.simulator.resources.SimResource + +/** + * A memory unit of a compute resource, either virtual or physical. + * + * @property vendor The vendor string of the memory. + * @property modelName The name of the memory model. + * @property speed The access speed of the memory in MHz. + * @property size The size of the memory unit in MBs. + */ +public data class SimMemoryUnit( + public val vendor: String, + public val modelName: String, + public val speed: Double, + public val size: Long +) : SimResource { + override val capacity: Double + get() = speed +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt new file mode 100644 index 00000000..4022ecb3 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +/** + * A processing node/package/socket containing possibly several CPU cores. + * + * @property vendor The vendor string of the processor node. + * @property modelName The name of the processor node. + * @property arch The micro-architecture of the processor node. + * @property coreCount The number of logical CPUs in the processor node. + */ +public data class SimProcessingNode( + public val vendor: String, + public val arch: String, + public val modelName: String, + public val coreCount: Int +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt new file mode 100644 index 00000000..1c989254 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.model + +import org.opendc.simulator.resources.SimResource + +/** + * A single logical compute unit of processor node, either virtual or physical. + * + * @property node The processing node containing the CPU core. + * @property id The identifier of the CPU core within the processing node. + * @property frequency The clock rate of the CPU in MHz. + */ +public data class SimProcessingUnit( + public val node: SimProcessingNode, + public val id: Int, + public val frequency: Double +) : SimResource { + override val capacity: Double + get() = frequency +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index c22fcc07..9b47821e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -22,7 +22,11 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext /** * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on @@ -36,31 +40,35 @@ public class SimFlopsWorkload( public val utilization: Double = 0.8 ) : SimWorkload { init { - require(flops >= 0) { "Negative number of flops" } + require(flops >= 0) { "Number of FLOPs must be positive" } require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimExecutionContext) {} + override fun onStart(ctx: SimMachineContext) {} - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - val cores = ctx.machine.cpus.size - val limit = ctx.machine.cpus[cpu].frequency * utilization - val work = flops.toDouble() / cores - - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) - } else { - SimResourceCommand.Exit - } + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { + return CpuConsumer(ctx) } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.machine.cpus[cpu].frequency * utilization + private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + val limit = ctx.resource.frequency * utilization + val work = flops.toDouble() / machine.cpus.size + + return if (work > 0.0) { + SimResourceCommand.Consume(work, limit) + } else { + SimResourceCommand.Exit + } + } - return SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + return if (remainingWork > 0.0) { + val limit = ctx.resource.frequency * utilization + return SimResourceCommand.Consume(remainingWork, limit) + } else { + SimResourceCommand.Exit + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt deleted file mode 100644 index 41a5028e..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.workload - -/** - * A command that is sent to the host machine. - */ -public sealed class SimResourceCommand { - /** - * A request to the host to process the specified amount of [work] on a vCPU before the specified [deadline]. - * - * @param work The amount of work to process on the CPU. - * @param limit The maximum amount of work to be processed per second. - * @param deadline The instant at which the work needs to be fulfilled. - */ - public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { - init { - require(work > 0) { "The amount of work must be positive." } - require(limit > 0) { "Limit must be positive." } - } - } - - /** - * An indication to the host that the vCPU will idle until the specified [deadline] or is interrupted. - */ - public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() - - /** - * An indication to the host that the vCPU has finished processing. - */ - public object Exit : SimResourceCommand() -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 00ebebce..313b6ed5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -22,7 +22,11 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext /** * A [SimWorkload] that models application execution as a single duration. @@ -39,20 +43,26 @@ public class SimRuntimeWorkload( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimExecutionContext) {} + override fun onStart(ctx: SimMachineContext) {} - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - val limit = ctx.machine.cpus[cpu].frequency * utilization - val work = (limit / 1000) * duration - return SimResourceCommand.Consume(work, limit) + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { + return CpuConsumer() } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.machine.cpus[cpu].frequency * utilization - SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit + private inner class CpuConsumer : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + val limit = ctx.resource.frequency * utilization + val work = (limit / 1000) * duration + return SimResourceCommand.Consume(work, limit) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + return if (remainingWork > 0.0) { + val limit = ctx.resource.frequency * utilization + SimResourceCommand.Consume(remainingWork, limit) + } else { + SimResourceCommand.Exit + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index deb10b98..edef3843 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -22,7 +22,11 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -34,36 +38,42 @@ public class SimTraceWorkload(public val trace: Sequence) : SimWorkloa private var fragment: Fragment? = null private lateinit var barrier: SimWorkloadBarrier - override fun onStart(ctx: SimExecutionContext) { - barrier = SimWorkloadBarrier(ctx.machine.cpus.size) + override fun onStart(ctx: SimMachineContext) { + barrier = SimWorkloadBarrier(ctx.cpus.size) fragment = nextFragment() offset = ctx.clock.millis() } - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return onNext(ctx, cpu, 0.0) + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer { + return CpuConsumer() } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val work = (fragment.duration / 1000) * fragment.usage - val deadline = offset + fragment.duration + private inner class CpuConsumer : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return onNext(ctx, 0.0) + } - assert(deadline >= now) { "Deadline already passed" } + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val work = (fragment.duration / 1000) * fragment.usage + val deadline = offset + fragment.duration - val cmd = - if (cpu < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) + assert(deadline >= now) { "Deadline already passed" } - if (barrier.enter()) { - this.fragment = nextFragment() - this.offset += fragment.duration - } + val cmd = + if (ctx.resource.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) - return cmd + if (barrier.enter()) { + this@SimTraceWorkload.fragment = nextFragment() + this@SimTraceWorkload.offset += fragment.duration + } + + return cmd + } } override fun toString(): String = "SimTraceWorkload" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 6fc78d56..60661e23 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -22,7 +22,9 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceConsumer /** * A model that characterizes the runtime behavior of some particular workload. @@ -32,27 +34,12 @@ import org.opendc.simulator.compute.SimExecutionContext */ public interface SimWorkload { /** - * This method is invoked when the workload is started, before the (virtual) CPUs assigned to the workload will - * start. + * This method is invoked when the workload is started. */ - public fun onStart(ctx: SimExecutionContext) + public fun onStart(ctx: SimMachineContext) /** - * This method is invoked when a (virtual) CPU assigned to the workload has started. - * - * @param ctx The execution context in which the workload runs. - * @param cpu The index of the (virtual) CPU to start. - * @return The command to perform on the CPU. + * Obtain the resource consumer for the specified processing unit. */ - public fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand - - /** - * This method is invoked when a (virtual) CPU assigned to the workload was interrupted or reached its deadline. - * - * @param ctx The execution context in which the workload runs. - * @param cpu The index of the (virtual) CPU to obtain the resource consumption of. - * @param remainingWork The remaining work that was not yet completed. - * @return The next command to perform on the CPU. - */ - public fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand + public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index b8eee4f0..4b4d7eca 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -30,9 +30,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import java.time.Clock @@ -51,10 +51,10 @@ internal class SimHypervisorTest { scope = TestCoroutineScope() clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 1036f1ac..00efba53 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -29,14 +29,10 @@ import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter /** @@ -48,11 +44,11 @@ class SimMachineTest { @BeforeEach fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -86,74 +82,4 @@ class SimMachineTest { assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } } } - - @Test - fun testInterrupt() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - ctx.interrupt(cpu) - return SimResourceCommand.Exit - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertDoesNotThrow { - testScope.runBlockingTest { machine.run(workload) } - } - } - - @Test - fun testExceptionPropagationOnStart() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - throw IllegalStateException() - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertThrows { - testScope.runBlockingTest { machine.run(workload) } - } - } - - @Test - fun testExceptionPropagationOnNext() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertThrows { - testScope.runBlockingTest { machine.run(workload) } - } - } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index 1a9faf11..583d989c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -31,9 +31,10 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -53,10 +54,10 @@ internal class SimSpaceSharedHypervisorTest { scope = TestCoroutineScope() clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -125,6 +126,56 @@ internal class SimSpaceSharedHypervisorTest { assertEquals(duration, scope.currentTime) { "Took enough time" } } + /** + * Test FLOPs workload on hypervisor. + */ + @Test + fun testFlopsWorkload() { + val duration = 5 * 60L * 1000 + val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + scope.launch { + launch { machine.run(hypervisor) } + + yield() + launch { hypervisor.createMachine(machineModel).run(workload) } + } + + scope.advanceUntilIdle() + + assertEquals(duration, scope.currentTime) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() { + val duration = 5 * 60L * 1000 + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + scope.launch { + launch { machine.run(hypervisor) } + + yield() + launch { + val vm = hypervisor.createMachine(machineModel) + vm.run(SimRuntimeWorkload(duration)) + vm.close() + + val vm2 = hypervisor.createMachine(machineModel) + vm2.run(SimRuntimeWorkload(duration)) + } + } + + scope.advanceUntilIdle() + + assertEquals(duration * 2, scope.currentTime) { "Took enough time" } + } + /** * Test concurrent workloads on the machine. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts new file mode 100644 index 00000000..831ca3db --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Uniform resource consumption simulation model" + +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api("org.jetbrains.kotlinx:kotlinx-coroutines-core") + implementation(project(":opendc-utils")) + + testImplementation(project(":opendc-simulator:opendc-simulator-core")) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt new file mode 100644 index 00000000..f9da74c7 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. + */ +public abstract class SimAbstractResourceContext( + override val resource: R, + override val clock: Clock, + private val consumer: SimResourceConsumer +) : SimResourceContext { + /** + * This method is invoked when the resource will idle until the specified [deadline]. + */ + public abstract fun onIdle(deadline: Long) + + /** + * This method is invoked when the resource will be consumed until the specified [work] was processed or the + * [deadline] was reached. + */ + public abstract fun onConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer has finished. + */ + public abstract fun onFinish() + + /** + * This method is invoked when the resource consumer throws an exception. + */ + public abstract fun onFailure(cause: Throwable) + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + protected open fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() + } + + /** + * Compute the speed at which the resource may be consumed. + */ + protected open fun getSpeed(limit: Double): Double { + return min(limit, resource.capacity) + } + + /** + * Get the remaining work to process after a resource consumption was flushed. + * + * @param work The size of the resource consumption. + * @param speed The speed of consumption. + * @param duration The duration from the start of the consumption until now. + * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to + * it being interrupted before it could finish or reach its deadline. + * @return The amount of work remaining. + */ + protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + return if (duration > 0L) { + val processed = duration / 1000.0 * speed + max(0.0, work - processed) + } else { + 0.0 + } + } + + /** + * Start the consumer. + */ + public fun start() { + try { + isProcessing = true + latestFlush = clock.millis() + + interpret(consumer.onStart(this)) + } catch (e: Throwable) { + onFailure(e) + } finally { + isProcessing = false + } + } + + /** + * Immediately stop the consumer. + */ + public fun stop() { + try { + isProcessing = true + latestFlush = clock.millis() + + flush(isIntermediate = true) + onFinish() + } catch (e: Throwable) { + onFailure(e) + } finally { + isProcessing = false + } + } + + /** + * Flush the current active resource consumption. + * + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public open fun flush(isIntermediate: Boolean = false) { + val now = clock.millis() + + // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. + if (isIntermediate && latestFlush >= now) { + return + } + + try { + val (timestamp, command) = activeCommand ?: return + + isProcessing = true + activeCommand = null + + val duration = now - timestamp + assert(duration >= 0) { "Flush in the past" } + + when (command) { + is SimResourceCommand.Idle -> { + // We should only continue processing the next command if: + // 1. The resource consumer reached its deadline. + // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if (command.deadline <= now || !isIntermediate) { + next(remainingWork = 0.0) + } + } + is SimResourceCommand.Consume -> { + val speed = min(resource.capacity, command.limit) + val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed) + val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted) + + // We should only continue processing the next command if: + // 1. The resource consumption was finished. + // 2. The resource consumer reached its deadline. + // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { + next(remainingWork) + } else { + interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) + } + } + SimResourceCommand.Exit -> + // Flush may not be called when the resource consumer has finished + throw IllegalStateException() + } + } catch (e: Throwable) { + onFailure(e) + } finally { + latestFlush = now + isProcessing = false + } + } + + override fun interrupt() { + // Prevent users from interrupting the resource while they are constructing their next command, as this will + // only lead to infinite recursion. + if (isProcessing) { + return + } + + flush() + } + + override fun toString(): String = "SimAbstractResourceContext[resource=$resource]" + + /** + * A flag to indicate that the resource is currently processing a command. + */ + protected var isProcessing: Boolean = false + + /** + * The current command that is being processed. + */ + private var activeCommand: CommandWrapper? = null + + /** + * The latest timestamp at which the resource was flushed. + */ + private var latestFlush: Long = Long.MIN_VALUE + + /** + * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + */ + private fun interpret(command: SimResourceCommand) { + val now = clock.millis() + + when (command) { + is SimResourceCommand.Idle -> { + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + onIdle(deadline) + } + is SimResourceCommand.Consume -> { + val work = command.work + val limit = command.limit + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + onConsume(work, limit, deadline) + } + is SimResourceCommand.Exit -> { + onFinish() + } + } + + assert(activeCommand == null) { "Concurrent access to current command" } + activeCommand = CommandWrapper(now, command) + } + + /** + * Request the workload for more work. + */ + private fun next(remainingWork: Double) { + interpret(consumer.onNext(this, remainingWork)) + } + + /** + * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. + */ + private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt new file mode 100644 index 00000000..31b0a175 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A generic representation of resource that may be consumed. + */ +public interface SimResource { + /** + * The capacity of the resource. + */ + public val capacity: Double +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt new file mode 100644 index 00000000..77c0a7a9 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer]. + */ +public sealed class SimResourceCommand { + /** + * A request to the resource to perform the specified amount of work before the given [deadline]. + * + * @param work The amount of work to process on the CPU. + * @param limit The maximum amount of work to be processed per second. + * @param deadline The instant at which the work needs to be fulfilled. + */ + public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { + init { + require(work > 0) { "Amount of work must be positive" } + require(limit > 0) { "Limit must be positive" } + } + } + + /** + * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted. + */ + public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() + + /** + * An indication to the resource that the consumer has finished. + */ + public object Exit : SimResourceCommand() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt new file mode 100644 index 00000000..f516faa6 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A SimResourceConsumer characterizes how a [SimResource] is consumed. + */ +public interface SimResourceConsumer { + /** + * This method is invoked when the consumer is started for a resource. + * + * @param ctx The execution context in which the consumer runs. + * @return The next command that the resource should perform. + */ + public fun onStart(ctx: SimResourceContext): SimResourceCommand + + /** + * This method is invoked when a resource was either interrupted or reached its deadline. + * + * @param ctx The execution context in which the consumer runs. + * @param remainingWork The remaining work that was not yet completed. + * @return The next command that the resource should perform. + */ + public fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt new file mode 100644 index 00000000..dfb5e9ce --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a + * resource and a resource consumer. + */ +public interface SimResourceContext { + /** + * The resource that is managed by this context. + */ + public val resource: R + + /** + * The virtual clock tracking simulation time. + */ + public val clock: Clock + + /** + * Ask the resource provider to interrupt its resource. + */ + public fun interrupt() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt new file mode 100644 index 00000000..91a745ab --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceProvider] provides some resource of type [R]. + */ +public interface SimResourceProvider : AutoCloseable { + /** + * The resource that is managed by this provider. + */ + public val resource: R + + /** + * Consume the resource provided by this provider using the specified [consumer]. + */ + public suspend fun consume(consumer: SimResourceConsumer) + + /** + * End the lifetime of the resource. + * + * This operation terminates the existing resource consumer. + */ + public override fun close() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt new file mode 100644 index 00000000..4445df86 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import org.opendc.utils.TimerScheduler +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.math.min + +/** + * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. + * + * @param resource The resource to provide. + * @param clock The virtual clock to track simulation time. + */ +public class SimResourceSource( + override val resource: R, + private val clock: Clock, + private val scheduler: TimerScheduler +) : SimResourceProvider { + /** + * The resource processing speed over time. + */ + public val speed: StateFlow + get() = _speed + private val _speed = MutableStateFlow(0.0) + + override suspend fun consume(consumer: SimResourceConsumer) { + check(!isClosed) { "Lifetime of resource has ended." } + check(cont == null) { "Run should not be called concurrently" } + + try { + return suspendCancellableCoroutine { cont -> + this.cont = cont + val ctx = Context(consumer, cont) + ctx.start() + cont.invokeOnCancellation { + ctx.stop() + } + } + } finally { + cont = null + } + } + + override fun close() { + isClosed = true + cont?.cancel() + cont = null + } + + /** + * A flag to indicate that the resource was closed. + */ + private var isClosed: Boolean = false + + /** + * The current active consumer. + */ + private var cont: CancellableContinuation? = null + + /** + * Internal implementation of [SimResourceContext] for this class. + */ + private inner class Context( + consumer: SimResourceConsumer, + val cont: Continuation + ) : SimAbstractResourceContext(resource, clock, consumer) { + /** + * The processing speed of the resource. + */ + private var speed: Double = 0.0 + set(value) { + field = value + _speed.value = field + } + + override fun onIdle(deadline: Long) { + speed = 0.0 + + // Do not resume if deadline is "infinite" + if (deadline != Long.MAX_VALUE) { + scheduler.startSingleTimerTo(this, deadline) { flush() } + } + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + speed = getSpeed(limit) + val until = min(deadline, clock.millis() + getDuration(work, speed)) + + scheduler.startSingleTimerTo(this, until) { flush() } + } + + override fun onFinish() { + speed = 0.0 + scheduler.cancel(this) + cont.resume(Unit) + } + + override fun onFailure(cause: Throwable) { + speed = 0.0 + scheduler.cancel(this) + cont.resumeWithException(cause) + } + + override fun toString(): String = "SimResourceSource.Context[resource=$resource]" + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt new file mode 100644 index 00000000..02d456ff --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows + +/** + * Test suite for [SimResourceCommand]. + */ +class SimResourceCommandTest { + @Test + fun testZeroWork() { + assertThrows { + SimResourceCommand.Consume(0.0, 1.0) + } + } + + @Test + fun testNegativeWork() { + assertThrows { + SimResourceCommand.Consume(-1.0, 1.0) + } + } + + @Test + fun testZeroLimit() { + assertThrows { + SimResourceCommand.Consume(1.0, 0.0) + } + } + + @Test + fun testNegativeLimit() { + assertThrows { + SimResourceCommand.Consume(1.0, -1.0, 1) + } + } + + @Test + fun testConsumeCorrect() { + assertDoesNotThrow { + SimResourceCommand.Consume(1.0, 1.0) + } + } + + @Test + fun testIdleCorrect() { + assertDoesNotThrow { + SimResourceCommand.Idle(1) + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt new file mode 100644 index 00000000..8b380efb --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler +import java.time.Clock + +/** + * A test suite for the [SimResourceScheduler] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceSourceTest { + + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + + data class SimCpu(val speed: Double) : SimResource { + override val capacity: Double + get() = speed + } + + @BeforeEach + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + } + + @Test + fun testSpeed() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + scope.runBlockingTest { + val res = mutableListOf() + val job = launch { provider.speed.toList(res) } + + provider.consume(consumer) + + job.cancel() + assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" } + } + } + + @Test + fun testSpeedLimit() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + scope.runBlockingTest { + val res = mutableListOf() + val job = launch { provider.speed.toList(res) } + + provider.consume(consumer) + + job.cancel() + assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" } + } + } + + @Test + fun testInterrupt() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + ctx.interrupt() + return SimResourceCommand.Exit + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + assertDoesNotThrow { + scope.runBlockingTest { + provider.consume(consumer) + } + } + } + + @Test + fun testFailure() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + throw IllegalStateException() + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + assertThrows { + scope.runBlockingTest { + provider.consume(consumer) + } + } + } + + @Test + fun testExceptionPropagationOnNext() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Consume(1.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + assertThrows { + scope.runBlockingTest { provider.consume(consumer) } + } + } + + @Test + fun testConcurrentConsumption() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Consume(1.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + assertThrows { + scope.runBlockingTest { + launch { provider.consume(consumer) } + launch { provider.consume(consumer) } + } + } + } + + @Test + fun testClosedConsumption() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Consume(1.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + assertThrows { + scope.runBlockingTest { + provider.close() + provider.consume(consumer) + } + } + } + + @Test + fun testCloseDuringConsumption() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Consume(1.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + throw IllegalStateException() + } + } + + scope.runBlockingTest { + launch { provider.consume(consumer) } + delay(500) + provider.close() + } + + assertEquals(500, scope.currentTime) + } + + @Test + fun testIdle() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Idle(ctx.clock.millis() + 500) + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + scope.runBlockingTest { + provider.consume(consumer) + } + + assertEquals(500, scope.currentTime) + } + + @Test + fun testInfiniteSleep() { + val resource = SimCpu(4200.0) + val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Idle() + } + + override fun onNext(ctx: SimResourceContext, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + assertThrows { + scope.runBlockingTest { + provider.consume(consumer) + } + } + } +} diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index ff116443..bb6f3299 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -51,7 +51,7 @@ public class TimerScheduler(private val coroutineScope: CoroutineScope, priva private val timers = mutableMapOf() /** - * The channel to communicate with the + * The channel to communicate with the scheduling job. */ private val channel = Channel(Channel.CONFLATED) diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index e87dd4d8..0e5a2711 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -32,6 +32,7 @@ include(":opendc-experiments:opendc-experiments-sc18") include(":opendc-experiments:opendc-experiments-capelin") include(":opendc-runner-web") include(":opendc-simulator:opendc-simulator-core") +include(":opendc-simulator:opendc-simulator-resources") include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") include(":opendc-trace:opendc-trace-core") -- cgit v1.2.3