diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-23 11:56:53 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-23 11:56:53 +0100 |
| commit | 6de1ef7424e058603be9ae5a86f0568b40579e5f (patch) | |
| tree | 2a882a67667e8efcd51d74cfbe32fdeaad02f502 /simulator | |
| parent | 0fa1dc262905c42b3549172fea59f7ad4cb58b1f (diff) | |
| parent | 38a13e5c201c828f9f21f17e89916b4638396945 (diff) | |
simulator: Add uniform resource consumption model (v2)
This is the second pull request in the series of pull requests to add a uniform resource consumption model to OpenDC. This pull request focusses on adding dynamic capacity negotiation and propagation between resource consumer and resource provider:
* The generic resource constraint is removed from the interfaces of `opendc-simulator-resources`. Users of the API are expected to use the untyped variants where only the capacity needs to be specified. Users are expected to build higher-level abstractions on top of these interface to represent actual resources (e.g., CPU, disk or network).
* Added benchmarks for the most important implementations of `opendc-simulator-resources`. This allows us to quantify the effects of changes on the runtime.
* The `SimResourceSwitchMaxMin` has been split into a `SimResourceAggregatorMaxMin` and `SimResourceDistributorMaxMin` which respectively aggregate input resources and distribute output resources using max-min fair sharing.
* The `SimResourceConsumer` interface has a new method for receiving capacity change events: `onCapacityChanged(ctx, isThrottled)`
**Breaking API Changes**
* All interfaces in `opendc-simulator-resources`.
Diffstat (limited to 'simulator')
58 files changed, 2365 insertions, 1200 deletions
diff --git a/simulator/buildSrc/build.gradle.kts b/simulator/buildSrc/build.gradle.kts index be071d0c..a71e18cf 100644 --- a/simulator/buildSrc/build.gradle.kts +++ b/simulator/buildSrc/build.gradle.kts @@ -36,6 +36,8 @@ dependencies { implementation(kotlin("gradle-plugin", version = "1.4.31")) implementation("org.jlleitschuh.gradle:ktlint-gradle:10.0.0") implementation("org.jetbrains.dokka:dokka-gradle-plugin:0.10.1") + implementation("org.jetbrains.kotlin:kotlin-allopen:1.4.30") + implementation("org.jetbrains.kotlinx:kotlinx-benchmark-plugin:0.3.0") } kotlinDslPluginOptions { diff --git a/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts new file mode 100644 index 00000000..d3bb886d --- /dev/null +++ b/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +import kotlinx.benchmark.gradle.* +import org.jetbrains.kotlin.allopen.gradle.* + +plugins { + id("org.jetbrains.kotlinx.benchmark") + `java-library` + kotlin("plugin.allopen") +} + +sourceSets { + register("jmh") { + compileClasspath += sourceSets["main"].output + runtimeClasspath += sourceSets["main"].output + } +} + +configurations { + named("jmhImplementation") { + extendsFrom(configurations["implementation"]) + } +} + +configure<AllOpenExtension> { + annotation("org.openjdk.jmh.annotations.State") +} + +benchmark { + targets { + register("jmh") { + this as JvmBenchmarkTarget + jmhVersion = "1.21" + } + } +} + +dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-benchmark-runtime-jvm:0.3.0") +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 6e9b8151..694676bc 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -35,7 +35,7 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.simulator.compute.* import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.failures.FailureDomain import org.opendc.utils.flow.EventFlow import java.time.Clock @@ -217,7 +217,7 @@ public class SimHost( val originalCpu = machine.model.cpus[0] val processingNode = originalCpu.node.copy(coreCount = cpuCount) val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } - val memoryUnits = listOf(SimMemoryUnit("Generic", "Generic", 3200.0, memorySize)) + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) return SimMachineModel(processingUnits, memoryUnits) } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 6929b06c..e311cd21 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -40,9 +40,9 @@ import org.opendc.compute.api.ServerWatcher import org.opendc.compute.service.driver.HostEvent import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import java.time.Clock @@ -62,11 +62,11 @@ internal class SimHostTest { scope = TestCoroutineScope() clock = DelayControllerClockAdapter(scope) - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -136,9 +136,9 @@ internal class SimHostTest { assertAll( { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, - { assertEquals(4273200, requestedWork, "Requested work does not match") }, - { assertEquals(3133200, grantedWork, "Granted work does not match") }, - { assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") }, + { assertEquals(4197600, requestedWork, "Requested work does not match") }, + { assertEquals(2157600, grantedWork, "Granted work does not match") }, + { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1200006, scope.currentTime) } ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 59ce895f..a812490a 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -143,10 +143,10 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1707132711051, monitor.totalRequestedBurst) }, - { assertEquals(457881474296, monitor.totalGrantedBurst) }, - { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, - { assertEquals(0, monitor.totalInterferedBurst) } + { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -189,9 +189,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(711464322955, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(175226276978, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(702636229989, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(172807361391, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(528959213229, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 85a2e413..3da8d0b3 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream import java.util.* @@ -61,12 +61,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja val cores = machine.cpus.flatMap { id -> when (id) { 1 -> { - val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) } + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } } 2 -> { - val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) } + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } } else -> throw IllegalArgumentException("The cpu id $id is not recognized") } @@ -75,7 +75,7 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja UUID(0L, counter++.toLong()), "node-$counter", emptyMap(), - SimMachineModel(cores, listOf(SimMemoryUnit("", "", 2300.0, 16000))), + SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), ConstantPowerModel(0.0) ) } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 094bc975..9a06a40f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -26,9 +26,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.File import java.io.FileInputStream import java.io.InputStream @@ -88,8 +88,8 @@ public class Sc20ClusterEnvironmentReader( memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L coresPerHost = values[coresPerHostCol].trim().toInt() - val unknownProcessingNode = SimProcessingNode("unknown", "unknown", "unknown", coresPerHost) - val unknownMemoryUnit = SimMemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) repeat(numberOfHosts) { nodes.add( @@ -99,7 +99,7 @@ public class Sc20ClusterEnvironmentReader( mapOf("cluster" to clusterId), SimMachineModel( List(coresPerHost) { coreId -> - SimProcessingUnit(unknownProcessingNode, coreId, speed) + ProcessingUnit(unknownProcessingNode, coreId, speed) }, listOf(unknownMemoryUnit) ), diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 87a49f49..effd0286 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream import java.util.* @@ -60,19 +60,19 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja val cores = machine.cpus.flatMap { id -> when (id) { 1 -> { - val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) } + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) + List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } } 2 -> { - val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) } + val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) + List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } } else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } val memories = machine.memories.map { id -> when (id) { - 1 -> SimMemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) + 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index 0ff40a28..e7e99a3d 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -34,9 +34,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import java.util.* /** @@ -56,13 +56,13 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p val cores = cpu.getInteger("numberOfCores") val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() // TODO Remove hardcoding of vendor - val node = SimProcessingNode("Intel", "amd64", cpu.getString("name"), cores) + val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) List(cores) { coreId -> - SimProcessingUnit(node, coreId, speed) + ProcessingUnit(node, coreId, speed) } } val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> - SimMemoryUnit( + MemoryUnit( "Samsung", memory.getString("name"), memory.get("speedMbPerS", Number::class.java).toDouble(), diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt index a80365de..f68e206a 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt @@ -35,9 +35,9 @@ import org.opendc.serverless.service.ServerlessService import org.opendc.serverless.service.router.RandomRoutingPolicy import org.opendc.serverless.simulator.workload.SimServerlessWorkload import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -52,11 +52,11 @@ internal class SimServerlessServiceTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index a99b082a..81d09f12 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -27,12 +27,11 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.launch import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* import java.time.Clock -import kotlin.coroutines.CoroutineContext /** * Abstract implementation of the [SimHypervisor] interface. @@ -46,7 +45,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * The resource switch to use. */ - private lateinit var switch: SimResourceSwitch<SimProcessingUnit> + private lateinit var switch: SimResourceSwitch /** * The virtual machines running on this hypervisor. @@ -58,12 +57,12 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. */ - public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> + public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch /** * Check whether the specified machine model fits on this hypervisor. */ - public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean + public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean override fun canFit(model: SimMachineModel): Boolean { return canFit(model, switch) @@ -102,7 +101,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * The vCPUs of the machine. */ - private val cpus: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>> = model.cpus.associateWith { switch.addOutput(it) } + private val cpus: Map<ProcessingUnit, SimResourceProvider> = model.cpus.associateWith { switch.addOutput(it.frequency) } /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. @@ -112,19 +111,19 @@ public abstract class SimAbstractHypervisor : SimHypervisor { require(!isTerminated) { "Machine is terminated" } val ctx = object : SimMachineContext { - override val cpus: List<SimProcessingUnit> + override val cpus: List<ProcessingUnit> get() = model.cpus - override val memory: List<SimMemoryUnit> + override val memory: List<MemoryUnit> get() = model.memory override val clock: Clock get() = this@SimAbstractHypervisor.context.clock - override val meta: Map<String, Any> = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext) + override val meta: Map<String, Any> = meta - override fun interrupt(resource: SimResource) { - requireNotNull(this@VirtualMachine.cpus[resource]).interrupt() + override fun interrupt(cpu: ProcessingUnit) { + requireNotNull(this@VirtualMachine.cpus[cpu]).interrupt() } } @@ -156,8 +155,8 @@ public abstract class SimAbstractHypervisor : SimHypervisor { switch = createSwitch(ctx) } - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - val forwarder = SimResourceForwarder(cpu) + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { + val forwarder = SimResourceForwarder() switch.addInput(forwarder) return forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 39ae34fe..1c0f94fd 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -27,12 +27,12 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource +import org.opendc.simulator.resources.consume import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -64,24 +64,24 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine /** * The resources allocated for this machine. */ - protected abstract val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> + protected abstract val resources: Map<ProcessingUnit, SimResourceSource> /** * The execution context in which the workload runs. */ private inner class Context( - val sources: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>>, + val sources: Map<ProcessingUnit, SimResourceProvider>, override val meta: Map<String, Any> ) : SimMachineContext { override val clock: Clock get() = this@SimAbstractMachine.clock - override val cpus: List<SimProcessingUnit> = model.cpus + override val cpus: List<ProcessingUnit> = model.cpus - override val memory: List<SimMemoryUnit> = model.memory + override val memory: List<MemoryUnit> = model.memory - override fun interrupt(resource: SimResource) { - checkNotNull(sources[resource]) { "Invalid resource" }.interrupt() + override fun interrupt(cpu: ProcessingUnit) { + checkNotNull(sources[cpu]) { "Invalid resource" }.interrupt() } } @@ -91,7 +91,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) { val resources = resources require(!isTerminated) { "Machine is terminated" } - val ctx = Context(resources, meta + mapOf("coroutine-context" to context)) + val ctx = Context(resources, meta) val totalCapacity = model.cpus.sumByDouble { it.frequency } _speed = MutableList(model.cpus.size) { 0.0 } @@ -102,7 +102,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine val consumer = workload.getConsumer(ctx, cpu) val job = source.speed .onEach { - _speed[cpu.id] = source.speed.value + _speed[cpu.id] = it _usage.value = _speed.sum() / totalCapacity } .launchIn(this) @@ -116,9 +116,8 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine override fun close() { if (!isTerminated) { - resources.forEach { (_, provider) -> provider.close() } - } else { isTerminated = true + resources.forEach { (_, provider) -> provider.close() } } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 79982ea8..19479719 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* import org.opendc.utils.TimerScheduler import java.time.Clock @@ -57,8 +57,8 @@ public class SimBareMetalMachine( */ private val scheduler = TimerScheduler<Any>(this.context, clock) - override val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> = - model.cpus.associateWith { SimResourceSource(it, clock, scheduler) } + override val resources: Map<ProcessingUnit, SimResourceSource> = + model.cpus.associateWith { SimResourceSource(it.frequency, clock, scheduler) } override fun close() { super.close() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index bb97192d..fa677de9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -22,10 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* -import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single @@ -35,15 +33,14 @@ import kotlin.coroutines.CoroutineContext */ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { - override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean = true + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchMaxMin( ctx.clock, - ctx.meta["coroutine-context"] as CoroutineContext, - object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> { + object : SimResourceSwitchMaxMin.Listener { override fun onSliceFinish( - switch: SimResourceSwitchMaxMin<SimProcessingUnit>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index cff70826..85404e6e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,9 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import java.time.Clock /** @@ -46,17 +45,17 @@ public interface SimMachineContext { /** * The CPUs available on the machine. */ - public val cpus: List<SimProcessingUnit> + public val cpus: List<ProcessingUnit> /** * The memory available on the machine */ - public val memory: List<SimMemoryUnit> + public val memory: List<MemoryUnit> /** - * Interrupt the specified [resource]. + * Interrupt the specified [cpu]. * * @throws IllegalArgumentException if the resource does not belong to this execution context. */ - public fun interrupt(resource: SimResource) + public fun interrupt(cpu: ProcessingUnit) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt index d6bf0e99..2b414540 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit /** * A description of the physical or virtual machine on which a bootable image runs. @@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.SimProcessingUnit * @property cpus The list of processing units available to the image. * @property memory The list of memory units available to the image. */ -public data class SimMachineModel(public val cpus: List<SimProcessingUnit>, public val memory: List<SimMemoryUnit>) +public data class SimMachineModel(public val cpus: List<ProcessingUnit>, public val memory: List<MemoryUnit>) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 2001a230..fd8e546f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -22,19 +22,17 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.resources.* -import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { - override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean { + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean { return switch.inputs.size - switch.outputs.size >= model.cpus.size } - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { - return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext) + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { + return SimResourceSwitchExclusive() } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt index 49745868..bcbde5b1 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.compute.model -import org.opendc.simulator.resources.SimResource - /** * A memory unit of a compute resource, either virtual or physical. * @@ -32,12 +30,9 @@ import org.opendc.simulator.resources.SimResource * @property speed The access speed of the memory in MHz. * @property size The size of the memory unit in MBs. */ -public data class SimMemoryUnit( +public data class MemoryUnit( public val vendor: String, public val modelName: String, public val speed: Double, public val size: Long -) : SimResource { - override val capacity: Double - get() = speed -} +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt index 4022ecb3..58ed816c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt @@ -30,7 +30,7 @@ package org.opendc.simulator.compute.model * @property arch The micro-architecture of the processor node. * @property coreCount The number of logical CPUs in the processor node. */ -public data class SimProcessingNode( +public data class ProcessingNode( public val vendor: String, public val arch: String, public val modelName: String, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt index 1c989254..415e95e6 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.compute.model -import org.opendc.simulator.resources.SimResource - /** * A single logical compute unit of processor node, either virtual or physical. * @@ -31,11 +29,8 @@ import org.opendc.simulator.resources.SimResource * @property id The identifier of the CPU core within the processing node. * @property frequency The clock rate of the CPU in MHz. */ -public data class SimProcessingUnit( - public val node: SimProcessingNode, +public data class ProcessingUnit( + public val node: ProcessingNode, public val id: Int, public val frequency: Double -) : SimResource { - override val capacity: Double - get() = frequency -} +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 9b47821e..63c9d28c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -23,10 +23,9 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on @@ -46,30 +45,8 @@ public class SimFlopsWorkload( override fun onStart(ctx: SimMachineContext) {} - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - return CpuConsumer(ctx) - } - - private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> { - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - val limit = ctx.resource.frequency * utilization - val work = flops.toDouble() / machine.cpus.size - - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) - } else { - SimResourceCommand.Exit - } - } - - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.resource.frequency * utilization - return SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit - } - } + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { + return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization) } override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 313b6ed5..a3420e32 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -23,10 +23,9 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * A [SimWorkload] that models application execution as a single duration. @@ -45,25 +44,9 @@ public class SimRuntimeWorkload( override fun onStart(ctx: SimMachineContext) {} - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - return CpuConsumer() - } - - private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - val limit = ctx.resource.frequency * utilization - val work = (limit / 1000) * duration - return SimResourceCommand.Consume(work, limit) - } - - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.resource.frequency * utilization - SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit - } - } + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { + val limit = cpu.frequency * utilization + return SimWorkConsumer((limit / 1000) * duration, utilization) } override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 31f58a0f..2442d748 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -45,35 +45,29 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa offset = ctx.clock.millis() } - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - return CpuConsumer() - } - - private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - return onNext(ctx, 0.0) - } + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { + return object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val work = (fragment.duration / 1000) * fragment.usage + val deadline = offset + fragment.duration - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { - val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val work = (fragment.duration / 1000) * fragment.usage - val deadline = offset + fragment.duration + assert(deadline >= now) { "Deadline already passed" } - assert(deadline >= now) { "Deadline already passed" } + val cmd = + if (cpu.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) - val cmd = - if (ctx.resource.id < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) + if (barrier.enter()) { + this@SimTraceWorkload.fragment = nextFragment() + this@SimTraceWorkload.offset += fragment.duration + } - if (barrier.enter()) { - this@SimTraceWorkload.fragment = nextFragment() - this@SimTraceWorkload.offset += fragment.duration + return cmd } - - return cmd } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 60661e23..bdc12bb5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer /** @@ -41,5 +41,5 @@ public interface SimWorkload { /** * Obtain the resource consumer for the specified processing unit. */ - public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> + public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 4ac8cf63..5773b325 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield @@ -31,9 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -46,10 +47,10 @@ internal class SimHypervisorTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) model = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -98,15 +99,21 @@ internal class SimHypervisorTest { println("Hypervisor finished") } yield() - hypervisor.createMachine(model).run(workloadA) + val vm = hypervisor.createMachine(model) + val res = mutableListOf<Double>() + val job = launch { machine.usage.toList(res) } + + vm.run(workloadA) yield() + job.cancel() machine.close() assertAll( { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, currentTime) } + { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), res) { "VM usage is correct" } }, + { assertEquals(1200000, currentTime) { "Current time is correct" } } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 6adc41d0..071bdf77 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -28,9 +28,9 @@ import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -43,11 +43,11 @@ class SimMachineTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -76,7 +76,7 @@ class SimMachineTest { try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - + yield() job.cancel() assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } } finally { diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index 8428a0a7..fb0523af 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -31,9 +31,9 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -48,10 +48,10 @@ internal class SimSpaceSharedHypervisorTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts index 831ca3db..3b0a197c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts +++ b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts @@ -26,6 +26,7 @@ plugins { `kotlin-library-conventions` `testing-conventions` `jacoco-conventions` + `benchmark-conventions` } dependencies { @@ -33,5 +34,6 @@ dependencies { api("org.jetbrains.kotlinx:kotlinx-coroutines-core") implementation(project(":opendc-utils")) + jmhImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-simulator:opendc-simulator-core")) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt new file mode 100644 index 00000000..8d2587b1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.consumer.SimTraceConsumer + +/** + * Helper function to create simple consumer workload. + */ +fun createSimpleConsumer(): SimResourceConsumer { + return SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(1000, 28.0), + SimTraceConsumer.Fragment(1000, 3500.0), + SimTraceConsumer.Fragment(1000, 0.0), + SimTraceConsumer.Fragment(1000, 183.0), + SimTraceConsumer.Fragment(1000, 400.0), + SimTraceConsumer.Fragment(1000, 100.0), + SimTraceConsumer.Fragment(1000, 3000.0), + SimTraceConsumer.Fragment(1000, 4500.0), + ), + ) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt new file mode 100644 index 00000000..f2eea97c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler +import org.openjdk.jmh.annotations.* +import java.time.Clock +import java.util.concurrent.TimeUnit + +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceBenchmarks { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var scheduler: TimerScheduler<Any> + + @Setup + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + scheduler = TimerScheduler(scope.coroutineContext, clock) + } + + @State(Scope.Thread) + class Workload { + lateinit var consumers: Array<SimResourceConsumer> + + @Setup + fun setUp() { + consumers = Array(3) { createSimpleConsumer() } + } + } + + @Benchmark + fun benchmarkSource(state: Workload) { + return scope.runBlockingTest { + val provider = SimResourceSource(4200.0, clock, scheduler) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkForwardOverhead(state: Workload) { + return scope.runBlockingTest { + val provider = SimResourceSource(4200.0, clock, scheduler) + val forwarder = SimResourceForwarder() + provider.startConsumer(forwarder) + return@runBlockingTest forwarder.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + val provider = switch.addOutput(3500.0) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + repeat(3) { i -> + launch { + val provider = switch.addOutput(3500.0) + provider.consume(state.consumers[i]) + } + } + } + } + + @Benchmark + fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchExclusive() + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + val provider = switch.addOutput(3500.0) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchExclusive() + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + repeat(2) { i -> + launch { + val provider = switch.addOutput(3500.0) + provider.consume(state.consumers[i]) + } + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt new file mode 100644 index 00000000..e5991264 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * Abstract implementation of [SimResourceAggregator]. + */ +public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { + /** + * The available resource provider contexts. + */ + protected val inputContexts: Set<SimResourceContext> + get() = _inputContexts + private val _inputContexts = mutableSetOf<SimResourceContext>() + + /** + * The output context. + */ + protected val outputContext: SimResourceContext + get() = context + + /** + * The commands to submit to the underlying input resources. + */ + protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf() + + /** + * This method is invoked when the resource consumer consumes resources. + */ + protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer enters an idle state. + */ + protected open fun doIdle(deadline: Long) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Idle(deadline) + } + } + + /** + * This method is invoked when the resource consumer finishes processing. + */ + protected open fun doFinish(cause: Throwable?) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Exit + } + } + + /** + * This method is invoked when an input context is started. + */ + protected open fun onContextStarted(ctx: SimResourceContext) { + _inputContexts.add(ctx) + } + + protected open fun onContextFinished(ctx: SimResourceContext) { + assert(_inputContexts.remove(ctx)) { "Lost context" } + } + + override fun addInput(input: SimResourceProvider) { + check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + + val consumer = Consumer() + _inputs.add(input) + input.startConsumer(consumer) + } + + override fun close() { + output.close() + } + + override val output: SimResourceProvider + get() = _output + private val _output = SimResourceForwarder() + + override val inputs: Set<SimResourceProvider> + get() = _inputs + private val _inputs = mutableSetOf<SimResourceProvider>() + + private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { + override val remainingWork: Double + get() = inputContexts.sumByDouble { it.remainingWork } + + override fun interrupt() { + super.interrupt() + + interruptAll() + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) + + override fun onIdle(deadline: Long) = doIdle(deadline) + + override fun onFinish(cause: Throwable?) { + doFinish(cause) + + super.onFinish(cause) + + interruptAll() + } + } + + /** + * A flag to indicate that an interrupt is active. + */ + private var isInterrupting: Boolean = false + + /** + * Schedule the work over the input resources. + */ + private fun doSchedule() { + context.flush(isIntermediate = true) + interruptAll() + } + + /** + * Interrupt all inputs. + */ + private fun interruptAll() { + // Prevent users from interrupting the resource while they are constructing their next command, as this will + // only lead to infinite recursion. + if (isInterrupting) { + return + } + + try { + isInterrupting = true + + val iterator = _inputs.iterator() + while (iterator.hasNext()) { + val input = iterator.next() + input.interrupt() + + if (input.state != SimResourceState.Active) { + iterator.remove() + } + } + } finally { + isInterrupting = false + } + } + + /** + * An internal [SimResourceConsumer] implementation for aggregator inputs. + */ + private inner class Consumer : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { + onContextStarted(ctx) + onCapacityChanged(ctx, false) + + // Make sure we initialize the output if we have not done so yet + if (context.state == SimResourceState.Pending) { + context.start() + } + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + doSchedule() + + return commands[ctx] ?: SimResourceCommand.Idle() + } + + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + // Adjust capacity of output resource + context.capacity = inputContexts.sumByDouble { it.capacity } + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + onContextFinished(ctx) + + super.onFinish(ctx, cause) + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 52251bff..9705bd17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -23,64 +23,79 @@ package org.opendc.simulator.resources import java.time.Clock -import kotlin.math.ceil import kotlin.math.max import kotlin.math.min /** * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ -public abstract class SimAbstractResourceContext<R : SimResource>( - override val resource: R, +public abstract class SimAbstractResourceContext( + initialCapacity: Double, override val clock: Clock, - private val consumer: SimResourceConsumer<R> -) : SimResourceContext<R> { + private val consumer: SimResourceConsumer +) : SimResourceContext { /** - * This method is invoked when the resource will idle until the specified [deadline]. + * The capacity of the resource. */ - public abstract fun onIdle(deadline: Long) + public final override var capacity: Double = initialCapacity + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } /** - * This method is invoked when the resource will be consumed until the specified [work] was processed or the - * [deadline] was reached. + * The amount of work still remaining at this instant. */ - public abstract fun onConsume(work: Double, limit: Double, deadline: Long) + override val remainingWork: Double + get() { + val activeCommand = activeCommand ?: return 0.0 + return computeRemainingWork(activeCommand, clock.millis()) + } /** - * This method is invoked when the resource consumer has finished. + * A flag to indicate the state of the context. */ - public abstract fun onFinish() + public var state: SimResourceState = SimResourceState.Pending + private set /** - * This method is invoked when the resource consumer throws an exception. + * The current processing speed of the resource. */ - public abstract fun onFailure(cause: Throwable) + public var speed: Double = 0.0 + private set /** - * Compute the duration that a resource consumption will take with the specified [speed]. + * This method is invoked when the resource will idle until the specified [deadline]. */ - protected open fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } + public abstract fun onIdle(deadline: Long) /** - * Compute the speed at which the resource may be consumed. + * This method is invoked when the resource will be consumed until the specified [work] was processed or the + * [deadline] was reached. */ - protected open fun getSpeed(limit: Double): Double { - return min(limit, resource.capacity) + public abstract fun onConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer has finished. + */ + public open fun onFinish(cause: Throwable?) { + consumer.onFinish(this, cause) } /** - * Get the remaining work to process after a resource consumption was flushed. + * Get the remaining work to process after a resource consumption. * * @param work The size of the resource consumption. * @param speed The speed of consumption. * @param duration The duration from the start of the consumption until now. - * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to - * it being interrupted before it could finish or reach its deadline. * @return The amount of work remaining. */ - protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { return if (duration > 0L) { val processed = duration / 1000.0 * speed max(0.0, work - processed) @@ -93,13 +108,19 @@ public abstract class SimAbstractResourceContext<R : SimResource>( * Start the consumer. */ public fun start() { - try { - isProcessing = true - latestFlush = clock.millis() + check(state == SimResourceState.Pending) { "Consumer is already started" } - interpret(consumer.onStart(this)) - } catch (e: Throwable) { - onFailure(e) + val now = clock.millis() + + state = SimResourceState.Active + isProcessing = true + latestFlush = now + + try { + consumer.onStart(this) + activeCommand = interpret(consumer.onNext(this), now) + } catch (cause: Throwable) { + doStop(cause) } finally { isProcessing = false } @@ -114,9 +135,9 @@ public abstract class SimAbstractResourceContext<R : SimResource>( latestFlush = clock.millis() flush(isIntermediate = true) - onFinish() - } catch (e: Throwable) { - onFailure(e) + doStop(null) + } catch (cause: Throwable) { + doStop(cause) } finally { isProcessing = false } @@ -129,7 +150,12 @@ public abstract class SimAbstractResourceContext<R : SimResource>( * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer * will be asked to deliver a new command and is essentially interrupted. */ - public open fun flush(isIntermediate: Boolean = false) { + public fun flush(isIntermediate: Boolean = false) { + // Flush is no-op when the consumer is finished or not yet started + if (state != SimResourceState.Active) { + return + } + val now = clock.millis() // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. @@ -141,44 +167,42 @@ public abstract class SimAbstractResourceContext<R : SimResource>( val activeCommand = activeCommand ?: return val (timestamp, command) = activeCommand + // Note: accessor is reliant on activeCommand being set + val remainingWork = remainingWork + isProcessing = true - this.activeCommand = null val duration = now - timestamp assert(duration >= 0) { "Flush in the past" } - when (command) { + this.activeCommand = when (command) { is SimResourceCommand.Idle -> { // We should only continue processing the next command if: // 1. The resource consumer reached its deadline. // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (command.deadline <= now || !isIntermediate) { - next(remainingWork = 0.0) + next(now) } else { - this.activeCommand = activeCommand + interpret(SimResourceCommand.Idle(command.deadline), now) } } is SimResourceCommand.Consume -> { - val speed = min(resource.capacity, command.limit) - val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed) - val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted) - // We should only continue processing the next command if: // 1. The resource consumption was finished. - // 2. The resource consumer reached its deadline. - // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + // 2. The resource capacity cannot satisfy the demand. + // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { - next(remainingWork) + next(now) } else { - interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) + interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now) } } SimResourceCommand.Exit -> // Flush may not be called when the resource consumer has finished throw IllegalStateException() } - } catch (e: Throwable) { - onFailure(e) + } catch (cause: Throwable) { + doStop(cause) } finally { latestFlush = now isProcessing = false @@ -195,7 +219,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( flush() } - override fun toString(): String = "SimAbstractResourceContext[resource=$resource]" + override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" /** * A flag to indicate that the resource is currently processing a command. @@ -213,17 +237,30 @@ public abstract class SimAbstractResourceContext<R : SimResource>( private var latestFlush: Long = Long.MIN_VALUE /** - * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + * Finish the consumer and resource provider. */ - private fun interpret(command: SimResourceCommand) { - val now = clock.millis() + private fun doStop(cause: Throwable?) { + val state = state + this.state = SimResourceState.Stopped + + if (state == SimResourceState.Active) { + activeCommand = null + onFinish(cause) + } + } + /** + * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + */ + private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? { when (command) { is SimResourceCommand.Idle -> { val deadline = command.deadline require(deadline >= now) { "Deadline already passed" } + speed = 0.0 + onIdle(deadline) } is SimResourceCommand.Consume -> { @@ -233,22 +270,57 @@ public abstract class SimAbstractResourceContext<R : SimResource>( require(deadline >= now) { "Deadline already passed" } + speed = min(capacity, limit) + onConsume(work, limit, deadline) } is SimResourceCommand.Exit -> { - onFinish() + speed = 0.0 + + doStop(null) + + // No need to set the next active command + return null } } - assert(activeCommand == null) { "Concurrent access to current command" } - activeCommand = CommandWrapper(now, command) + return CommandWrapper(now, command) } /** * Request the workload for more work. */ - private fun next(remainingWork: Double) { - interpret(consumer.onNext(this, remainingWork)) + private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now) + + /** + * Compute the remaining work based on the specified [wrapper] and [timestamp][now]. + */ + private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double { + val (timestamp, command) = wrapper + val duration = now - timestamp + return when (command) { + is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) + is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 + } + } + + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + consumer.onCapacityChanged(this, isThrottled) + + // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. + // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). + if (isThrottled) { + flush(isIntermediate = true) + } } /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt new file mode 100644 index 00000000..bb4e6a2c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. + */ +public interface SimResourceAggregator : AutoCloseable { + /** + * The output resource provider to which resource consumers can be attached. + */ + public val output: SimResourceProvider + + /** + * The input resources that will be switched between the output providers. + */ + public val inputs: Set<SimResourceProvider> + + /** + * Add the specified [input] to the switch. + */ + public fun addInput(input: SimResourceProvider) + + /** + * End the lifecycle of the aggregator. + */ + public override fun close() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt new file mode 100644 index 00000000..08bc064e --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * A [SimResourceAggregator] that distributes the load equally across the input resources. + */ +public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { + private val consumers = mutableListOf<SimResourceContext>() + + override fun doConsume(work: Double, limit: Double, deadline: Long) { + // Sort all consumers by their capacity + consumers.sortWith(compareBy { it.capacity }) + + // Divide the requests over the available capacity of the input resources fairly + for (input in consumers) { + val inputCapacity = input.capacity + val fraction = inputCapacity / outputContext.capacity + val grantedSpeed = limit * fraction + val grantedWork = fraction * work + + commands[input] = + if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + } + + override fun onContextStarted(ctx: SimResourceContext) { + super.onContextStarted(ctx) + + consumers.add(ctx) + } + + override fun onContextFinished(ctx: SimResourceContext) { + super.onContextFinished(ctx) + + consumers.remove(ctx) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index 77c0a7a9..f7f3fa4d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -23,13 +23,13 @@ package org.opendc.simulator.resources /** - * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer]. + * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. */ public sealed class SimResourceCommand { /** * A request to the resource to perform the specified amount of work before the given [deadline]. * - * @param work The amount of work to process on the CPU. + * @param work The amount of work to process. * @param limit The maximum amount of work to be processed per second. * @param deadline The instant at which the work needs to be fulfilled. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index f516faa6..672a3e9d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -23,23 +23,49 @@ package org.opendc.simulator.resources /** - * A SimResourceConsumer characterizes how a [SimResource] is consumed. + * A [SimResourceConsumer] characterizes how a [SimResource] is consumed. + * + * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) + * for multiple resource providers, unless explicitly said otherwise. */ -public interface SimResourceConsumer<in R : SimResource> { +public interface SimResourceConsumer { + /** + * This method is invoked when the consumer is started for some resource. + * + * @param ctx The execution context in which the consumer runs. + */ + public fun onStart(ctx: SimResourceContext) {} + /** - * This method is invoked when the consumer is started for a resource. + * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because + * the resource finished processing, reached its deadline or was interrupted. * * @param ctx The execution context in which the consumer runs. - * @return The next command that the resource should perform. + * @return The next command that the resource should execute. */ - public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand + public fun onNext(ctx: SimResourceContext): SimResourceCommand /** - * This method is invoked when a resource was either interrupted or reached its deadline. + * This is method is invoked when the capacity of the resource changes. + * + * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the + * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly + * causing the active resource command to finish at a later moment than initially planned. * * @param ctx The execution context in which the consumer runs. - * @param remainingWork The remaining work that was not yet completed. - * @return The next command that the resource should perform. + * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the + * capacity change. + */ + public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} + + /** + * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], + * the resource finished itself, or a failure occurred at the resource. + * + * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider. + * + * @param ctx The execution context in which the consumer ran. + * @param cause The cause of the finish in case the resource finished exceptionally. */ - public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand + public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index dfb5e9ce..11dbb09f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -28,16 +28,21 @@ import java.time.Clock * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a * resource and a resource consumer. */ -public interface SimResourceContext<out R : SimResource> { +public interface SimResourceContext { /** - * The resource that is managed by this context. + * The virtual clock tracking simulation time. */ - public val resource: R + public val clock: Clock /** - * The virtual clock tracking simulation time. + * The resource capacity available at this instant. */ - public val clock: Clock + public val capacity: Double + + /** + * The amount of work still remaining at this instant. + */ + public val remainingWork: Double /** * Ask the resource provider to interrupt its resource. diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt new file mode 100644 index 00000000..b2759b7f --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. + */ +public interface SimResourceDistributor : AutoCloseable { + /** + * The output resource providers to which resource consumers can be attached. + */ + public val outputs: Set<SimResourceProvider> + + /** + * The input resource that will be distributed over the consumers. + */ + public val input: SimResourceProvider + + /** + * Add an output to the switch with the specified [capacity]. + */ + public fun addOutput(capacity: Double): SimResourceProvider +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt new file mode 100644 index 00000000..9df333e3 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -0,0 +1,420 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. + */ +public class SimResourceDistributorMaxMin( + override val input: SimResourceProvider, + private val clock: Clock, + private val listener: Listener? = null +) : SimResourceDistributor { + override val outputs: Set<SimResourceProvider> + get() = _outputs + private val _outputs = mutableSetOf<OutputProvider>() + + /** + * The active output contexts. + */ + private val outputContexts: MutableList<OutputContext> = mutableListOf() + + /** + * The total speed requested by the output resources. + */ + private var totalRequestedSpeed = 0.0 + + /** + * The total amount of work requested by the output resources. + */ + private var totalRequestedWork = 0.0 + + /** + * The total allocated speed for the output resources. + */ + private var totalAllocatedSpeed = 0.0 + + /** + * The total allocated work requested for the output resources. + */ + private var totalAllocatedWork = 0.0 + + /** + * The amount of work that could not be performed due to over-committing resources. + */ + private var totalOvercommittedWork = 0.0 + + /** + * The amount of work that was lost due to interference. + */ + private var totalInterferedWork = 0.0 + + /** + * A flag to indicate that the switch is closed. + */ + private var isClosed: Boolean = false + + /** + * An internal [SimResourceConsumer] implementation for switch inputs. + */ + private val consumer = object : SimResourceConsumer { + /** + * The resource context of the consumer. + */ + private lateinit var ctx: SimResourceContext + + val remainingWork: Double + get() = ctx.remainingWork + + override fun onStart(ctx: SimResourceContext) { + this.ctx = ctx + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() + + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call tou output.close() + iterator.remove() + + output.close() + } + } + } + + /** + * The total amount of remaining work. + */ + private val totalRemainingWork: Double + get() = consumer.remainingWork + + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Distributor has been closed" } + + val provider = OutputProvider(capacity) + _outputs.add(provider) + return provider + } + + override fun close() { + if (!isClosed) { + isClosed = true + input.cancel() + } + } + + init { + input.startConsumer(consumer) + } + + /** + * Indicate that the workloads should be re-scheduled. + */ + private fun schedule() { + input.interrupt() + } + + /** + * Schedule the work over the physical CPUs. + */ + private fun doSchedule(capacity: Double): SimResourceCommand { + // If there is no work yet, mark all inputs as idle. + if (outputContexts.isEmpty()) { + return SimResourceCommand.Idle() + } + + val maxUsage = capacity + var duration: Double = Double.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + var availableSpeed = maxUsage + var totalRequestedSpeed = 0.0 + var totalRequestedWork = 0.0 + + // Flush the work of the outputs + var outputIterator = outputContexts.listIterator() + while (outputIterator.hasNext()) { + val output = outputIterator.next() + + output.flush(isIntermediate = true) + + if (output.activeCommand == SimResourceCommand.Exit) { + // Apparently the output consumer has exited, so remove it from the scheduling queue. + outputIterator.remove() + } + } + + // Sort the outputs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + outputContexts.sort() + + // Divide the available input capacity fairly across the outputs using max-min fair sharing + outputIterator = outputContexts.listIterator() + var remaining = outputContexts.size + while (outputIterator.hasNext()) { + val output = outputIterator.next() + val availableShare = availableSpeed / remaining-- + + when (val command = output.activeCommand) { + is SimResourceCommand.Idle -> { + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + output.actualSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + val grantedSpeed = min(output.allowedSpeed, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + // Ignore idle computation + if (grantedSpeed <= 0.0 || command.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } + + totalRequestedSpeed += command.limit + totalRequestedWork += command.work + + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed + + // The duration that we want to run is that of the shortest request from an output + duration = min(duration, command.work / grantedSpeed) + } + SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } + } + } + + assert(deadline >= clock.millis()) { "Deadline already passed" } + + this.totalRequestedSpeed = totalRequestedSpeed + this.totalRequestedWork = totalRequestedWork + this.totalAllocatedSpeed = maxUsage - availableSpeed + this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) + + return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) + SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + + /** + * Obtain the next command to perform. + */ + private fun doNext(capacity: Double): SimResourceCommand { + val totalRequestedWork = totalRequestedWork.toLong() + val totalRemainingWork = totalRemainingWork.toLong() + val totalAllocatedWork = totalAllocatedWork.toLong() + val totalRequestedSpeed = totalRequestedSpeed + val totalAllocatedSpeed = totalAllocatedSpeed + + // Force all inputs to re-schedule their work. + val command = doSchedule(capacity) + + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork, + totalAllocatedWork - totalRemainingWork, + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalRequestedSpeed, + totalAllocatedSpeed, + ) + + totalInterferedWork = 0.0 + totalOvercommittedWork = 0.0 + + return command + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * An internal [SimResourceProvider] implementation for switch outputs. + */ + private inner class OutputProvider(val capacity: Double) : SimResourceProvider { + /** + * The [OutputContext] that is currently running. + */ + private var ctx: OutputContext? = null + + override var state: SimResourceState = SimResourceState.Pending + internal set + + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource cannot be consumed" } + + val ctx = OutputContext(this, consumer) + this.ctx = ctx + this.state = SimResourceState.Active + outputContexts += ctx + + ctx.start() + schedule() + } + + override fun close() { + cancel() + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Stopped + _outputs.remove(this) + } + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + } + + /** + * A [SimAbstractResourceContext] for the output resources. + */ + private inner class OutputContext( + private val provider: OutputProvider, + consumer: SimResourceConsumer + ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> { + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 + + private fun reportOvercommit() { + val remainingWork = remainingWork + totalOvercommittedWork += remainingWork + } + + override fun onIdle(deadline: Long) { + reportOvercommit() + + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + reportOvercommit() + + allowedSpeed = speed + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } + + override fun onFinish(cause: Throwable?) { + reportOvercommit() + + activeCommand = SimResourceCommand.Exit + provider.cancel() + + super.onFinish(cause) + } + + override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { + // Apply performance interference model + val performanceScore = 1.0 + + // Compute the remaining amount of work + return if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 + } + } + + override fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isProcessing) { + return + } + + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } + + override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index ca23557c..1a05accd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -22,34 +22,19 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume - /** * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer]. */ -public class SimResourceForwarder<R : SimResource>(override val resource: R) : - SimResourceProvider<R>, SimResourceConsumer<R> { +public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { /** * The [SimResourceContext] in which the forwarder runs. */ - private var ctx: SimResourceContext<R>? = null - - /** - * A flag to indicate that the forwarder is closed. - */ - private var isClosed: Boolean = false - - /** - * The continuation to resume after consumption. - */ - private var cont: Continuation<Unit>? = null + private var ctx: SimResourceContext? = null /** * The delegate [SimResourceConsumer]. */ - private var delegate: SimResourceConsumer<R>? = null + private var delegate: SimResourceConsumer? = null /** * A flag to indicate that the delegate was started. @@ -57,99 +42,115 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : private var hasDelegateStarted: Boolean = false /** - * The remaining amount of work last cycle. + * The state of the forwarder. */ - private var remainingWork: Double = 0.0 - - override suspend fun consume(consumer: SimResourceConsumer<R>) { - check(!isClosed) { "Lifecycle of forwarder has ended" } - check(cont == null) { "Run should not be called concurrently" } + override var state: SimResourceState = SimResourceState.Pending + private set - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.delegate = consumer + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } - cont.invokeOnCancellation { reset() } + state = SimResourceState.Active + delegate = consumer - ctx?.interrupt() - } + // Interrupt the provider to replace the consumer + interrupt() } override fun interrupt() { ctx?.interrupt() } + override fun cancel() { + val delegate = delegate + val ctx = ctx + + state = SimResourceState.Pending + + if (delegate != null && ctx != null) { + this.delegate = null + delegate.onFinish(ctx) + } + } + override fun close() { - isClosed = true - interrupt() - ctx = null + val ctx = ctx + + state = SimResourceState.Stopped + + if (ctx != null) { + this.ctx = null + ctx.interrupt() + } } - override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand { + override fun onStart(ctx: SimResourceContext) { this.ctx = ctx - - return onNext(ctx, 0.0) } - override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand { - this.remainingWork = remainingWork + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val delegate = delegate - return if (isClosed) { - SimResourceCommand.Exit - } else if (!hasDelegateStarted) { + if (!hasDelegateStarted) { start() + } + + return if (state == SimResourceState.Stopped) { + SimResourceCommand.Exit + } else if (delegate != null) { + val command = delegate.onNext(ctx) + if (command == SimResourceCommand.Exit) { + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + + delegate.onFinish(ctx) + + if (state == SimResourceState.Stopped) + SimResourceCommand.Exit + else + onNext(ctx) + } else { + command + } } else { - next() + SimResourceCommand.Idle() } } - /** - * Start the delegate. - */ - private fun start(): SimResourceCommand { - val delegate = delegate ?: return SimResourceCommand.Idle() - val command = delegate.onStart(checkNotNull(ctx)) - - hasDelegateStarted = true - - return forward(command) + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + delegate?.onCapacityChanged(ctx, isThrottled) } - /** - * Obtain the next command to process. - */ - private fun next(): SimResourceCommand { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + this.ctx = null + val delegate = delegate - return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle()) + if (delegate != null) { + reset() + delegate.onFinish(ctx, cause) + } } /** - * Forward the specified [command]. + * Start the delegate. */ - private fun forward(command: SimResourceCommand): SimResourceCommand { - return if (command == SimResourceCommand.Exit) { - val cont = checkNotNull(cont) - - // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we - // reset beforehand the existing state and check whether it has been updated afterwards - reset() - cont.resume(Unit) + private fun start() { + val delegate = delegate ?: return + delegate.onStart(checkNotNull(ctx)) - if (isClosed) - SimResourceCommand.Exit - else - start() - } else { - command - } + hasDelegateStarted = true } /** * Reset the delegate. */ private fun reset() { - cont = null delegate = null hasDelegateStarted = false + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index e35aa683..52b13c5c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -22,29 +22,58 @@ package org.opendc.simulator.resources +import kotlinx.coroutines.suspendCancellableCoroutine + /** * A [SimResourceProvider] provides some resource of type [R]. */ -public interface SimResourceProvider<out R : SimResource> : AutoCloseable { +public interface SimResourceProvider : AutoCloseable { /** - * The resource that is managed by this provider. + * The state of the resource. */ - public val resource: R + public val state: SimResourceState /** - * Consume the resource provided by this provider using the specified [consumer]. + * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. + * + * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. */ - public suspend fun consume(consumer: SimResourceConsumer<R>) + public fun startConsumer(consumer: SimResourceConsumer) /** - * Interrupt the resource. + * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. */ public fun interrupt() /** + * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. + */ + public fun cancel() + + /** * End the lifetime of the resource. * * This operation terminates the existing resource consumer. */ public override fun close() } + +/** + * Consume the resource provided by this provider using the specified [consumer] and suspend execution until + * the consumer has finished. + */ +public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { + return suspendCancellableCoroutine { cont -> + startConsumer(object : SimResourceConsumer by consumer { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + assert(!cont.isCompleted) { "Coroutine already completed" } + + consumer.onFinish(ctx, cause) + + cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) + } + + override fun toString(): String = "SimSuspendingResourceConsumer" + }) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 540a17c9..9b10edaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,27 +22,25 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException +import kotlin.math.ceil import kotlin.math.min /** * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. * - * @param resource The resource to provide. + * @param initialCapacity The initial capacity of the resource. * @param clock The virtual clock to track simulation time. + * @param scheduler The scheduler to schedule the interrupts. */ -public class SimResourceSource<R : SimResource>( - override val resource: R, +public class SimResourceSource( + initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler<Any> -) : SimResourceProvider<R> { +) : SimResourceProvider { /** * The resource processing speed over time. */ @@ -51,71 +49,59 @@ public class SimResourceSource<R : SimResource>( private val _speed = MutableStateFlow(0.0) /** - * A flag to indicate that the resource was closed. + * The capacity of the resource. */ - private var isClosed: Boolean = false - - /** - * The current active consumer. - */ - private var cont: CancellableContinuation<Unit>? = null + public var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } /** * The [Context] that is currently running. */ private var ctx: Context? = null - override suspend fun consume(consumer: SimResourceConsumer<R>) { - check(!isClosed) { "Lifetime of resource has ended." } - check(cont == null) { "Run should not be called concurrently" } + override var state: SimResourceState = SimResourceState.Pending + private set - try { - return suspendCancellableCoroutine { cont -> - val ctx = Context(consumer, cont) + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } + val ctx = Context(consumer) - this.cont = cont - this.ctx = ctx + this.ctx = ctx + this.state = SimResourceState.Active - ctx.start() - cont.invokeOnCancellation { - ctx.stop() - } - } - } finally { - cont = null - ctx = null - } + ctx.start() } override fun close() { - isClosed = true - cont?.cancel() - cont = null - ctx = null + cancel() + state = SimResourceState.Stopped } override fun interrupt() { ctx?.interrupt() } + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context( - consumer: SimResourceConsumer<R>, - val cont: Continuation<Unit> - ) : SimAbstractResourceContext<R>(resource, clock, consumer) { - /** - * The processing speed of the resource. - */ - private var speed: Double = 0.0 - set(value) { - field = value - _speed.value = field - } - + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - speed = 0.0 + _speed.value = speed // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { @@ -124,24 +110,28 @@ public class SimResourceSource<R : SimResource>( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - speed = getSpeed(limit) + _speed.value = speed + val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } - override fun onFinish() { - speed = 0.0 + override fun onFinish(cause: Throwable?) { + _speed.value = speed scheduler.cancel(this) - cont.resume(Unit) - } + cancel() - override fun onFailure(cause: Throwable) { - speed = 0.0 - scheduler.cancel(this) - cont.resumeWithException(cause) + super.onFinish(cause) } - override fun toString(): String = "SimResourceSource.Context[resource=$resource]" + override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" + } + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + private fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt index 31b0a175..c72951d0 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -23,11 +23,21 @@ package org.opendc.simulator.resources /** - * A generic representation of resource that may be consumed. + * The state of a resource provider. */ -public interface SimResource { +public enum class SimResourceState { /** - * The capacity of the resource. + * The resource provider is pending and the resource is waiting to be consumed. */ - public val capacity: Double + Pending, + + /** + * The resource provider is active and the resource is currently being consumed. + */ + Active, + + /** + * The resource provider is stopped and the resource cannot be consumed anymore. + */ + Stopped } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index cd1af3fc..53fec16a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -23,26 +23,26 @@ package org.opendc.simulator.resources /** - * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers. + * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. */ -public interface SimResourceSwitch<R : SimResource> : AutoCloseable { +public interface SimResourceSwitch : AutoCloseable { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set<SimResourceProvider<R>> + public val outputs: Set<SimResourceProvider> /** * The input resources that will be switched between the output providers. */ - public val inputs: Set<SimResourceProvider<R>> + public val inputs: Set<SimResourceProvider> /** - * Add an output to the switch represented by [resource]. + * Add an output to the switch with the specified [capacity]. */ - public fun addOutput(resource: R): SimResourceProvider<R> + public fun addOutput(capacity: Double): SimResourceProvider /** * Add the specified [input] to the switch. */ - public fun addInput(input: SimResourceProvider<R>) + public fun addInput(input: SimResourceProvider) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 060d0ea2..a10f84b6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -22,69 +22,72 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.launch import java.util.ArrayDeque -import kotlin.coroutines.CoroutineContext /** * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. */ -public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> { +public class SimResourceSwitchExclusive : SimResourceSwitch { /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. + * A flag to indicate that the switch is closed. */ - private val scope = CoroutineScope(context + Job()) + private var isClosed: Boolean = false - private val _outputs = mutableSetOf<SimResourceProvider<R>>() - override val outputs: Set<SimResourceProvider<R>> + private val _outputs = mutableSetOf<Provider>() + override val outputs: Set<SimResourceProvider> get() = _outputs - private val availableResources = ArrayDeque<SimResourceForwarder<R>>() - private val _inputs = mutableSetOf<SimResourceProvider<R>>() - override val inputs: Set<SimResourceProvider<R>> + private val availableResources = ArrayDeque<SimResourceForwarder>() + + private val _inputs = mutableSetOf<SimResourceProvider>() + override val inputs: Set<SimResourceProvider> get() = _inputs - override fun addOutput(resource: R): SimResourceProvider<R> { + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() - val output = Provider(resource, forwarder) + val output = Provider(capacity, forwarder) _outputs += output return output } - override fun addInput(input: SimResourceProvider<R>) { + override fun addInput(input: SimResourceProvider) { + check(!isClosed) { "Switch has been closed" } + if (input in inputs) { return } - val forwarder = SimResourceForwarder(input.resource) - - scope.launch { input.consume(forwarder) } + val forwarder = SimResourceForwarder() _inputs += input availableResources += forwarder + + input.startConsumer(object : SimResourceConsumer by forwarder { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + // De-register the input after it has finished + _inputs -= input + forwarder.onFinish(ctx, cause) + } + }) } override fun close() { - scope.cancel() + isClosed = true + + // Cancel all upstream subscriptions + _inputs.forEach(SimResourceProvider::cancel) } private inner class Provider( - override val resource: R, - private val forwarder: SimResourceForwarder<R> - ) : SimResourceProvider<R> { - - override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer) - - override fun interrupt() { - forwarder.interrupt() - } - + private val capacity: Double, + private val forwarder: SimResourceForwarder + ) : SimResourceProvider by forwarder { override fun close() { + // We explicitly do not close the forwarder here in order to re-use it across output resources. + _outputs -= this availableResources += forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index bcf76d3c..c796c251 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -23,99 +23,61 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* -import org.opendc.simulator.resources.consumer.SimConsumerBarrier import java.time.Clock -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ -public class SimResourceSwitchMaxMin<R : SimResource>( - private val clock: Clock, - context: CoroutineContext, - private val listener: Listener<R>? = null -) : SimResourceSwitch<R> { - /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - - private val inputConsumers = mutableSetOf<InputConsumer>() - private val _outputs = mutableSetOf<OutputProvider>() - override val outputs: Set<SimResourceProvider<R>> +public class SimResourceSwitchMaxMin( + clock: Clock, + private val listener: Listener? = null +) : SimResourceSwitch { + private val _outputs = mutableSetOf<SimResourceProvider>() + override val outputs: Set<SimResourceProvider> get() = _outputs - private val _inputs = mutableSetOf<SimResourceProvider<R>>() - override val inputs: Set<SimResourceProvider<R>> + private val _inputs = mutableSetOf<SimResourceProvider>() + override val inputs: Set<SimResourceProvider> get() = _inputs /** - * The commands to submit to the underlying host. - */ - private val commands = mutableMapOf<R, SimResourceCommand>() - - /** - * The active output contexts. - */ - private val outputContexts: MutableList<OutputContext> = mutableListOf() - - /** - * The total amount of remaining work (of all pCPUs). - */ - private var totalRemainingWork: Double = 0.0 - - /** - * The total speed requested by the vCPUs. - */ - private var totalRequestedSpeed = 0.0 - - /** - * The total amount of work requested by the vCPUs. - */ - private var totalRequestedWork = 0.0 - - /** - * The total allocated speed for the vCPUs. - */ - private var totalAllocatedSpeed = 0.0 - - /** - * The total allocated work requested for the vCPUs. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. + * A flag to indicate that the switch was closed. */ - private var totalInterferedWork = 0.0 + private var isClosed = false /** - * A flag to indicate that the scheduler has submitted work that has not yet been completed. + * The aggregator to aggregate the resources. */ - private var isDirty: Boolean = false + private val aggregator = SimResourceAggregatorMaxMin(clock) /** - * The scheduler barrier. + * The distributor to distribute the aggregated resources. */ - private var barrier: SimConsumerBarrier = SimConsumerBarrier(0) + private val distributor = SimResourceDistributorMaxMin( + aggregator.output, clock, + object : SimResourceDistributorMaxMin.Listener { + override fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) + } + } + ) /** * Add an output to the switch represented by [resource]. */ - override fun addOutput(resource: R): SimResourceProvider<R> { - val provider = OutputProvider(resource) + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Switch has been closed" } + + val provider = distributor.addOutput(capacity) _outputs.add(provider) return provider } @@ -123,166 +85,29 @@ public class SimResourceSwitchMaxMin<R : SimResource>( /** * Add the specified [input] to the switch. */ - override fun addInput(input: SimResourceProvider<R>) { - val consumer = InputConsumer(input) - _inputs.add(input) - inputConsumers += consumer - } - - override fun close() { - scope.cancel() - } - - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun schedule() { - isDirty = true - interruptAll() - } - - /** - * Schedule the work over the physical CPUs. - */ - private fun doSchedule() { - // If there is no work yet, mark all inputs as idle. - if (outputContexts.isEmpty()) { - commands.replaceAll { _, _ -> SimResourceCommand.Idle() } - interruptAll() - } - - val maxUsage = inputs.sumByDouble { it.resource.capacity } - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage - var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 - - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - outputContexts.sort() - - // Divide the available input capacity fairly across the outputs using max-min fair sharing - val outputIterator = outputContexts.listIterator() - var remaining = outputContexts.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() - val availableShare = availableSpeed / remaining-- - - when (val command = output.activeCommand) { - is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - output.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - output.actualSpeed = 0.0 - continue - } - - totalRequestedSpeed += command.limit - totalRequestedWork += command.work - - output.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request from an output - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> { - // Apparently the output consumer has exited, so remove it from the scheduling queue. - outputIterator.remove() - } - } - } - - // Round the duration to milliseconds - duration = ceil(duration * 1000) / 1000 - - assert(deadline >= clock.millis()) { "Deadline already passed" } - - val totalAllocatedSpeed = maxUsage - availableSpeed - var totalAllocatedWork = 0.0 - availableSpeed = totalAllocatedSpeed - - // Divide the requests over the available capacity of the input resources fairly - for (input in inputs.sortedByDescending { it.resource.capacity }) { - val maxResourceUsage = input.resource.capacity - val fraction = maxResourceUsage / maxUsage - val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction) - val grantedWork = duration * grantedSpeed - - commands[input.resource] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) - - totalAllocatedWork += grantedWork - availableSpeed -= grantedSpeed - } - - this.totalRequestedSpeed = totalRequestedSpeed - this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = totalAllocatedSpeed - this.totalAllocatedWork = totalAllocatedWork - - interruptAll() - } - - /** - * Flush the progress of the vCPUs. - */ - private fun flushGuests() { - // Flush all the outputs work - for (output in outputContexts) { - output.flush(isIntermediate = true) - } - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork.toLong(), - (totalAllocatedWork - totalRemainingWork).toLong(), - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalRequestedSpeed, - totalAllocatedSpeed - ) - totalRemainingWork = 0.0 - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 + override fun addInput(input: SimResourceProvider) { + check(!isClosed) { "Switch has been closed" } - // Force all inputs to re-schedule their work. - doSchedule() + aggregator.addInput(input) } - /** - * Interrupt all inputs. - */ - private fun interruptAll() { - for (input in inputConsumers) { - input.interrupt() + override fun close() { + if (!isClosed) { + isClosed = true + distributor.close() + aggregator.close() } } /** * Event listener for hypervisor events. */ - public interface Listener<R : SimResource> { + public interface Listener { /** * This method is invoked when a slice is finished. */ public fun onSliceFinish( - switch: SimResourceSwitchMaxMin<R>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -291,218 +116,4 @@ public class SimResourceSwitchMaxMin<R : SimResource>( cpuDemand: Double ) } - - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> { - /** - * A flag to indicate that the resource was closed. - */ - private var isClosed: Boolean = false - - /** - * The current active consumer. - */ - private var cont: CancellableContinuation<Unit>? = null - - /** - * The [OutputContext] that is currently running. - */ - private var ctx: OutputContext? = null - - override suspend fun consume(consumer: SimResourceConsumer<R>) { - check(!isClosed) { "Lifetime of resource has ended." } - check(cont == null) { "Run should not be called concurrently" } - - try { - return suspendCancellableCoroutine { cont -> - val ctx = OutputContext(resource, consumer, cont) - ctx.start() - cont.invokeOnCancellation { - ctx.stop() - } - - this.cont = cont - this.ctx = ctx - - outputContexts += ctx - schedule() - } - } finally { - cont = null - ctx = null - } - } - - override fun close() { - isClosed = true - cont?.cancel() - cont = null - ctx = null - _outputs.remove(this) - } - - override fun interrupt() { - ctx?.interrupt() - } - } - - /** - * A [SimAbstractResourceContext] for the output resources. - */ - private inner class OutputContext( - resource: R, - consumer: SimResourceConsumer<R>, - private val cont: Continuation<Unit> - ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - override fun onIdle(deadline: Long) { - allowedSpeed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - allowedSpeed = getSpeed(limit) - activeCommand = SimResourceCommand.Consume(work, limit, deadline) - } - - override fun onFinish() { - hasExited = true - activeCommand = SimResourceCommand.Exit - cont.resume(Unit) - } - - override fun onFailure(cause: Throwable) { - hasExited = true - activeCommand = SimResourceCommand.Exit - cont.resumeWithException(cause) - } - - override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { - // Apply performance interference model - val performanceScore = 1.0 - - // Compute the remaining amount of work - val remainingWork = if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) - } else { - 0.0 - } - - if (!isInterrupted) { - totalOvercommittedWork += remainingWork - } - - return remainingWork - } - - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - super.interrupt() - - // Force the scheduler to re-schedule - schedule() - } - - override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) - } - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext<R> - - init { - scope.launch { - try { - barrier = SimConsumerBarrier(barrier.parties + 1) - input.consume(this@InputConsumer) - } catch (e: CancellationException) { - // Cancel gracefully - throw e - } catch (e: Throwable) { - e.printStackTrace() - } finally { - barrier = SimConsumerBarrier(barrier.parties - 1) - inputConsumers -= this@InputConsumer - _inputs -= input - } - } - } - - /** - * Interrupt the consumer - */ - fun interrupt() { - ctx.interrupt() - } - - override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand { - this.ctx = ctx - return commands[ctx.resource] ?: SimResourceCommand.Idle() - } - - override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand { - totalRemainingWork += remainingWork - val isLast = barrier.enter() - - // Flush the progress of the guest after the barrier has been reached. - if (isLast && isDirty) { - isDirty = false - flushGuests() - } - - return if (isDirty) { - // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. - SimResourceCommand.Idle() - } else { - // Indicate that the scheduler needs to run next call. - if (isLast) { - isDirty = true - } - - commands[ctx.resource] ?: SimResourceCommand.Idle() - } - } - } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt index 7aa5a5aa..52a42241 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt @@ -42,4 +42,11 @@ public class SimConsumerBarrier(public val parties: Int) { } return false } + + /** + * Reset the barrier. + */ + public fun reset() { + counter = 0 + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt new file mode 100644 index 00000000..fd4a9ed5 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext +import kotlin.math.min + +/** + * Helper class to expose an observable [speed] field describing the speed of the consumer. + */ +public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate { + /** + * The resource processing speed over time. + */ + public val speed: StateFlow<Double> + get() = _speed + private val _speed = MutableStateFlow(0.0) + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val command = delegate.onNext(ctx) + + when (command) { + is SimResourceCommand.Idle -> _speed.value = 0.0 + is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit) + is SimResourceCommand.Exit -> _speed.value = 0.0 + } + + return command + } + + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + val oldSpeed = _speed.value + + delegate.onCapacityChanged(ctx, isThrottled) + + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == _speed.value) { + _speed.value = min(ctx.capacity, _speed.value) + } + } + + override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index 03a3cebd..a52d1d5d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -31,14 +30,16 @@ import org.opendc.simulator.resources.SimResourceContext * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ -public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> { - private val iterator = trace.iterator() +public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { + private var iterator: Iterator<Fragment>? = null - override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand { - return onNext(ctx, 0.0) + override fun onStart(ctx: SimResourceContext) { + check(iterator == null) { "Consumer already running" } + iterator = trace.iterator() } - override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val now = ctx.clock.millis() val fragment = iterator.next() @@ -56,6 +57,10 @@ public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<S } } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + iterator = null + } + /** * A fragment of the workload. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt new file mode 100644 index 00000000..faa693c4 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext + +/** + * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. + */ +public class SimWorkConsumer( + private val work: Double, + private val utilization: Double +) : SimResourceConsumer { + + init { + require(work >= 0.0) { "Work must be positive" } + require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } + } + + private var isFirst = true + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val limit = ctx.capacity * utilization + val work = if (isFirst) { + isFirst = false + work + } else { + ctx.remainingWork + } + return if (work > 0.0) { + SimResourceCommand.Consume(work, limit) + } else { + SimResourceCommand.Exit + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt new file mode 100644 index 00000000..de864c1c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler + +/** + * Test suite for the [SimResourceAggregatorMaxMin] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceAggregatorMaxMinTest { + @Test + fun testSingleCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + val usage = mutableListOf<Double>() + val job = launch { sources[0].speed.toList(usage) } + + try { + aggregator.output.consume(consumer) + yield() + + assertAll( + { assertEquals(1000, currentTime) }, + { assertEquals(listOf(0.0, 0.5, 0.0), usage) } + ) + } finally { + aggregator.output.close() + job.cancel() + } + } + + @Test + fun testDoubleCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(2.0, 1.0) + val usage = mutableListOf<Double>() + val job = launch { sources[0].speed.toList(usage) } + + try { + aggregator.output.consume(consumer) + yield() + assertAll( + { assertEquals(1000, currentTime) }, + { assertEquals(listOf(0.0, 1.0, 0.0), usage) } + ) + } finally { + aggregator.output.close() + job.cancel() + } + } + + @Test + fun testOvercommit() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + .andThen(SimResourceCommand.Exit) + + try { + aggregator.output.consume(consumer) + yield() + assertEquals(1000, currentTime) + + verify(exactly = 2) { consumer.onNext(any()) } + } finally { + aggregator.output.close() + } + } + + @Test + fun testException() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) + + try { + assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } + yield() + assertEquals(SimResourceState.Pending, sources[0].state) + } finally { + aggregator.output.close() + } + } + + @Test + fun testAdjustCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(4.0, 1.0) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(1000) + sources[0].capacity = 0.5 + } + yield() + assertEquals(2334, currentTime) + } finally { + aggregator.output.close() + } + } + + @Test + fun testFailOverCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + try { + coroutineScope { + launch { aggregator.output.consume(consumer) } + delay(500) + sources[0].capacity = 0.5 + } + yield() + assertEquals(1000, currentTime) + } finally { + aggregator.output.close() + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index e7642dc1..030a0f6b 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -22,11 +22,10 @@ package org.opendc.simulator.resources +import io.mockk.* import kotlinx.coroutines.* import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.opendc.simulator.utils.DelayControllerClockAdapter /** @@ -34,39 +33,17 @@ import org.opendc.simulator.utils.DelayControllerClockAdapter */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceContextTest { - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testFlushWithoutCommand() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(10.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } - - val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - } - - override fun onFinish() { - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - override fun onFailure(cause: Throwable) { - } + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + override fun onFinish(cause: Throwable?) {} } context.flush() @@ -75,72 +52,35 @@ class SimResourceContextTest { @Test fun testIntermediateFlush() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(10.0, 1.0) - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } - - var counter = 0 - val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - counter++ - } - - override fun onFinish() { - } - - override fun onFailure(cause: Throwable) { - } - } + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + }) context.start() delay(1) // Delay 1 ms to prevent hitting the fast path context.flush(isIntermediate = true) - assertEquals(2, counter) + + verify(exactly = 2) { context.onConsume(any(), any(), any()) } } @Test fun testIntermediateFlushIdle() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Idle(10) - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } - - var counter = 0 - var isFinished = false - val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { - override fun onIdle(deadline: Long) { - counter++ - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - } - - override fun onFinish() { - isFinished = true - } - - override fun onFailure(cause: Throwable) { - } - } + val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + }) context.start() delay(5) @@ -149,8 +89,25 @@ class SimResourceContextTest { context.flush(isIntermediate = true) assertAll( - { assertEquals(1, counter) }, - { assertTrue(isFinished) } + { verify(exactly = 2) { context.onIdle(any()) } }, + { verify(exactly = 1) { context.onFinish(null) } } ) } + + @Test + fun testDoubleStart() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit + + val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { + override fun onIdle(deadline: Long) {} + override fun onFinish(cause: Throwable?) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + } + + context.start() + assertThrows<IllegalStateException> { context.start() } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index ced1bd98..143dbca9 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -22,10 +22,16 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.launch +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.* import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -34,59 +40,147 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceForwarderTest { - - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testExitImmediately() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) launch { source.consume(forwarder) source.close() } - forwarder.consume(object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Exit - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + forwarder.consume(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return SimResourceCommand.Exit } }) + forwarder.close() scheduler.close() } @Test fun testExit() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) launch { source.consume(forwarder) source.close() } - forwarder.consume(object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } + forwarder.consume(object : SimResourceConsumer { + var isFirst = true - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(10.0, 1.0) + } else { + SimResourceCommand.Exit + } } }) forwarder.close() } + + @Test + fun testState() = runBlockingTest { + val forwarder = SimResourceForwarder() + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit + } + + assertEquals(SimResourceState.Pending, forwarder.state) + + forwarder.startConsumer(consumer) + assertEquals(SimResourceState.Active, forwarder.state) + + assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) } + + forwarder.cancel() + assertEquals(SimResourceState.Pending, forwarder.state) + + forwarder.close() + assertEquals(SimResourceState.Stopped, forwarder.state) + } + + @Test + fun testCancelPendingDelegate() = runBlockingTest { + val forwarder = SimResourceForwarder() + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Exit + + forwarder.startConsumer(consumer) + forwarder.cancel() + + verify(exactly = 0) { consumer.onFinish(any(), null) } + } + + @Test + fun testCancelStartedDelegate() = runBlockingTest { + val forwarder = SimResourceForwarder() + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(2000.0, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + forwarder.cancel() + + verify(exactly = 1) { consumer.onStart(any()) } + verify(exactly = 1) { consumer.onFinish(any(), null) } + } + + @Test + fun testCancelPropagation() = runBlockingTest { + val forwarder = SimResourceForwarder() + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(2000.0, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + source.cancel() + + verify(exactly = 1) { consumer.onStart(any()) } + verify(exactly = 1) { consumer.onFinish(any(), null) } + } + + @Test + fun testAdjustCapacity() = runBlockingTest { + val forwarder = SimResourceForwarder() + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(1.0, clock, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + source.startConsumer(forwarder) + + coroutineScope { + launch { forwarder.consume(consumer) } + delay(1000) + source.capacity = 0.5 + } + + assertEquals(3000, currentTime) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 4f7825fc..58e19421 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -22,11 +22,16 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -35,26 +40,17 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceSourceTest { - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testSpeed() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1000 * capacity, capacity)) + .andThen(SimResourceCommand.Exit) try { val res = mutableListOf<Double>() @@ -63,7 +59,7 @@ class SimResourceSourceTest { provider.consume(consumer) job.cancel() - assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() provider.close() @@ -71,20 +67,38 @@ class SimResourceSourceTest { } @Test - fun testSpeedLimit() = runBlockingTest { + fun testAdjustCapacity() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val provider = SimResourceSource(1.0, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed) - } + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit + try { + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 } + assertEquals(3000, currentTime) + verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + } finally { + scheduler.close() + provider.close() } + } + + @Test + fun testSpeedLimit() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity)) + .andThen(SimResourceCommand.Exit) try { val res = mutableListOf<Double>() @@ -93,7 +107,7 @@ class SimResourceSourceTest { provider.consume(consumer) job.cancel() - assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() provider.close() @@ -108,16 +122,16 @@ class SimResourceSourceTest { fun testIntermediateInterrupt() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { ctx.interrupt() - return SimResourceCommand.Exit } - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return SimResourceCommand.Exit } } @@ -133,18 +147,24 @@ class SimResourceSourceTest { fun testInterrupt() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - lateinit var resCtx: SimResourceContext<SimCpu> + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + lateinit var resCtx: SimResourceContext - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + val consumer = object : SimResourceConsumer { + var isFirst = true + override fun onStart(ctx: SimResourceContext) { resCtx = ctx - return SimResourceCommand.Consume(4.0, 1.0) } - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - assertEquals(0.0, remainingWork) - return SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + assertEquals(0.0, ctx.remainingWork) + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(4.0, 1.0) + } else { + SimResourceCommand.Exit + } } } @@ -166,17 +186,12 @@ class SimResourceSourceTest { fun testFailure() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - throw IllegalStateException() - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onStart(any()) } + .throws(IllegalStateException()) try { assertThrows<IllegalStateException> { @@ -192,17 +207,13 @@ class SimResourceSourceTest { fun testExceptionPropagationOnNext() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows<IllegalStateException> { @@ -218,17 +229,13 @@ class SimResourceSourceTest { fun testConcurrentConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows<IllegalStateException> { @@ -247,17 +254,13 @@ class SimResourceSourceTest { fun testClosedConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { assertThrows<IllegalStateException> { @@ -274,17 +277,13 @@ class SimResourceSourceTest { fun testCloseDuringConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) try { launch { provider.consume(consumer) } @@ -302,17 +301,13 @@ class SimResourceSourceTest { fun testIdle() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Idle(ctx.clock.millis() + 500) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Idle(clock.millis() + 500)) + .andThen(SimResourceCommand.Exit) try { provider.consume(consumer) @@ -330,17 +325,13 @@ class SimResourceSourceTest { runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Idle() - } + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Idle()) + .andThenThrows(IllegalStateException()) try { provider.consume(consumer) @@ -351,4 +342,26 @@ class SimResourceSourceTest { } } } + + @Test + fun testIncorrectDeadline() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Idle(2)) + .andThen(SimResourceCommand.Exit) + + try { + delay(10) + + assertThrows<IllegalArgumentException> { provider.consume(consumer) } + } finally { + scheduler.close() + provider.close() + } + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index ca6558bf..edd60502 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch @@ -34,18 +36,12 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler -import java.lang.IllegalStateException /** * Test suite for the [SimResourceSwitchExclusive] class. */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchExclusiveTest { - class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - /** * Test a trace workload. */ @@ -67,12 +63,12 @@ internal class SimResourceSwitchExclusiveTest { ), ) - val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) val job = launch { source.speed.toList(speed) } try { @@ -98,22 +94,15 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(duration / 1000.0, 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) try { provider.consume(workload) @@ -133,22 +122,29 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(duration / 1000.0, 1.0) + val workload = object : SimResourceConsumer { + var isFirst = true + + override fun onStart(ctx: SimResourceContext) { + isFirst = true } - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return if (isFirst) { + isFirst = false + SimResourceCommand.Consume(duration / 1000.0, 1.0) + } else { + SimResourceCommand.Exit + } } } - val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) try { provider.consume(workload) @@ -169,22 +165,15 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(duration.toDouble(), 1.0) - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - switch.addOutput(SimCpu(3200.0)) - assertThrows<IllegalStateException> { switch.addOutput(SimCpu(3200.0)) } + switch.addOutput(3200.0) + assertThrows<IllegalStateException> { switch.addOutput(3200.0) } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 698c1700..5f4fd187 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -38,31 +40,19 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchMaxMinTest { - class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testSmoke() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin<SimCpu>(clock, coroutineContext) + val switch = SimResourceSwitchMaxMin(clock) - val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) } + val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) } sources.forEach { switch.addInput(it) } - val provider = switch.addOutput(SimCpu(1000.0)) - - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } + val provider = switch.addOutput(1000.0) - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit try { provider.consume(consumer) @@ -81,13 +71,13 @@ internal class SimResourceSwitchMaxMinTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> { + val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L override fun onSliceFinish( - switch: SimResourceSwitchMaxMin<SimCpu>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -112,11 +102,11 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener) - val provider = switch.addOutput(SimCpu(3200.0)) + val switch = SimResourceSwitchMaxMin(clock, listener) + val provider = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, clock, scheduler)) provider.consume(workload) yield() } finally { @@ -140,13 +130,13 @@ internal class SimResourceSwitchMaxMinTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> { + val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L override fun onSliceFinish( - switch: SimResourceSwitchMaxMin<SimCpu>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -180,12 +170,12 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener) - val providerA = switch.addOutput(SimCpu(3200.0)) - val providerB = switch.addOutput(SimCpu(3200.0)) + val switch = SimResourceSwitchMaxMin(clock, listener) + val providerA = switch.addOutput(3200.0) + val providerB = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, clock, scheduler)) coroutineScope { launch { providerA.consume(workloadA) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt new file mode 100644 index 00000000..4d6b19ee --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler + +/** + * A test suite for the [SimWorkConsumer] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimWorkConsumerTest { + @Test + fun testSmoke() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(1.0, clock, scheduler) + + val consumer = SimWorkConsumer(1.0, 1.0) + + try { + provider.consume(consumer) + assertEquals(1000, currentTime) + } finally { + provider.close() + } + } + + @Test + fun testUtilization() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(1.0, clock, scheduler) + + val consumer = SimWorkConsumer(1.0, 0.5) + + try { + provider.consume(consumer) + assertEquals(2000, currentTime) + } finally { + provider.close() + } + } +} diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index 49964938..d4bc7b5c 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -93,7 +93,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo try { timer() } catch (e: Throwable) { - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e) + coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e) } } } |
