summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-23 11:56:53 +0100
committerGitHub <noreply@github.com>2021-03-23 11:56:53 +0100
commit6de1ef7424e058603be9ae5a86f0568b40579e5f (patch)
tree2a882a67667e8efcd51d74cfbe32fdeaad02f502
parent0fa1dc262905c42b3549172fea59f7ad4cb58b1f (diff)
parent38a13e5c201c828f9f21f17e89916b4638396945 (diff)
simulator: Add uniform resource consumption model (v2)
This is the second pull request in the series of pull requests to add a uniform resource consumption model to OpenDC. This pull request focusses on adding dynamic capacity negotiation and propagation between resource consumer and resource provider: * The generic resource constraint is removed from the interfaces of `opendc-simulator-resources`. Users of the API are expected to use the untyped variants where only the capacity needs to be specified. Users are expected to build higher-level abstractions on top of these interface to represent actual resources (e.g., CPU, disk or network). * Added benchmarks for the most important implementations of `opendc-simulator-resources`. This allows us to quantify the effects of changes on the runtime. * The `SimResourceSwitchMaxMin` has been split into a `SimResourceAggregatorMaxMin` and `SimResourceDistributorMaxMin` which respectively aggregate input resources and distribute output resources using max-min fair sharing. * The `SimResourceConsumer` interface has a new method for receiving capacity change events: `onCapacityChanged(ctx, isThrottled)` **Breaking API Changes** * All interfaces in `opendc-simulator-resources`.
-rw-r--r--simulator/buildSrc/build.gradle.kts2
-rw-r--r--simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts60
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt18
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt14
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt16
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt12
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt16
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt12
-rw-r--r--simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt27
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt25
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt6
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt11
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt13
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt6
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt8
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt)9
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt)2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt)11
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt31
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt27
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt44
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt23
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt14
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt43
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt139
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt197
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt188
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt48
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt63
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt44
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt15
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt43
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt420
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt149
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt41
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt110
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt (renamed from simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt)20
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt14
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt63
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt475
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt7
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt68
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt17
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt58
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt208
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt127
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt142
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt231
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt73
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt46
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt69
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt2
58 files changed, 2365 insertions, 1200 deletions
diff --git a/simulator/buildSrc/build.gradle.kts b/simulator/buildSrc/build.gradle.kts
index be071d0c..a71e18cf 100644
--- a/simulator/buildSrc/build.gradle.kts
+++ b/simulator/buildSrc/build.gradle.kts
@@ -36,6 +36,8 @@ dependencies {
implementation(kotlin("gradle-plugin", version = "1.4.31"))
implementation("org.jlleitschuh.gradle:ktlint-gradle:10.0.0")
implementation("org.jetbrains.dokka:dokka-gradle-plugin:0.10.1")
+ implementation("org.jetbrains.kotlin:kotlin-allopen:1.4.30")
+ implementation("org.jetbrains.kotlinx:kotlinx-benchmark-plugin:0.3.0")
}
kotlinDslPluginOptions {
diff --git a/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts
new file mode 100644
index 00000000..d3bb886d
--- /dev/null
+++ b/simulator/buildSrc/src/main/kotlin/benchmark-conventions.gradle.kts
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+import kotlinx.benchmark.gradle.*
+import org.jetbrains.kotlin.allopen.gradle.*
+
+plugins {
+ id("org.jetbrains.kotlinx.benchmark")
+ `java-library`
+ kotlin("plugin.allopen")
+}
+
+sourceSets {
+ register("jmh") {
+ compileClasspath += sourceSets["main"].output
+ runtimeClasspath += sourceSets["main"].output
+ }
+}
+
+configurations {
+ named("jmhImplementation") {
+ extendsFrom(configurations["implementation"])
+ }
+}
+
+configure<AllOpenExtension> {
+ annotation("org.openjdk.jmh.annotations.State")
+}
+
+benchmark {
+ targets {
+ register("jmh") {
+ this as JvmBenchmarkTarget
+ jmhVersion = "1.21"
+ }
+ }
+}
+
+dependencies {
+ implementation("org.jetbrains.kotlinx:kotlinx-benchmark-runtime-jvm:0.3.0")
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 6e9b8151..694676bc 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -35,7 +35,7 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.failures.FailureDomain
import org.opendc.utils.flow.EventFlow
import java.time.Clock
@@ -217,7 +217,7 @@ public class SimHost(
val originalCpu = machine.model.cpus[0]
val processingNode = originalCpu.node.copy(coreCount = cpuCount)
val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
- val memoryUnits = listOf(SimMemoryUnit("Generic", "Generic", 3200.0, memorySize))
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
return SimMachineModel(processingUnits, memoryUnits)
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 6929b06c..e311cd21 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -40,9 +40,9 @@ import org.opendc.compute.api.ServerWatcher
import org.opendc.compute.service.driver.HostEvent
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.time.Clock
@@ -62,11 +62,11 @@ internal class SimHostTest {
scope = TestCoroutineScope()
clock = DelayControllerClockAdapter(scope)
- val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
@@ -136,9 +136,9 @@ internal class SimHostTest {
assertAll(
{ assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
- { assertEquals(4273200, requestedWork, "Requested work does not match") },
- { assertEquals(3133200, grantedWork, "Granted work does not match") },
- { assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") },
+ { assertEquals(4197600, requestedWork, "Requested work does not match") },
+ { assertEquals(2157600, grantedWork, "Granted work does not match") },
+ { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
{ assertEquals(1200006, scope.currentTime) }
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 59ce895f..a812490a 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -143,10 +143,10 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1707132711051, monitor.totalRequestedBurst) },
- { assertEquals(457881474296, monitor.totalGrantedBurst) },
- { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
- { assertEquals(0, monitor.totalInterferedBurst) }
+ { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -189,9 +189,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(711464322955, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(175226276978, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(702636229989, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(172807361391, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(528959213229, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 85a2e413..3da8d0b3 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.InputStream
import java.util.*
@@ -61,12 +61,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
val cores = machine.cpus.flatMap { id ->
when (id) {
1 -> {
- val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) }
+ val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
+ List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
}
2 -> {
- val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) }
+ val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
+ List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
}
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
@@ -75,7 +75,7 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
UUID(0L, counter++.toLong()),
"node-$counter",
emptyMap(),
- SimMachineModel(cores, listOf(SimMemoryUnit("", "", 2300.0, 16000))),
+ SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))),
ConstantPowerModel(0.0)
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 094bc975..9a06a40f 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -26,9 +26,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
@@ -88,8 +88,8 @@ public class Sc20ClusterEnvironmentReader(
memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
coresPerHost = values[coresPerHostCol].trim().toInt()
- val unknownProcessingNode = SimProcessingNode("unknown", "unknown", "unknown", coresPerHost)
- val unknownMemoryUnit = SimMemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+ val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
+ val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
repeat(numberOfHosts) {
nodes.add(
@@ -99,7 +99,7 @@ public class Sc20ClusterEnvironmentReader(
mapOf("cluster" to clusterId),
SimMachineModel(
List(coresPerHost) { coreId ->
- SimProcessingUnit(unknownProcessingNode, coreId, speed)
+ ProcessingUnit(unknownProcessingNode, coreId, speed)
},
listOf(unknownMemoryUnit)
),
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index 87a49f49..effd0286 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.InputStream
import java.util.*
@@ -60,19 +60,19 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
val cores = machine.cpus.flatMap { id ->
when (id) {
1 -> {
- val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) }
+ val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
+ List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
}
2 -> {
- val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) }
+ val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
+ List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
}
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
val memories = machine.memories.map { id ->
when (id) {
- 1 -> SimMemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L)
+ 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L)
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
index 0ff40a28..e7e99a3d 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
@@ -34,9 +34,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import java.util.*
/**
@@ -56,13 +56,13 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p
val cores = cpu.getInteger("numberOfCores")
val speed = cpu.get("clockRateMhz", Number::class.java).toDouble()
// TODO Remove hardcoding of vendor
- val node = SimProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
+ val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
List(cores) { coreId ->
- SimProcessingUnit(node, coreId, speed)
+ ProcessingUnit(node, coreId, speed)
}
}
val memoryUnits = machine.getList("memories", Document::class.java).map { memory ->
- SimMemoryUnit(
+ MemoryUnit(
"Samsung",
memory.getString("name"),
memory.get("speedMbPerS", Number::class.java).toDouble(),
diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
index a80365de..f68e206a 100644
--- a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
+++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
@@ -35,9 +35,9 @@ import org.opendc.serverless.service.ServerlessService
import org.opendc.serverless.service.router.RandomRoutingPolicy
import org.opendc.serverless.simulator.workload.SimServerlessWorkload
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
@@ -52,11 +52,11 @@ internal class SimServerlessServiceTest {
@BeforeEach
fun setUp() {
- val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) },
- memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index a99b082a..81d09f12 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -27,12 +27,11 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
import java.time.Clock
-import kotlin.coroutines.CoroutineContext
/**
* Abstract implementation of the [SimHypervisor] interface.
@@ -46,7 +45,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
/**
* The resource switch to use.
*/
- private lateinit var switch: SimResourceSwitch<SimProcessingUnit>
+ private lateinit var switch: SimResourceSwitch
/**
* The virtual machines running on this hypervisor.
@@ -58,12 +57,12 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
/**
* Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs.
*/
- public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit>
+ public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch
/**
* Check whether the specified machine model fits on this hypervisor.
*/
- public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean
+ public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean
override fun canFit(model: SimMachineModel): Boolean {
return canFit(model, switch)
@@ -102,7 +101,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
/**
* The vCPUs of the machine.
*/
- private val cpus: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>> = model.cpus.associateWith { switch.addOutput(it) }
+ private val cpus: Map<ProcessingUnit, SimResourceProvider> = model.cpus.associateWith { switch.addOutput(it.frequency) }
/**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
@@ -112,19 +111,19 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
require(!isTerminated) { "Machine is terminated" }
val ctx = object : SimMachineContext {
- override val cpus: List<SimProcessingUnit>
+ override val cpus: List<ProcessingUnit>
get() = model.cpus
- override val memory: List<SimMemoryUnit>
+ override val memory: List<MemoryUnit>
get() = model.memory
override val clock: Clock
get() = this@SimAbstractHypervisor.context.clock
- override val meta: Map<String, Any> = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext)
+ override val meta: Map<String, Any> = meta
- override fun interrupt(resource: SimResource) {
- requireNotNull(this@VirtualMachine.cpus[resource]).interrupt()
+ override fun interrupt(cpu: ProcessingUnit) {
+ requireNotNull(this@VirtualMachine.cpus[cpu]).interrupt()
}
}
@@ -156,8 +155,8 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
switch = createSwitch(ctx)
}
- override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- val forwarder = SimResourceForwarder(cpu)
+ override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
+ val forwarder = SimResourceForwarder()
switch.addInput(forwarder)
return forwarder
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 39ae34fe..1c0f94fd 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -27,12 +27,12 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResource
import org.opendc.simulator.resources.SimResourceProvider
import org.opendc.simulator.resources.SimResourceSource
+import org.opendc.simulator.resources.consume
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -64,24 +64,24 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
/**
* The resources allocated for this machine.
*/
- protected abstract val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>>
+ protected abstract val resources: Map<ProcessingUnit, SimResourceSource>
/**
* The execution context in which the workload runs.
*/
private inner class Context(
- val sources: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>>,
+ val sources: Map<ProcessingUnit, SimResourceProvider>,
override val meta: Map<String, Any>
) : SimMachineContext {
override val clock: Clock
get() = this@SimAbstractMachine.clock
- override val cpus: List<SimProcessingUnit> = model.cpus
+ override val cpus: List<ProcessingUnit> = model.cpus
- override val memory: List<SimMemoryUnit> = model.memory
+ override val memory: List<MemoryUnit> = model.memory
- override fun interrupt(resource: SimResource) {
- checkNotNull(sources[resource]) { "Invalid resource" }.interrupt()
+ override fun interrupt(cpu: ProcessingUnit) {
+ checkNotNull(sources[cpu]) { "Invalid resource" }.interrupt()
}
}
@@ -91,7 +91,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) {
val resources = resources
require(!isTerminated) { "Machine is terminated" }
- val ctx = Context(resources, meta + mapOf("coroutine-context" to context))
+ val ctx = Context(resources, meta)
val totalCapacity = model.cpus.sumByDouble { it.frequency }
_speed = MutableList(model.cpus.size) { 0.0 }
@@ -102,7 +102,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
val consumer = workload.getConsumer(ctx, cpu)
val job = source.speed
.onEach {
- _speed[cpu.id] = source.speed.value
+ _speed[cpu.id] = it
_usage.value = _speed.sum() / totalCapacity
}
.launchIn(this)
@@ -116,9 +116,8 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
override fun close() {
if (!isTerminated) {
- resources.forEach { (_, provider) -> provider.close() }
- } else {
isTerminated = true
+ resources.forEach { (_, provider) -> provider.close() }
}
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 79982ea8..19479719 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.*
import org.opendc.utils.TimerScheduler
import java.time.Clock
@@ -57,8 +57,8 @@ public class SimBareMetalMachine(
*/
private val scheduler = TimerScheduler<Any>(this.context, clock)
- override val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> =
- model.cpus.associateWith { SimResourceSource(it, clock, scheduler) }
+ override val resources: Map<ProcessingUnit, SimResourceSource> =
+ model.cpus.associateWith { SimResourceSource(it.frequency, clock, scheduler) }
override fun close() {
super.close()
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index bb97192d..fa677de9 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -22,10 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
-import kotlin.coroutines.CoroutineContext
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
@@ -35,15 +33,14 @@ import kotlin.coroutines.CoroutineContext
*/
public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
- override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean = true
+ override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true
- override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> {
+ override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
return SimResourceSwitchMaxMin(
ctx.clock,
- ctx.meta["coroutine-context"] as CoroutineContext,
- object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> {
+ object : SimResourceSwitchMaxMin.Listener {
override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin<SimProcessingUnit>,
+ switch: SimResourceSwitchMaxMin,
requestedWork: Long,
grantedWork: Long,
overcommittedWork: Long,
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index cff70826..85404e6e 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -22,9 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.resources.SimResource
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingUnit
import java.time.Clock
/**
@@ -46,17 +45,17 @@ public interface SimMachineContext {
/**
* The CPUs available on the machine.
*/
- public val cpus: List<SimProcessingUnit>
+ public val cpus: List<ProcessingUnit>
/**
* The memory available on the machine
*/
- public val memory: List<SimMemoryUnit>
+ public val memory: List<MemoryUnit>
/**
- * Interrupt the specified [resource].
+ * Interrupt the specified [cpu].
*
* @throws IllegalArgumentException if the resource does not belong to this execution context.
*/
- public fun interrupt(resource: SimResource)
+ public fun interrupt(cpu: ProcessingUnit)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
index d6bf0e99..2b414540 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingUnit
/**
* A description of the physical or virtual machine on which a bootable image runs.
@@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.SimProcessingUnit
* @property cpus The list of processing units available to the image.
* @property memory The list of memory units available to the image.
*/
-public data class SimMachineModel(public val cpus: List<SimProcessingUnit>, public val memory: List<SimMemoryUnit>)
+public data class SimMachineModel(public val cpus: List<ProcessingUnit>, public val memory: List<MemoryUnit>)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index 2001a230..fd8e546f 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -22,19 +22,17 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.resources.*
-import kotlin.coroutines.CoroutineContext
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*/
public class SimSpaceSharedHypervisor : SimAbstractHypervisor() {
- override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean {
+ override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean {
return switch.inputs.size - switch.outputs.size >= model.cpus.size
}
- override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> {
- return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext)
+ override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
+ return SimResourceSwitchExclusive()
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt
index 49745868..bcbde5b1 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.compute.model
-import org.opendc.simulator.resources.SimResource
-
/**
* A memory unit of a compute resource, either virtual or physical.
*
@@ -32,12 +30,9 @@ import org.opendc.simulator.resources.SimResource
* @property speed The access speed of the memory in MHz.
* @property size The size of the memory unit in MBs.
*/
-public data class SimMemoryUnit(
+public data class MemoryUnit(
public val vendor: String,
public val modelName: String,
public val speed: Double,
public val size: Long
-) : SimResource {
- override val capacity: Double
- get() = speed
-}
+)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt
index 4022ecb3..58ed816c 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt
@@ -30,7 +30,7 @@ package org.opendc.simulator.compute.model
* @property arch The micro-architecture of the processor node.
* @property coreCount The number of logical CPUs in the processor node.
*/
-public data class SimProcessingNode(
+public data class ProcessingNode(
public val vendor: String,
public val arch: String,
public val modelName: String,
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt
index 1c989254..415e95e6 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.compute.model
-import org.opendc.simulator.resources.SimResource
-
/**
* A single logical compute unit of processor node, either virtual or physical.
*
@@ -31,11 +29,8 @@ import org.opendc.simulator.resources.SimResource
* @property id The identifier of the CPU core within the processing node.
* @property frequency The clock rate of the CPU in MHz.
*/
-public data class SimProcessingUnit(
- public val node: SimProcessingNode,
+public data class ProcessingUnit(
+ public val node: ProcessingNode,
public val id: Int,
public val frequency: Double
-) : SimResource {
- override val capacity: Double
- get() = frequency
-}
+)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 9b47821e..63c9d28c 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -23,10 +23,9 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on
@@ -46,30 +45,8 @@ public class SimFlopsWorkload(
override fun onStart(ctx: SimMachineContext) {}
- override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return CpuConsumer(ctx)
- }
-
- private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- val limit = ctx.resource.frequency * utilization
- val work = flops.toDouble() / machine.cpus.size
-
- return if (work > 0.0) {
- SimResourceCommand.Consume(work, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.resource.frequency * utilization
- return SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
+ override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
+ return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)
}
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 313b6ed5..a3420e32 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -23,10 +23,9 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* A [SimWorkload] that models application execution as a single duration.
@@ -45,25 +44,9 @@ public class SimRuntimeWorkload(
override fun onStart(ctx: SimMachineContext) {}
- override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return CpuConsumer()
- }
-
- private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- val limit = ctx.resource.frequency * utilization
- val work = (limit / 1000) * duration
- return SimResourceCommand.Consume(work, limit)
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.resource.frequency * utilization
- SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
- }
- }
+ override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
+ val limit = cpu.frequency * utilization
+ return SimWorkConsumer((limit / 1000) * duration, utilization)
}
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 31f58a0f..2442d748 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -45,35 +45,29 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
offset = ctx.clock.millis()
}
- override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return CpuConsumer()
- }
-
- private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- return onNext(ctx, 0.0)
- }
+ override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
+ return object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val now = ctx.clock.millis()
+ val fragment = fragment ?: return SimResourceCommand.Exit
+ val work = (fragment.duration / 1000) * fragment.usage
+ val deadline = offset + fragment.duration
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val work = (fragment.duration / 1000) * fragment.usage
- val deadline = offset + fragment.duration
+ assert(deadline >= now) { "Deadline already passed" }
- assert(deadline >= now) { "Deadline already passed" }
+ val cmd =
+ if (cpu.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, fragment.usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
- val cmd =
- if (ctx.resource.id < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, fragment.usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ if (barrier.enter()) {
+ this@SimTraceWorkload.fragment = nextFragment()
+ this@SimTraceWorkload.offset += fragment.duration
+ }
- if (barrier.enter()) {
- this@SimTraceWorkload.fragment = nextFragment()
- this@SimTraceWorkload.offset += fragment.duration
+ return cmd
}
-
- return cmd
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index 60661e23..bdc12bb5 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.SimResourceConsumer
/**
@@ -41,5 +41,5 @@ public interface SimWorkload {
/**
* Obtain the resource consumer for the specified processing unit.
*/
- public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit>
+ public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index 4ac8cf63..5773b325 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -24,6 +24,7 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.yield
@@ -31,9 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
@@ -46,10 +47,10 @@ internal class SimHypervisorTest {
@BeforeEach
fun setUp() {
- val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1)
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
model = SimMachineModel(
- cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
@@ -98,15 +99,21 @@ internal class SimHypervisorTest {
println("Hypervisor finished")
}
yield()
- hypervisor.createMachine(model).run(workloadA)
+ val vm = hypervisor.createMachine(model)
+ val res = mutableListOf<Double>()
+ val job = launch { machine.usage.toList(res) }
+
+ vm.run(workloadA)
yield()
+ job.cancel()
machine.close()
assertAll(
{ assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
{ assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
{ assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
- { assertEquals(1200000, currentTime) }
+ { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), res) { "VM usage is correct" } },
+ { assertEquals(1200000, currentTime) { "Current time is correct" } }
)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 6adc41d0..071bdf77 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -28,9 +28,9 @@ import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
@@ -43,11 +43,11 @@ class SimMachineTest {
@BeforeEach
fun setUp() {
- val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) },
- memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
@@ -76,7 +76,7 @@ class SimMachineTest {
try {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
-
+ yield()
job.cancel()
assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" }
} finally {
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
index 8428a0a7..fb0523af 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
@@ -31,9 +31,9 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.model.SimMemoryUnit
-import org.opendc.simulator.compute.model.SimProcessingNode
-import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTraceWorkload
@@ -48,10 +48,10 @@ internal class SimSpaceSharedHypervisorTest {
@BeforeEach
fun setUp() {
- val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1)
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts
index 831ca3db..3b0a197c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts
+++ b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts
@@ -26,6 +26,7 @@ plugins {
`kotlin-library-conventions`
`testing-conventions`
`jacoco-conventions`
+ `benchmark-conventions`
}
dependencies {
@@ -33,5 +34,6 @@ dependencies {
api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation(project(":opendc-utils"))
+ jmhImplementation(project(":opendc-simulator:opendc-simulator-core"))
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
new file mode 100644
index 00000000..8d2587b1
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+
+/**
+ * Helper function to create simple consumer workload.
+ */
+fun createSimpleConsumer(): SimResourceConsumer {
+ return SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(1000, 28.0),
+ SimTraceConsumer.Fragment(1000, 3500.0),
+ SimTraceConsumer.Fragment(1000, 0.0),
+ SimTraceConsumer.Fragment(1000, 183.0),
+ SimTraceConsumer.Fragment(1000, 400.0),
+ SimTraceConsumer.Fragment(1000, 100.0),
+ SimTraceConsumer.Fragment(1000, 3000.0),
+ SimTraceConsumer.Fragment(1000, 4500.0),
+ ),
+ )
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
new file mode 100644
index 00000000..f2eea97c
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+import org.openjdk.jmh.annotations.*
+import java.time.Clock
+import java.util.concurrent.TimeUnit
+
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceBenchmarks {
+ private lateinit var scope: TestCoroutineScope
+ private lateinit var clock: Clock
+ private lateinit var scheduler: TimerScheduler<Any>
+
+ @Setup
+ fun setUp() {
+ scope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(scope)
+ scheduler = TimerScheduler(scope.coroutineContext, clock)
+ }
+
+ @State(Scope.Thread)
+ class Workload {
+ lateinit var consumers: Array<SimResourceConsumer>
+
+ @Setup
+ fun setUp() {
+ consumers = Array(3) { createSimpleConsumer() }
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSource(state: Workload) {
+ return scope.runBlockingTest {
+ val provider = SimResourceSource(4200.0, clock, scheduler)
+ return@runBlockingTest provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkForwardOverhead(state: Workload) {
+ return scope.runBlockingTest {
+ val provider = SimResourceSource(4200.0, clock, scheduler)
+ val forwarder = SimResourceForwarder()
+ provider.startConsumer(forwarder)
+ return@runBlockingTest forwarder.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
+ return scope.runBlockingTest {
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ val provider = switch.addOutput(3500.0)
+ return@runBlockingTest provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
+ return scope.runBlockingTest {
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ repeat(3) { i ->
+ launch {
+ val provider = switch.addOutput(3500.0)
+ provider.consume(state.consumers[i])
+ }
+ }
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) {
+ return scope.runBlockingTest {
+ val switch = SimResourceSwitchExclusive()
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ val provider = switch.addOutput(3500.0)
+ return@runBlockingTest provider.consume(state.consumers[0])
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) {
+ return scope.runBlockingTest {
+ val switch = SimResourceSwitchExclusive()
+
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+
+ repeat(2) { i ->
+ launch {
+ val provider = switch.addOutput(3500.0)
+ provider.consume(state.consumers[i])
+ }
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
new file mode 100644
index 00000000..e5991264
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * Abstract implementation of [SimResourceAggregator].
+ */
+public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
+ /**
+ * The available resource provider contexts.
+ */
+ protected val inputContexts: Set<SimResourceContext>
+ get() = _inputContexts
+ private val _inputContexts = mutableSetOf<SimResourceContext>()
+
+ /**
+ * The output context.
+ */
+ protected val outputContext: SimResourceContext
+ get() = context
+
+ /**
+ * The commands to submit to the underlying input resources.
+ */
+ protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf()
+
+ /**
+ * This method is invoked when the resource consumer consumes resources.
+ */
+ protected abstract fun doConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer enters an idle state.
+ */
+ protected open fun doIdle(deadline: Long) {
+ for (input in inputContexts) {
+ commands[input] = SimResourceCommand.Idle(deadline)
+ }
+ }
+
+ /**
+ * This method is invoked when the resource consumer finishes processing.
+ */
+ protected open fun doFinish(cause: Throwable?) {
+ for (input in inputContexts) {
+ commands[input] = SimResourceCommand.Exit
+ }
+ }
+
+ /**
+ * This method is invoked when an input context is started.
+ */
+ protected open fun onContextStarted(ctx: SimResourceContext) {
+ _inputContexts.add(ctx)
+ }
+
+ protected open fun onContextFinished(ctx: SimResourceContext) {
+ assert(_inputContexts.remove(ctx)) { "Lost context" }
+ }
+
+ override fun addInput(input: SimResourceProvider) {
+ check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
+
+ val consumer = Consumer()
+ _inputs.add(input)
+ input.startConsumer(consumer)
+ }
+
+ override fun close() {
+ output.close()
+ }
+
+ override val output: SimResourceProvider
+ get() = _output
+ private val _output = SimResourceForwarder()
+
+ override val inputs: Set<SimResourceProvider>
+ get() = _inputs
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+
+ private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
+ override val remainingWork: Double
+ get() = inputContexts.sumByDouble { it.remainingWork }
+
+ override fun interrupt() {
+ super.interrupt()
+
+ interruptAll()
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline)
+
+ override fun onIdle(deadline: Long) = doIdle(deadline)
+
+ override fun onFinish(cause: Throwable?) {
+ doFinish(cause)
+
+ super.onFinish(cause)
+
+ interruptAll()
+ }
+ }
+
+ /**
+ * A flag to indicate that an interrupt is active.
+ */
+ private var isInterrupting: Boolean = false
+
+ /**
+ * Schedule the work over the input resources.
+ */
+ private fun doSchedule() {
+ context.flush(isIntermediate = true)
+ interruptAll()
+ }
+
+ /**
+ * Interrupt all inputs.
+ */
+ private fun interruptAll() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isInterrupting) {
+ return
+ }
+
+ try {
+ isInterrupting = true
+
+ val iterator = _inputs.iterator()
+ while (iterator.hasNext()) {
+ val input = iterator.next()
+ input.interrupt()
+
+ if (input.state != SimResourceState.Active) {
+ iterator.remove()
+ }
+ }
+ } finally {
+ isInterrupting = false
+ }
+ }
+
+ /**
+ * An internal [SimResourceConsumer] implementation for aggregator inputs.
+ */
+ private inner class Consumer : SimResourceConsumer {
+ override fun onStart(ctx: SimResourceContext) {
+ onContextStarted(ctx)
+ onCapacityChanged(ctx, false)
+
+ // Make sure we initialize the output if we have not done so yet
+ if (context.state == SimResourceState.Pending) {
+ context.start()
+ }
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ doSchedule()
+
+ return commands[ctx] ?: SimResourceCommand.Idle()
+ }
+
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ // Adjust capacity of output resource
+ context.capacity = inputContexts.sumByDouble { it.capacity }
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ onContextFinished(ctx)
+
+ super.onFinish(ctx, cause)
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 52251bff..9705bd17 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -23,64 +23,79 @@
package org.opendc.simulator.resources
import java.time.Clock
-import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
/**
* Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
*/
-public abstract class SimAbstractResourceContext<R : SimResource>(
- override val resource: R,
+public abstract class SimAbstractResourceContext(
+ initialCapacity: Double,
override val clock: Clock,
- private val consumer: SimResourceConsumer<R>
-) : SimResourceContext<R> {
+ private val consumer: SimResourceConsumer
+) : SimResourceContext {
/**
- * This method is invoked when the resource will idle until the specified [deadline].
+ * The capacity of the resource.
*/
- public abstract fun onIdle(deadline: Long)
+ public final override var capacity: Double = initialCapacity
+ set(value) {
+ val oldValue = field
+
+ // Only changes will be propagated
+ if (value != oldValue) {
+ field = value
+ onCapacityChange()
+ }
+ }
/**
- * This method is invoked when the resource will be consumed until the specified [work] was processed or the
- * [deadline] was reached.
+ * The amount of work still remaining at this instant.
*/
- public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
+ override val remainingWork: Double
+ get() {
+ val activeCommand = activeCommand ?: return 0.0
+ return computeRemainingWork(activeCommand, clock.millis())
+ }
/**
- * This method is invoked when the resource consumer has finished.
+ * A flag to indicate the state of the context.
*/
- public abstract fun onFinish()
+ public var state: SimResourceState = SimResourceState.Pending
+ private set
/**
- * This method is invoked when the resource consumer throws an exception.
+ * The current processing speed of the resource.
*/
- public abstract fun onFailure(cause: Throwable)
+ public var speed: Double = 0.0
+ private set
/**
- * Compute the duration that a resource consumption will take with the specified [speed].
+ * This method is invoked when the resource will idle until the specified [deadline].
*/
- protected open fun getDuration(work: Double, speed: Double): Long {
- return ceil(work / speed * 1000).toLong()
- }
+ public abstract fun onIdle(deadline: Long)
/**
- * Compute the speed at which the resource may be consumed.
+ * This method is invoked when the resource will be consumed until the specified [work] was processed or the
+ * [deadline] was reached.
*/
- protected open fun getSpeed(limit: Double): Double {
- return min(limit, resource.capacity)
+ public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer has finished.
+ */
+ public open fun onFinish(cause: Throwable?) {
+ consumer.onFinish(this, cause)
}
/**
- * Get the remaining work to process after a resource consumption was flushed.
+ * Get the remaining work to process after a resource consumption.
*
* @param work The size of the resource consumption.
* @param speed The speed of consumption.
* @param duration The duration from the start of the consumption until now.
- * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to
- * it being interrupted before it could finish or reach its deadline.
* @return The amount of work remaining.
*/
- protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
return if (duration > 0L) {
val processed = duration / 1000.0 * speed
max(0.0, work - processed)
@@ -93,13 +108,19 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* Start the consumer.
*/
public fun start() {
- try {
- isProcessing = true
- latestFlush = clock.millis()
+ check(state == SimResourceState.Pending) { "Consumer is already started" }
- interpret(consumer.onStart(this))
- } catch (e: Throwable) {
- onFailure(e)
+ val now = clock.millis()
+
+ state = SimResourceState.Active
+ isProcessing = true
+ latestFlush = now
+
+ try {
+ consumer.onStart(this)
+ activeCommand = interpret(consumer.onNext(this), now)
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
isProcessing = false
}
@@ -114,9 +135,9 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
latestFlush = clock.millis()
flush(isIntermediate = true)
- onFinish()
- } catch (e: Throwable) {
- onFailure(e)
+ doStop(null)
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
isProcessing = false
}
@@ -129,7 +150,12 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
* flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
* will be asked to deliver a new command and is essentially interrupted.
*/
- public open fun flush(isIntermediate: Boolean = false) {
+ public fun flush(isIntermediate: Boolean = false) {
+ // Flush is no-op when the consumer is finished or not yet started
+ if (state != SimResourceState.Active) {
+ return
+ }
+
val now = clock.millis()
// Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
@@ -141,44 +167,42 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
val activeCommand = activeCommand ?: return
val (timestamp, command) = activeCommand
+ // Note: accessor is reliant on activeCommand being set
+ val remainingWork = remainingWork
+
isProcessing = true
- this.activeCommand = null
val duration = now - timestamp
assert(duration >= 0) { "Flush in the past" }
- when (command) {
+ this.activeCommand = when (command) {
is SimResourceCommand.Idle -> {
// We should only continue processing the next command if:
// 1. The resource consumer reached its deadline.
// 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
if (command.deadline <= now || !isIntermediate) {
- next(remainingWork = 0.0)
+ next(now)
} else {
- this.activeCommand = activeCommand
+ interpret(SimResourceCommand.Idle(command.deadline), now)
}
}
is SimResourceCommand.Consume -> {
- val speed = min(resource.capacity, command.limit)
- val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed)
- val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted)
-
// We should only continue processing the next command if:
// 1. The resource consumption was finished.
- // 2. The resource consumer reached its deadline.
- // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ // 2. The resource capacity cannot satisfy the demand.
+ // 4. The resource consumer should be interrupted (e.g., someone called .interrupt())
if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
- next(remainingWork)
+ next(now)
} else {
- interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
+ interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now)
}
}
SimResourceCommand.Exit ->
// Flush may not be called when the resource consumer has finished
throw IllegalStateException()
}
- } catch (e: Throwable) {
- onFailure(e)
+ } catch (cause: Throwable) {
+ doStop(cause)
} finally {
latestFlush = now
isProcessing = false
@@ -195,7 +219,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
flush()
}
- override fun toString(): String = "SimAbstractResourceContext[resource=$resource]"
+ override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
/**
* A flag to indicate that the resource is currently processing a command.
@@ -213,17 +237,30 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
private var latestFlush: Long = Long.MIN_VALUE
/**
- * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ * Finish the consumer and resource provider.
*/
- private fun interpret(command: SimResourceCommand) {
- val now = clock.millis()
+ private fun doStop(cause: Throwable?) {
+ val state = state
+ this.state = SimResourceState.Stopped
+
+ if (state == SimResourceState.Active) {
+ activeCommand = null
+ onFinish(cause)
+ }
+ }
+ /**
+ * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ */
+ private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? {
when (command) {
is SimResourceCommand.Idle -> {
val deadline = command.deadline
require(deadline >= now) { "Deadline already passed" }
+ speed = 0.0
+
onIdle(deadline)
}
is SimResourceCommand.Consume -> {
@@ -233,22 +270,57 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
require(deadline >= now) { "Deadline already passed" }
+ speed = min(capacity, limit)
+
onConsume(work, limit, deadline)
}
is SimResourceCommand.Exit -> {
- onFinish()
+ speed = 0.0
+
+ doStop(null)
+
+ // No need to set the next active command
+ return null
}
}
- assert(activeCommand == null) { "Concurrent access to current command" }
- activeCommand = CommandWrapper(now, command)
+ return CommandWrapper(now, command)
}
/**
* Request the workload for more work.
*/
- private fun next(remainingWork: Double) {
- interpret(consumer.onNext(this, remainingWork))
+ private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now)
+
+ /**
+ * Compute the remaining work based on the specified [wrapper] and [timestamp][now].
+ */
+ private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double {
+ val (timestamp, command) = wrapper
+ val duration = now - timestamp
+ return when (command) {
+ is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration)
+ is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0
+ }
+ }
+
+ /**
+ * Indicate that the capacity of the resource has changed.
+ */
+ private fun onCapacityChange() {
+ // Do not inform the consumer if it has not been started yet
+ if (state != SimResourceState.Active) {
+ return
+ }
+
+ val isThrottled = speed > capacity
+ consumer.onCapacityChanged(this, isThrottled)
+
+ // Optimization: only flush changes if the new capacity cannot satisfy the active resource command.
+ // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush().
+ if (isThrottled) {
+ flush(isIntermediate = true)
+ }
}
/**
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
new file mode 100644
index 00000000..bb4e6a2c
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
+ */
+public interface SimResourceAggregator : AutoCloseable {
+ /**
+ * The output resource provider to which resource consumers can be attached.
+ */
+ public val output: SimResourceProvider
+
+ /**
+ * The input resources that will be switched between the output providers.
+ */
+ public val inputs: Set<SimResourceProvider>
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ public fun addInput(input: SimResourceProvider)
+
+ /**
+ * End the lifecycle of the aggregator.
+ */
+ public override fun close()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
new file mode 100644
index 00000000..08bc064e
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * A [SimResourceAggregator] that distributes the load equally across the input resources.
+ */
+public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
+ private val consumers = mutableListOf<SimResourceContext>()
+
+ override fun doConsume(work: Double, limit: Double, deadline: Long) {
+ // Sort all consumers by their capacity
+ consumers.sortWith(compareBy { it.capacity })
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (input in consumers) {
+ val inputCapacity = input.capacity
+ val fraction = inputCapacity / outputContext.capacity
+ val grantedSpeed = limit * fraction
+ val grantedWork = fraction * work
+
+ commands[input] =
+ if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ }
+ }
+
+ override fun onContextStarted(ctx: SimResourceContext) {
+ super.onContextStarted(ctx)
+
+ consumers.add(ctx)
+ }
+
+ override fun onContextFinished(ctx: SimResourceContext) {
+ super.onContextFinished(ctx)
+
+ consumers.remove(ctx)
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
index 77c0a7a9..f7f3fa4d 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -23,13 +23,13 @@
package org.opendc.simulator.resources
/**
- * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer].
+ * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer].
*/
public sealed class SimResourceCommand {
/**
* A request to the resource to perform the specified amount of work before the given [deadline].
*
- * @param work The amount of work to process on the CPU.
+ * @param work The amount of work to process.
* @param limit The maximum amount of work to be processed per second.
* @param deadline The instant at which the work needs to be fulfilled.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index f516faa6..672a3e9d 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -23,23 +23,49 @@
package org.opendc.simulator.resources
/**
- * A SimResourceConsumer characterizes how a [SimResource] is consumed.
+ * A [SimResourceConsumer] characterizes how a [SimResource] is consumed.
+ *
+ * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
+ * for multiple resource providers, unless explicitly said otherwise.
*/
-public interface SimResourceConsumer<in R : SimResource> {
+public interface SimResourceConsumer {
+ /**
+ * This method is invoked when the consumer is started for some resource.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ */
+ public fun onStart(ctx: SimResourceContext) {}
+
/**
- * This method is invoked when the consumer is started for a resource.
+ * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
+ * the resource finished processing, reached its deadline or was interrupted.
*
* @param ctx The execution context in which the consumer runs.
- * @return The next command that the resource should perform.
+ * @return The next command that the resource should execute.
*/
- public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand
+ public fun onNext(ctx: SimResourceContext): SimResourceCommand
/**
- * This method is invoked when a resource was either interrupted or reached its deadline.
+ * This is method is invoked when the capacity of the resource changes.
+ *
+ * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
+ * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly
+ * causing the active resource command to finish at a later moment than initially planned.
*
* @param ctx The execution context in which the consumer runs.
- * @param remainingWork The remaining work that was not yet completed.
- * @return The next command that the resource should perform.
+ * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the
+ * capacity change.
+ */
+ public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {}
+
+ /**
+ * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
+ * the resource finished itself, or a failure occurred at the resource.
+ *
+ * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider.
+ *
+ * @param ctx The execution context in which the consumer ran.
+ * @param cause The cause of the finish in case the resource finished exceptionally.
*/
- public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand
+ public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index dfb5e9ce..11dbb09f 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -28,16 +28,21 @@ import java.time.Clock
* The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
* resource and a resource consumer.
*/
-public interface SimResourceContext<out R : SimResource> {
+public interface SimResourceContext {
/**
- * The resource that is managed by this context.
+ * The virtual clock tracking simulation time.
*/
- public val resource: R
+ public val clock: Clock
/**
- * The virtual clock tracking simulation time.
+ * The resource capacity available at this instant.
*/
- public val clock: Clock
+ public val capacity: Double
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ public val remainingWork: Double
/**
* Ask the resource provider to interrupt its resource.
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
new file mode 100644
index 00000000..b2759b7f
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
+ */
+public interface SimResourceDistributor : AutoCloseable {
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
+ public val outputs: Set<SimResourceProvider>
+
+ /**
+ * The input resource that will be distributed over the consumers.
+ */
+ public val input: SimResourceProvider
+
+ /**
+ * Add an output to the switch with the specified [capacity].
+ */
+ public fun addOutput(capacity: Double): SimResourceProvider
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
new file mode 100644
index 00000000..9df333e3
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -0,0 +1,420 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
+ */
+public class SimResourceDistributorMaxMin(
+ override val input: SimResourceProvider,
+ private val clock: Clock,
+ private val listener: Listener? = null
+) : SimResourceDistributor {
+ override val outputs: Set<SimResourceProvider>
+ get() = _outputs
+ private val _outputs = mutableSetOf<OutputProvider>()
+
+ /**
+ * The active output contexts.
+ */
+ private val outputContexts: MutableList<OutputContext> = mutableListOf()
+
+ /**
+ * The total speed requested by the output resources.
+ */
+ private var totalRequestedSpeed = 0.0
+
+ /**
+ * The total amount of work requested by the output resources.
+ */
+ private var totalRequestedWork = 0.0
+
+ /**
+ * The total allocated speed for the output resources.
+ */
+ private var totalAllocatedSpeed = 0.0
+
+ /**
+ * The total allocated work requested for the output resources.
+ */
+ private var totalAllocatedWork = 0.0
+
+ /**
+ * The amount of work that could not be performed due to over-committing resources.
+ */
+ private var totalOvercommittedWork = 0.0
+
+ /**
+ * The amount of work that was lost due to interference.
+ */
+ private var totalInterferedWork = 0.0
+
+ /**
+ * A flag to indicate that the switch is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * An internal [SimResourceConsumer] implementation for switch inputs.
+ */
+ private val consumer = object : SimResourceConsumer {
+ /**
+ * The resource context of the consumer.
+ */
+ private lateinit var ctx: SimResourceContext
+
+ val remainingWork: Double
+ get() = ctx.remainingWork
+
+ override fun onStart(ctx: SimResourceContext) {
+ this.ctx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return doNext(ctx.capacity)
+ }
+
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
+
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call tou output.close()
+ iterator.remove()
+
+ output.close()
+ }
+ }
+ }
+
+ /**
+ * The total amount of remaining work.
+ */
+ private val totalRemainingWork: Double
+ get() = consumer.remainingWork
+
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Distributor has been closed" }
+
+ val provider = OutputProvider(capacity)
+ _outputs.add(provider)
+ return provider
+ }
+
+ override fun close() {
+ if (!isClosed) {
+ isClosed = true
+ input.cancel()
+ }
+ }
+
+ init {
+ input.startConsumer(consumer)
+ }
+
+ /**
+ * Indicate that the workloads should be re-scheduled.
+ */
+ private fun schedule() {
+ input.interrupt()
+ }
+
+ /**
+ * Schedule the work over the physical CPUs.
+ */
+ private fun doSchedule(capacity: Double): SimResourceCommand {
+ // If there is no work yet, mark all inputs as idle.
+ if (outputContexts.isEmpty()) {
+ return SimResourceCommand.Idle()
+ }
+
+ val maxUsage = capacity
+ var duration: Double = Double.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ var availableSpeed = maxUsage
+ var totalRequestedSpeed = 0.0
+ var totalRequestedWork = 0.0
+
+ // Flush the work of the outputs
+ var outputIterator = outputContexts.listIterator()
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+
+ output.flush(isIntermediate = true)
+
+ if (output.activeCommand == SimResourceCommand.Exit) {
+ // Apparently the output consumer has exited, so remove it from the scheduling queue.
+ outputIterator.remove()
+ }
+ }
+
+ // Sort the outputs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ outputContexts.sort()
+
+ // Divide the available input capacity fairly across the outputs using max-min fair sharing
+ outputIterator = outputContexts.listIterator()
+ var remaining = outputContexts.size
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+ val availableShare = availableSpeed / remaining--
+
+ when (val command = output.activeCommand) {
+ is SimResourceCommand.Idle -> {
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ output.actualSpeed = 0.0
+ }
+ is SimResourceCommand.Consume -> {
+ val grantedSpeed = min(output.allowedSpeed, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0 || command.work <= 0.0) {
+ output.actualSpeed = 0.0
+ continue
+ }
+
+ totalRequestedSpeed += command.limit
+ totalRequestedWork += command.work
+
+ output.actualSpeed = grantedSpeed
+ availableSpeed -= grantedSpeed
+
+ // The duration that we want to run is that of the shortest request from an output
+ duration = min(duration, command.work / grantedSpeed)
+ }
+ SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
+ }
+ }
+
+ assert(deadline >= clock.millis()) { "Deadline already passed" }
+
+ this.totalRequestedSpeed = totalRequestedSpeed
+ this.totalRequestedWork = totalRequestedWork
+ this.totalAllocatedSpeed = maxUsage - availableSpeed
+ this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration)
+
+ return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
+ SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ }
+
+ /**
+ * Obtain the next command to perform.
+ */
+ private fun doNext(capacity: Double): SimResourceCommand {
+ val totalRequestedWork = totalRequestedWork.toLong()
+ val totalRemainingWork = totalRemainingWork.toLong()
+ val totalAllocatedWork = totalAllocatedWork.toLong()
+ val totalRequestedSpeed = totalRequestedSpeed
+ val totalAllocatedSpeed = totalAllocatedSpeed
+
+ // Force all inputs to re-schedule their work.
+ val command = doSchedule(capacity)
+
+ // Report metrics
+ listener?.onSliceFinish(
+ this,
+ totalRequestedWork,
+ totalAllocatedWork - totalRemainingWork,
+ totalOvercommittedWork.toLong(),
+ totalInterferedWork.toLong(),
+ totalRequestedSpeed,
+ totalAllocatedSpeed,
+ )
+
+ totalInterferedWork = 0.0
+ totalOvercommittedWork = 0.0
+
+ return command
+ }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener {
+ /**
+ * This method is invoked when a slice is finished.
+ */
+ public fun onSliceFinish(
+ switch: SimResourceDistributor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+
+ /**
+ * An internal [SimResourceProvider] implementation for switch outputs.
+ */
+ private inner class OutputProvider(val capacity: Double) : SimResourceProvider {
+ /**
+ * The [OutputContext] that is currently running.
+ */
+ private var ctx: OutputContext? = null
+
+ override var state: SimResourceState = SimResourceState.Pending
+ internal set
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
+
+ val ctx = OutputContext(this, consumer)
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+ outputContexts += ctx
+
+ ctx.start()
+ schedule()
+ }
+
+ override fun close() {
+ cancel()
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Stopped
+ _outputs.remove(this)
+ }
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+ }
+
+ /**
+ * A [SimAbstractResourceContext] for the output resources.
+ */
+ private inner class OutputContext(
+ private val provider: OutputProvider,
+ consumer: SimResourceConsumer
+ ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> {
+ /**
+ * The current command that is processed by the vCPU.
+ */
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+
+ /**
+ * The processing speed that is allowed by the model constraints.
+ */
+ var allowedSpeed: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
+
+ private fun reportOvercommit() {
+ val remainingWork = remainingWork
+ totalOvercommittedWork += remainingWork
+ }
+
+ override fun onIdle(deadline: Long) {
+ reportOvercommit()
+
+ allowedSpeed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ reportOvercommit()
+
+ allowedSpeed = speed
+ activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ }
+
+ override fun onFinish(cause: Throwable?) {
+ reportOvercommit()
+
+ activeCommand = SimResourceCommand.Exit
+ provider.cancel()
+
+ super.onFinish(cause)
+ }
+
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
+ // Apply performance interference model
+ val performanceScore = 1.0
+
+ // Compute the remaining amount of work
+ return if (work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+
+ totalInterferedWork += interferedWork
+
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
+ // to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ }
+
+ override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
index ca23557c..1a05accd 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
@@ -22,34 +22,19 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-
/**
* A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer].
*/
-public class SimResourceForwarder<R : SimResource>(override val resource: R) :
- SimResourceProvider<R>, SimResourceConsumer<R> {
+public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer {
/**
* The [SimResourceContext] in which the forwarder runs.
*/
- private var ctx: SimResourceContext<R>? = null
-
- /**
- * A flag to indicate that the forwarder is closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The continuation to resume after consumption.
- */
- private var cont: Continuation<Unit>? = null
+ private var ctx: SimResourceContext? = null
/**
* The delegate [SimResourceConsumer].
*/
- private var delegate: SimResourceConsumer<R>? = null
+ private var delegate: SimResourceConsumer? = null
/**
* A flag to indicate that the delegate was started.
@@ -57,99 +42,115 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) :
private var hasDelegateStarted: Boolean = false
/**
- * The remaining amount of work last cycle.
+ * The state of the forwarder.
*/
- private var remainingWork: Double = 0.0
-
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifecycle of forwarder has ended" }
- check(cont == null) { "Run should not be called concurrently" }
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
- this.delegate = consumer
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
- cont.invokeOnCancellation { reset() }
+ state = SimResourceState.Active
+ delegate = consumer
- ctx?.interrupt()
- }
+ // Interrupt the provider to replace the consumer
+ interrupt()
}
override fun interrupt() {
ctx?.interrupt()
}
+ override fun cancel() {
+ val delegate = delegate
+ val ctx = ctx
+
+ state = SimResourceState.Pending
+
+ if (delegate != null && ctx != null) {
+ this.delegate = null
+ delegate.onFinish(ctx)
+ }
+ }
+
override fun close() {
- isClosed = true
- interrupt()
- ctx = null
+ val ctx = ctx
+
+ state = SimResourceState.Stopped
+
+ if (ctx != null) {
+ this.ctx = null
+ ctx.interrupt()
+ }
}
- override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ override fun onStart(ctx: SimResourceContext) {
this.ctx = ctx
-
- return onNext(ctx, 0.0)
}
- override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
- this.remainingWork = remainingWork
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val delegate = delegate
- return if (isClosed) {
- SimResourceCommand.Exit
- } else if (!hasDelegateStarted) {
+ if (!hasDelegateStarted) {
start()
+ }
+
+ return if (state == SimResourceState.Stopped) {
+ SimResourceCommand.Exit
+ } else if (delegate != null) {
+ val command = delegate.onNext(ctx)
+ if (command == SimResourceCommand.Exit) {
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ delegate.onFinish(ctx)
+
+ if (state == SimResourceState.Stopped)
+ SimResourceCommand.Exit
+ else
+ onNext(ctx)
+ } else {
+ command
+ }
} else {
- next()
+ SimResourceCommand.Idle()
}
}
- /**
- * Start the delegate.
- */
- private fun start(): SimResourceCommand {
- val delegate = delegate ?: return SimResourceCommand.Idle()
- val command = delegate.onStart(checkNotNull(ctx))
-
- hasDelegateStarted = true
-
- return forward(command)
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ delegate?.onCapacityChanged(ctx, isThrottled)
}
- /**
- * Obtain the next command to process.
- */
- private fun next(): SimResourceCommand {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ this.ctx = null
+
val delegate = delegate
- return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle())
+ if (delegate != null) {
+ reset()
+ delegate.onFinish(ctx, cause)
+ }
}
/**
- * Forward the specified [command].
+ * Start the delegate.
*/
- private fun forward(command: SimResourceCommand): SimResourceCommand {
- return if (command == SimResourceCommand.Exit) {
- val cont = checkNotNull(cont)
-
- // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
- // reset beforehand the existing state and check whether it has been updated afterwards
- reset()
- cont.resume(Unit)
+ private fun start() {
+ val delegate = delegate ?: return
+ delegate.onStart(checkNotNull(ctx))
- if (isClosed)
- SimResourceCommand.Exit
- else
- start()
- } else {
- command
- }
+ hasDelegateStarted = true
}
/**
* Reset the delegate.
*/
private fun reset() {
- cont = null
delegate = null
hasDelegateStarted = false
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index e35aa683..52b13c5c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -22,29 +22,58 @@
package org.opendc.simulator.resources
+import kotlinx.coroutines.suspendCancellableCoroutine
+
/**
* A [SimResourceProvider] provides some resource of type [R].
*/
-public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
+public interface SimResourceProvider : AutoCloseable {
/**
- * The resource that is managed by this provider.
+ * The state of the resource.
*/
- public val resource: R
+ public val state: SimResourceState
/**
- * Consume the resource provided by this provider using the specified [consumer].
+ * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
+ *
+ * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
*/
- public suspend fun consume(consumer: SimResourceConsumer<R>)
+ public fun startConsumer(consumer: SimResourceConsumer)
/**
- * Interrupt the resource.
+ * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op.
*/
public fun interrupt()
/**
+ * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op.
+ */
+ public fun cancel()
+
+ /**
* End the lifetime of the resource.
*
* This operation terminates the existing resource consumer.
*/
public override fun close()
}
+
+/**
+ * Consume the resource provided by this provider using the specified [consumer] and suspend execution until
+ * the consumer has finished.
+ */
+public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
+ return suspendCancellableCoroutine { cont ->
+ startConsumer(object : SimResourceConsumer by consumer {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ assert(!cont.isCompleted) { "Coroutine already completed" }
+
+ consumer.onFinish(ctx, cause)
+
+ cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
+ }
+
+ override fun toString(): String = "SimSuspendingResourceConsumer"
+ })
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 540a17c9..9b10edaf 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,27 +22,25 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import org.opendc.utils.TimerScheduler
import java.time.Clock
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
+import kotlin.math.ceil
import kotlin.math.min
/**
* A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
*
- * @param resource The resource to provide.
+ * @param initialCapacity The initial capacity of the resource.
* @param clock The virtual clock to track simulation time.
+ * @param scheduler The scheduler to schedule the interrupts.
*/
-public class SimResourceSource<R : SimResource>(
- override val resource: R,
+public class SimResourceSource(
+ initialCapacity: Double,
private val clock: Clock,
private val scheduler: TimerScheduler<Any>
-) : SimResourceProvider<R> {
+) : SimResourceProvider {
/**
* The resource processing speed over time.
*/
@@ -51,71 +49,59 @@ public class SimResourceSource<R : SimResource>(
private val _speed = MutableStateFlow(0.0)
/**
- * A flag to indicate that the resource was closed.
+ * The capacity of the resource.
*/
- private var isClosed: Boolean = false
-
- /**
- * The current active consumer.
- */
- private var cont: CancellableContinuation<Unit>? = null
+ public var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ ctx?.capacity = value
+ }
/**
* The [Context] that is currently running.
*/
private var ctx: Context? = null
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifetime of resource has ended." }
- check(cont == null) { "Run should not be called concurrently" }
+ override var state: SimResourceState = SimResourceState.Pending
+ private set
- try {
- return suspendCancellableCoroutine { cont ->
- val ctx = Context(consumer, cont)
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ val ctx = Context(consumer)
- this.cont = cont
- this.ctx = ctx
+ this.ctx = ctx
+ this.state = SimResourceState.Active
- ctx.start()
- cont.invokeOnCancellation {
- ctx.stop()
- }
- }
- } finally {
- cont = null
- ctx = null
- }
+ ctx.start()
}
override fun close() {
- isClosed = true
- cont?.cancel()
- cont = null
- ctx = null
+ cancel()
+ state = SimResourceState.Stopped
}
override fun interrupt() {
ctx?.interrupt()
}
+ override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.stop()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(
- consumer: SimResourceConsumer<R>,
- val cont: Continuation<Unit>
- ) : SimAbstractResourceContext<R>(resource, clock, consumer) {
- /**
- * The processing speed of the resource.
- */
- private var speed: Double = 0.0
- set(value) {
- field = value
- _speed.value = field
- }
-
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
override fun onIdle(deadline: Long) {
- speed = 0.0
+ _speed.value = speed
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
@@ -124,24 +110,28 @@ public class SimResourceSource<R : SimResource>(
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
- speed = getSpeed(limit)
+ _speed.value = speed
+
val until = min(deadline, clock.millis() + getDuration(work, speed))
scheduler.startSingleTimerTo(this, until, ::flush)
}
- override fun onFinish() {
- speed = 0.0
+ override fun onFinish(cause: Throwable?) {
+ _speed.value = speed
scheduler.cancel(this)
- cont.resume(Unit)
- }
+ cancel()
- override fun onFailure(cause: Throwable) {
- speed = 0.0
- scheduler.cancel(this)
- cont.resumeWithException(cause)
+ super.onFinish(cause)
}
- override fun toString(): String = "SimResourceSource.Context[resource=$resource]"
+ override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
+ }
+
+ /**
+ * Compute the duration that a resource consumption will take with the specified [speed].
+ */
+ private fun getDuration(work: Double, speed: Double): Long {
+ return ceil(work / speed * 1000).toLong()
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
index 31b0a175..c72951d0 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceState.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -23,11 +23,21 @@
package org.opendc.simulator.resources
/**
- * A generic representation of resource that may be consumed.
+ * The state of a resource provider.
*/
-public interface SimResource {
+public enum class SimResourceState {
/**
- * The capacity of the resource.
+ * The resource provider is pending and the resource is waiting to be consumed.
*/
- public val capacity: Double
+ Pending,
+
+ /**
+ * The resource provider is active and the resource is currently being consumed.
+ */
+ Active,
+
+ /**
+ * The resource provider is stopped and the resource cannot be consumed anymore.
+ */
+ Stopped
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index cd1af3fc..53fec16a 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -23,26 +23,26 @@
package org.opendc.simulator.resources
/**
- * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers.
+ * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers.
*/
-public interface SimResourceSwitch<R : SimResource> : AutoCloseable {
+public interface SimResourceSwitch : AutoCloseable {
/**
* The output resource providers to which resource consumers can be attached.
*/
- public val outputs: Set<SimResourceProvider<R>>
+ public val outputs: Set<SimResourceProvider>
/**
* The input resources that will be switched between the output providers.
*/
- public val inputs: Set<SimResourceProvider<R>>
+ public val inputs: Set<SimResourceProvider>
/**
- * Add an output to the switch represented by [resource].
+ * Add an output to the switch with the specified [capacity].
*/
- public fun addOutput(resource: R): SimResourceProvider<R>
+ public fun addOutput(capacity: Double): SimResourceProvider
/**
* Add the specified [input] to the switch.
*/
- public fun addInput(input: SimResourceProvider<R>)
+ public fun addInput(input: SimResourceProvider)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 060d0ea2..a10f84b6 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -22,69 +22,72 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.launch
import java.util.ArrayDeque
-import kotlin.coroutines.CoroutineContext
/**
* A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
* a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
*/
-public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> {
+public class SimResourceSwitchExclusive : SimResourceSwitch {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ * A flag to indicate that the switch is closed.
*/
- private val scope = CoroutineScope(context + Job())
+ private var isClosed: Boolean = false
- private val _outputs = mutableSetOf<SimResourceProvider<R>>()
- override val outputs: Set<SimResourceProvider<R>>
+ private val _outputs = mutableSetOf<Provider>()
+ override val outputs: Set<SimResourceProvider>
get() = _outputs
- private val availableResources = ArrayDeque<SimResourceForwarder<R>>()
- private val _inputs = mutableSetOf<SimResourceProvider<R>>()
- override val inputs: Set<SimResourceProvider<R>>
+ private val availableResources = ArrayDeque<SimResourceForwarder>()
+
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ override val inputs: Set<SimResourceProvider>
get() = _inputs
- override fun addOutput(resource: R): SimResourceProvider<R> {
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
- val output = Provider(resource, forwarder)
+ val output = Provider(capacity, forwarder)
_outputs += output
return output
}
- override fun addInput(input: SimResourceProvider<R>) {
+ override fun addInput(input: SimResourceProvider) {
+ check(!isClosed) { "Switch has been closed" }
+
if (input in inputs) {
return
}
- val forwarder = SimResourceForwarder(input.resource)
-
- scope.launch { input.consume(forwarder) }
+ val forwarder = SimResourceForwarder()
_inputs += input
availableResources += forwarder
+
+ input.startConsumer(object : SimResourceConsumer by forwarder {
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ // De-register the input after it has finished
+ _inputs -= input
+ forwarder.onFinish(ctx, cause)
+ }
+ })
}
override fun close() {
- scope.cancel()
+ isClosed = true
+
+ // Cancel all upstream subscriptions
+ _inputs.forEach(SimResourceProvider::cancel)
}
private inner class Provider(
- override val resource: R,
- private val forwarder: SimResourceForwarder<R>
- ) : SimResourceProvider<R> {
-
- override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer)
-
- override fun interrupt() {
- forwarder.interrupt()
- }
-
+ private val capacity: Double,
+ private val forwarder: SimResourceForwarder
+ ) : SimResourceProvider by forwarder {
override fun close() {
+ // We explicitly do not close the forwarder here in order to re-use it across output resources.
+
_outputs -= this
availableResources += forwarder
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index bcf76d3c..c796c251 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -23,99 +23,61 @@
package org.opendc.simulator.resources
import kotlinx.coroutines.*
-import org.opendc.simulator.resources.consumer.SimConsumerBarrier
import java.time.Clock
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
*/
-public class SimResourceSwitchMaxMin<R : SimResource>(
- private val clock: Clock,
- context: CoroutineContext,
- private val listener: Listener<R>? = null
-) : SimResourceSwitch<R> {
- /**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- private val inputConsumers = mutableSetOf<InputConsumer>()
- private val _outputs = mutableSetOf<OutputProvider>()
- override val outputs: Set<SimResourceProvider<R>>
+public class SimResourceSwitchMaxMin(
+ clock: Clock,
+ private val listener: Listener? = null
+) : SimResourceSwitch {
+ private val _outputs = mutableSetOf<SimResourceProvider>()
+ override val outputs: Set<SimResourceProvider>
get() = _outputs
- private val _inputs = mutableSetOf<SimResourceProvider<R>>()
- override val inputs: Set<SimResourceProvider<R>>
+ private val _inputs = mutableSetOf<SimResourceProvider>()
+ override val inputs: Set<SimResourceProvider>
get() = _inputs
/**
- * The commands to submit to the underlying host.
- */
- private val commands = mutableMapOf<R, SimResourceCommand>()
-
- /**
- * The active output contexts.
- */
- private val outputContexts: MutableList<OutputContext> = mutableListOf()
-
- /**
- * The total amount of remaining work (of all pCPUs).
- */
- private var totalRemainingWork: Double = 0.0
-
- /**
- * The total speed requested by the vCPUs.
- */
- private var totalRequestedSpeed = 0.0
-
- /**
- * The total amount of work requested by the vCPUs.
- */
- private var totalRequestedWork = 0.0
-
- /**
- * The total allocated speed for the vCPUs.
- */
- private var totalAllocatedSpeed = 0.0
-
- /**
- * The total allocated work requested for the vCPUs.
- */
- private var totalAllocatedWork = 0.0
-
- /**
- * The amount of work that could not be performed due to over-committing resources.
- */
- private var totalOvercommittedWork = 0.0
-
- /**
- * The amount of work that was lost due to interference.
+ * A flag to indicate that the switch was closed.
*/
- private var totalInterferedWork = 0.0
+ private var isClosed = false
/**
- * A flag to indicate that the scheduler has submitted work that has not yet been completed.
+ * The aggregator to aggregate the resources.
*/
- private var isDirty: Boolean = false
+ private val aggregator = SimResourceAggregatorMaxMin(clock)
/**
- * The scheduler barrier.
+ * The distributor to distribute the aggregated resources.
*/
- private var barrier: SimConsumerBarrier = SimConsumerBarrier(0)
+ private val distributor = SimResourceDistributorMaxMin(
+ aggregator.output, clock,
+ object : SimResourceDistributorMaxMin.Listener {
+ override fun onSliceFinish(
+ switch: SimResourceDistributor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
+ }
+ }
+ )
/**
* Add an output to the switch represented by [resource].
*/
- override fun addOutput(resource: R): SimResourceProvider<R> {
- val provider = OutputProvider(resource)
+ override fun addOutput(capacity: Double): SimResourceProvider {
+ check(!isClosed) { "Switch has been closed" }
+
+ val provider = distributor.addOutput(capacity)
_outputs.add(provider)
return provider
}
@@ -123,166 +85,29 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
/**
* Add the specified [input] to the switch.
*/
- override fun addInput(input: SimResourceProvider<R>) {
- val consumer = InputConsumer(input)
- _inputs.add(input)
- inputConsumers += consumer
- }
-
- override fun close() {
- scope.cancel()
- }
-
- /**
- * Indicate that the workloads should be re-scheduled.
- */
- private fun schedule() {
- isDirty = true
- interruptAll()
- }
-
- /**
- * Schedule the work over the physical CPUs.
- */
- private fun doSchedule() {
- // If there is no work yet, mark all inputs as idle.
- if (outputContexts.isEmpty()) {
- commands.replaceAll { _, _ -> SimResourceCommand.Idle() }
- interruptAll()
- }
-
- val maxUsage = inputs.sumByDouble { it.resource.capacity }
- var duration: Double = Double.MAX_VALUE
- var deadline: Long = Long.MAX_VALUE
- var availableSpeed = maxUsage
- var totalRequestedSpeed = 0.0
- var totalRequestedWork = 0.0
-
- // Sort the outputs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
- outputContexts.sort()
-
- // Divide the available input capacity fairly across the outputs using max-min fair sharing
- val outputIterator = outputContexts.listIterator()
- var remaining = outputContexts.size
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
- val availableShare = availableSpeed / remaining--
-
- when (val command = output.activeCommand) {
- is SimResourceCommand.Idle -> {
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, command.deadline)
-
- output.actualSpeed = 0.0
- }
- is SimResourceCommand.Consume -> {
- val grantedSpeed = min(output.allowedSpeed, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, command.deadline)
-
- // Ignore idle computation
- if (grantedSpeed <= 0.0 || command.work <= 0.0) {
- output.actualSpeed = 0.0
- continue
- }
-
- totalRequestedSpeed += command.limit
- totalRequestedWork += command.work
-
- output.actualSpeed = grantedSpeed
- availableSpeed -= grantedSpeed
-
- // The duration that we want to run is that of the shortest request from an output
- duration = min(duration, command.work / grantedSpeed)
- }
- SimResourceCommand.Exit -> {
- // Apparently the output consumer has exited, so remove it from the scheduling queue.
- outputIterator.remove()
- }
- }
- }
-
- // Round the duration to milliseconds
- duration = ceil(duration * 1000) / 1000
-
- assert(deadline >= clock.millis()) { "Deadline already passed" }
-
- val totalAllocatedSpeed = maxUsage - availableSpeed
- var totalAllocatedWork = 0.0
- availableSpeed = totalAllocatedSpeed
-
- // Divide the requests over the available capacity of the input resources fairly
- for (input in inputs.sortedByDescending { it.resource.capacity }) {
- val maxResourceUsage = input.resource.capacity
- val fraction = maxResourceUsage / maxUsage
- val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction)
- val grantedWork = duration * grantedSpeed
-
- commands[input.resource] =
- if (grantedWork > 0.0 && grantedSpeed > 0.0)
- SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
- else
- SimResourceCommand.Idle(deadline)
-
- totalAllocatedWork += grantedWork
- availableSpeed -= grantedSpeed
- }
-
- this.totalRequestedSpeed = totalRequestedSpeed
- this.totalRequestedWork = totalRequestedWork
- this.totalAllocatedSpeed = totalAllocatedSpeed
- this.totalAllocatedWork = totalAllocatedWork
-
- interruptAll()
- }
-
- /**
- * Flush the progress of the vCPUs.
- */
- private fun flushGuests() {
- // Flush all the outputs work
- for (output in outputContexts) {
- output.flush(isIntermediate = true)
- }
-
- // Report metrics
- listener?.onSliceFinish(
- this,
- totalRequestedWork.toLong(),
- (totalAllocatedWork - totalRemainingWork).toLong(),
- totalOvercommittedWork.toLong(),
- totalInterferedWork.toLong(),
- totalRequestedSpeed,
- totalAllocatedSpeed
- )
- totalRemainingWork = 0.0
- totalInterferedWork = 0.0
- totalOvercommittedWork = 0.0
+ override fun addInput(input: SimResourceProvider) {
+ check(!isClosed) { "Switch has been closed" }
- // Force all inputs to re-schedule their work.
- doSchedule()
+ aggregator.addInput(input)
}
- /**
- * Interrupt all inputs.
- */
- private fun interruptAll() {
- for (input in inputConsumers) {
- input.interrupt()
+ override fun close() {
+ if (!isClosed) {
+ isClosed = true
+ distributor.close()
+ aggregator.close()
}
}
/**
* Event listener for hypervisor events.
*/
- public interface Listener<R : SimResource> {
+ public interface Listener {
/**
* This method is invoked when a slice is finished.
*/
public fun onSliceFinish(
- switch: SimResourceSwitchMaxMin<R>,
+ switch: SimResourceSwitchMaxMin,
requestedWork: Long,
grantedWork: Long,
overcommittedWork: Long,
@@ -291,218 +116,4 @@ public class SimResourceSwitchMaxMin<R : SimResource>(
cpuDemand: Double
)
}
-
- /**
- * An internal [SimResourceProvider] implementation for switch outputs.
- */
- private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> {
- /**
- * A flag to indicate that the resource was closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The current active consumer.
- */
- private var cont: CancellableContinuation<Unit>? = null
-
- /**
- * The [OutputContext] that is currently running.
- */
- private var ctx: OutputContext? = null
-
- override suspend fun consume(consumer: SimResourceConsumer<R>) {
- check(!isClosed) { "Lifetime of resource has ended." }
- check(cont == null) { "Run should not be called concurrently" }
-
- try {
- return suspendCancellableCoroutine { cont ->
- val ctx = OutputContext(resource, consumer, cont)
- ctx.start()
- cont.invokeOnCancellation {
- ctx.stop()
- }
-
- this.cont = cont
- this.ctx = ctx
-
- outputContexts += ctx
- schedule()
- }
- } finally {
- cont = null
- ctx = null
- }
- }
-
- override fun close() {
- isClosed = true
- cont?.cancel()
- cont = null
- ctx = null
- _outputs.remove(this)
- }
-
- override fun interrupt() {
- ctx?.interrupt()
- }
- }
-
- /**
- * A [SimAbstractResourceContext] for the output resources.
- */
- private inner class OutputContext(
- resource: R,
- consumer: SimResourceConsumer<R>,
- private val cont: Continuation<Unit>
- ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> {
- /**
- * The current command that is processed by the vCPU.
- */
- var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
-
- /**
- * The processing speed that is allowed by the model constraints.
- */
- var allowedSpeed: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- var actualSpeed: Double = 0.0
-
- /**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
- override fun onIdle(deadline: Long) {
- allowedSpeed = 0.0
- activeCommand = SimResourceCommand.Idle(deadline)
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- allowedSpeed = getSpeed(limit)
- activeCommand = SimResourceCommand.Consume(work, limit, deadline)
- }
-
- override fun onFinish() {
- hasExited = true
- activeCommand = SimResourceCommand.Exit
- cont.resume(Unit)
- }
-
- override fun onFailure(cause: Throwable) {
- hasExited = true
- activeCommand = SimResourceCommand.Exit
- cont.resumeWithException(cause)
- }
-
- override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
- // Apply performance interference model
- val performanceScore = 1.0
-
- // Compute the remaining amount of work
- val remainingWork = if (work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = actualSpeed / totalAllocatedSpeed
-
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
-
- totalInterferedWork += interferedWork
-
- max(0.0, work - processed)
- } else {
- 0.0
- }
-
- if (!isInterrupted) {
- totalOvercommittedWork += remainingWork
- }
-
- return remainingWork
- }
-
- override fun interrupt() {
- // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
- // to infinite recursion.
- if (isProcessing) {
- return
- }
-
- super.interrupt()
-
- // Force the scheduler to re-schedule
- schedule()
- }
-
- override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
- }
-
- /**
- * An internal [SimResourceConsumer] implementation for switch inputs.
- */
- private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> {
- /**
- * The resource context of the consumer.
- */
- private lateinit var ctx: SimResourceContext<R>
-
- init {
- scope.launch {
- try {
- barrier = SimConsumerBarrier(barrier.parties + 1)
- input.consume(this@InputConsumer)
- } catch (e: CancellationException) {
- // Cancel gracefully
- throw e
- } catch (e: Throwable) {
- e.printStackTrace()
- } finally {
- barrier = SimConsumerBarrier(barrier.parties - 1)
- inputConsumers -= this@InputConsumer
- _inputs -= input
- }
- }
- }
-
- /**
- * Interrupt the consumer
- */
- fun interrupt() {
- ctx.interrupt()
- }
-
- override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
- this.ctx = ctx
- return commands[ctx.resource] ?: SimResourceCommand.Idle()
- }
-
- override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
- totalRemainingWork += remainingWork
- val isLast = barrier.enter()
-
- // Flush the progress of the guest after the barrier has been reached.
- if (isLast && isDirty) {
- isDirty = false
- flushGuests()
- }
-
- return if (isDirty) {
- // Wait for the scheduler determine the work after the barrier has been reached by all CPUs.
- SimResourceCommand.Idle()
- } else {
- // Indicate that the scheduler needs to run next call.
- if (isLast) {
- isDirty = true
- }
-
- commands[ctx.resource] ?: SimResourceCommand.Idle()
- }
- }
- }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
index 7aa5a5aa..52a42241 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
@@ -42,4 +42,11 @@ public class SimConsumerBarrier(public val parties: Int) {
}
return false
}
+
+ /**
+ * Reset the barrier.
+ */
+ public fun reset() {
+ counter = 0
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
new file mode 100644
index 00000000..fd4a9ed5
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+import kotlin.math.min
+
+/**
+ * Helper class to expose an observable [speed] field describing the speed of the consumer.
+ */
+public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate {
+ /**
+ * The resource processing speed over time.
+ */
+ public val speed: StateFlow<Double>
+ get() = _speed
+ private val _speed = MutableStateFlow(0.0)
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val command = delegate.onNext(ctx)
+
+ when (command) {
+ is SimResourceCommand.Idle -> _speed.value = 0.0
+ is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit)
+ is SimResourceCommand.Exit -> _speed.value = 0.0
+ }
+
+ return command
+ }
+
+ override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ val oldSpeed = _speed.value
+
+ delegate.onCapacityChanged(ctx, isThrottled)
+
+ // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
+ // need to update the current speed.
+ if (oldSpeed == _speed.value) {
+ _speed.value = min(ctx.capacity, _speed.value)
+ }
+ }
+
+ override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index 03a3cebd..a52d1d5d 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResource
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -31,14 +30,16 @@ import org.opendc.simulator.resources.SimResourceContext
* A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
* consumption for some period of time.
*/
-public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> {
- private val iterator = trace.iterator()
+public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
+ private var iterator: Iterator<Fragment>? = null
- override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand {
- return onNext(ctx, 0.0)
+ override fun onStart(ctx: SimResourceContext) {
+ check(iterator == null) { "Consumer already running" }
+ iterator = trace.iterator()
}
- override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
val now = ctx.clock.millis()
val fragment = iterator.next()
@@ -56,6 +57,10 @@ public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<S
}
}
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ iterator = null
+ }
+
/**
* A fragment of the workload.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
new file mode 100644
index 00000000..faa693c4
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
+ */
+public class SimWorkConsumer(
+ private val work: Double,
+ private val utilization: Double
+) : SimResourceConsumer {
+
+ init {
+ require(work >= 0.0) { "Work must be positive" }
+ require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
+ }
+
+ private var isFirst = true
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val limit = ctx.capacity * utilization
+ val work = if (isFirst) {
+ isFirst = false
+ work
+ } else {
+ ctx.remainingWork
+ }
+ return if (work > 0.0) {
+ SimResourceCommand.Consume(work, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
new file mode 100644
index 00000000..de864c1c
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceAggregatorMaxMin] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceAggregatorMaxMinTest {
+ @Test
+ fun testSingleCapacity() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ val usage = mutableListOf<Double>()
+ val job = launch { sources[0].speed.toList(usage) }
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+
+ assertAll(
+ { assertEquals(1000, currentTime) },
+ { assertEquals(listOf(0.0, 0.5, 0.0), usage) }
+ )
+ } finally {
+ aggregator.output.close()
+ job.cancel()
+ }
+ }
+
+ @Test
+ fun testDoubleCapacity() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(2.0, 1.0)
+ val usage = mutableListOf<Double>()
+ val job = launch { sources[0].speed.toList(usage) }
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+ assertAll(
+ { assertEquals(1000, currentTime) },
+ { assertEquals(listOf(0.0, 1.0, 0.0), usage) }
+ )
+ } finally {
+ aggregator.output.close()
+ job.cancel()
+ }
+ }
+
+ @Test
+ fun testOvercommit() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+ assertEquals(1000, currentTime)
+
+ verify(exactly = 2) { consumer.onNext(any()) }
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testException() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
+ yield()
+ assertEquals(SimResourceState.Pending, sources[0].state)
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(4.0, 1.0)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(1000)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(2334, currentTime)
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testFailOverCapacity() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(500)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(1000, currentTime)
+ } finally {
+ aggregator.output.close()
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index e7642dc1..030a0f6b 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -22,11 +22,10 @@
package org.opendc.simulator.resources
+import io.mockk.*
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.*
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertTrue
import org.opendc.simulator.utils.DelayControllerClockAdapter
/**
@@ -34,39 +33,17 @@ import org.opendc.simulator.utils.DelayControllerClockAdapter
*/
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceContextTest {
- data class SimCpu(val speed: Double) : SimResource {
- override val capacity: Double
- get() = speed
- }
-
@Test
fun testFlushWithoutCommand() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val resource = SimCpu(4200.0)
-
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(10.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
-
- val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- }
-
- override fun onFinish() {
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- override fun onFailure(cause: Throwable) {
- }
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
}
context.flush()
@@ -75,72 +52,35 @@ class SimResourceContextTest {
@Test
fun testIntermediateFlush() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val resource = SimCpu(4200.0)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(10.0, 1.0)
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
-
- var counter = 0
- val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- counter++
- }
-
- override fun onFinish() {
- }
-
- override fun onFailure(cause: Throwable) {
- }
- }
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
context.flush(isIntermediate = true)
- assertEquals(2, counter)
+
+ verify(exactly = 2) { context.onConsume(any(), any(), any()) }
}
@Test
fun testIntermediateFlushIdle() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val resource = SimCpu(4200.0)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle(10)
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
-
- var counter = 0
- var isFinished = false
- val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
- override fun onIdle(deadline: Long) {
- counter++
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- }
-
- override fun onFinish() {
- isFinished = true
- }
-
- override fun onFailure(cause: Throwable) {
- }
- }
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
context.start()
delay(5)
@@ -149,8 +89,25 @@ class SimResourceContextTest {
context.flush(isIntermediate = true)
assertAll(
- { assertEquals(1, counter) },
- { assertTrue(isFinished) }
+ { verify(exactly = 2) { context.onIdle(any()) } },
+ { verify(exactly = 1) { context.onFinish(null) } }
)
}
+
+ @Test
+ fun testDoubleStart() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ }
+
+ context.start()
+ assertThrows<IllegalStateException> { context.start() }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
index ced1bd98..143dbca9 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
@@ -22,10 +22,16 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.launch
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -34,59 +40,147 @@ import org.opendc.utils.TimerScheduler
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimResourceForwarderTest {
-
- data class SimCpu(val speed: Double) : SimResource {
- override val capacity: Double
- get() = speed
- }
-
@Test
fun testExitImmediately() = runBlockingTest {
- val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val forwarder = SimResourceForwarder()
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+ val source = SimResourceSource(2000.0, clock, scheduler)
launch {
source.consume(forwarder)
source.close()
}
- forwarder.consume(object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Exit
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ forwarder.consume(object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
return SimResourceCommand.Exit
}
})
+
forwarder.close()
scheduler.close()
}
@Test
fun testExit() = runBlockingTest {
- val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val forwarder = SimResourceForwarder()
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+ val source = SimResourceSource(2000.0, clock, scheduler)
launch {
source.consume(forwarder)
source.close()
}
- forwarder.consume(object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
+ forwarder.consume(object : SimResourceConsumer {
+ var isFirst = true
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(10.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
})
forwarder.close()
}
+
+ @Test
+ fun testState() = runBlockingTest {
+ val forwarder = SimResourceForwarder()
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit
+ }
+
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.startConsumer(consumer)
+ assertEquals(SimResourceState.Active, forwarder.state)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.close()
+ assertEquals(SimResourceState.Stopped, forwarder.state)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingTest {
+ val forwarder = SimResourceForwarder()
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingTest {
+ val forwarder = SimResourceForwarder()
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingTest {
+ val forwarder = SimResourceForwarder()
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingTest {
+ val forwarder = SimResourceForwarder()
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ source.startConsumer(forwarder)
+
+ coroutineScope {
+ launch { forwarder.consume(consumer) }
+ delay(1000)
+ source.capacity = 0.5
+ }
+
+ assertEquals(3000, currentTime)
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 4f7825fc..58e19421 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -22,11 +22,16 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -35,26 +40,17 @@ import org.opendc.utils.TimerScheduler
*/
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceSourceTest {
- data class SimCpu(val speed: Double) : SimResource {
- override val capacity: Double
- get() = speed
- }
-
@Test
fun testSpeed() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1000 * capacity, capacity))
+ .andThen(SimResourceCommand.Exit)
try {
val res = mutableListOf<Double>()
@@ -63,7 +59,7 @@ class SimResourceSourceTest {
provider.consume(consumer)
job.cancel()
- assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" }
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
scheduler.close()
provider.close()
@@ -71,20 +67,38 @@ class SimResourceSourceTest {
}
@Test
- fun testSpeedLimit() = runBlockingTest {
+ fun testAdjustCapacity() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val provider = SimResourceSource(1.0, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed)
- }
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
+ try {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
}
+ assertEquals(3000, currentTime)
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ } finally {
+ scheduler.close()
+ provider.close()
}
+ }
+
+ @Test
+ fun testSpeedLimit() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity))
+ .andThen(SimResourceCommand.Exit)
try {
val res = mutableListOf<Double>()
@@ -93,7 +107,7 @@ class SimResourceSourceTest {
provider.consume(consumer)
job.cancel()
- assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" }
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
scheduler.close()
provider.close()
@@ -108,16 +122,16 @@ class SimResourceSourceTest {
fun testIntermediateInterrupt() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ val consumer = object : SimResourceConsumer {
+ override fun onStart(ctx: SimResourceContext) {
ctx.interrupt()
- return SimResourceCommand.Exit
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return SimResourceCommand.Exit
}
}
@@ -133,18 +147,24 @@ class SimResourceSourceTest {
fun testInterrupt() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
- lateinit var resCtx: SimResourceContext<SimCpu>
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+ lateinit var resCtx: SimResourceContext
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ val consumer = object : SimResourceConsumer {
+ var isFirst = true
+ override fun onStart(ctx: SimResourceContext) {
resCtx = ctx
- return SimResourceCommand.Consume(4.0, 1.0)
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- assertEquals(0.0, remainingWork)
- return SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ assertEquals(0.0, ctx.remainingWork)
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(4.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
@@ -166,17 +186,12 @@ class SimResourceSourceTest {
fun testFailure() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- throw IllegalStateException()
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onStart(any()) }
+ .throws(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -192,17 +207,13 @@ class SimResourceSourceTest {
fun testExceptionPropagationOnNext() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -218,17 +229,13 @@ class SimResourceSourceTest {
fun testConcurrentConsumption() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -247,17 +254,13 @@ class SimResourceSourceTest {
fun testClosedConsumption() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
assertThrows<IllegalStateException> {
@@ -274,17 +277,13 @@ class SimResourceSourceTest {
fun testCloseDuringConsumption() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
try {
launch { provider.consume(consumer) }
@@ -302,17 +301,13 @@ class SimResourceSourceTest {
fun testIdle() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle(ctx.clock.millis() + 500)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle(clock.millis() + 500))
+ .andThen(SimResourceCommand.Exit)
try {
provider.consume(consumer)
@@ -330,17 +325,13 @@ class SimResourceSourceTest {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
-
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle()
- }
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle())
+ .andThenThrows(IllegalStateException())
try {
provider.consume(consumer)
@@ -351,4 +342,26 @@ class SimResourceSourceTest {
}
}
}
+
+ @Test
+ fun testIncorrectDeadline() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle(2))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ delay(10)
+
+ assertThrows<IllegalArgumentException> { provider.consume(consumer) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index ca6558bf..edd60502 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
@@ -34,18 +36,12 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.resources.consumer.SimTraceConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
-import java.lang.IllegalStateException
/**
* Test suite for the [SimResourceSwitchExclusive] class.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimResourceSwitchExclusiveTest {
- class SimCpu(val speed: Double) : SimResource {
- override val capacity: Double
- get() = speed
- }
-
/**
* Test a trace workload.
*/
@@ -67,12 +63,12 @@ internal class SimResourceSwitchExclusiveTest {
),
)
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
- val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
switch.addInput(source)
- val provider = switch.addOutput(SimCpu(3200.0))
+ val provider = switch.addOutput(3200.0)
val job = launch { source.speed.toList(speed) }
try {
@@ -98,22 +94,15 @@ internal class SimResourceSwitchExclusiveTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration / 1000.0, 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
- val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
switch.addInput(source)
- val provider = switch.addOutput(SimCpu(3200.0))
+ val provider = switch.addOutput(3200.0)
try {
provider.consume(workload)
@@ -133,22 +122,29 @@ internal class SimResourceSwitchExclusiveTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ val workload = object : SimResourceConsumer {
+ var isFirst = true
+
+ override fun onStart(ctx: SimResourceContext) {
+ isFirst = true
}
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
- val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
switch.addInput(source)
- val provider = switch.addOutput(SimCpu(3200.0))
+ val provider = switch.addOutput(3200.0)
try {
provider.consume(workload)
@@ -169,22 +165,15 @@ internal class SimResourceSwitchExclusiveTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(duration.toDouble(), 1.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
- val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
- val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
switch.addInput(source)
- switch.addOutput(SimCpu(3200.0))
- assertThrows<IllegalStateException> { switch.addOutput(SimCpu(3200.0)) }
+ switch.addOutput(3200.0)
+ assertThrows<IllegalStateException> { switch.addOutput(3200.0) }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index 698c1700..5f4fd187 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
@@ -38,31 +40,19 @@ import org.opendc.utils.TimerScheduler
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimResourceSwitchMaxMinTest {
- class SimCpu(val speed: Double) : SimResource {
- override val capacity: Double
- get() = speed
- }
-
@Test
fun testSmoke() = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val switch = SimResourceSwitchMaxMin<SimCpu>(clock, coroutineContext)
+ val switch = SimResourceSwitchMaxMin(clock)
- val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) }
+ val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) }
sources.forEach { switch.addInput(it) }
- val provider = switch.addOutput(SimCpu(1000.0))
-
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
+ val provider = switch.addOutput(1000.0)
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
try {
provider.consume(consumer)
@@ -81,13 +71,13 @@ internal class SimResourceSwitchMaxMinTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> {
+ val listener = object : SimResourceSwitchMaxMin.Listener {
var totalRequestedWork = 0L
var totalGrantedWork = 0L
var totalOvercommittedWork = 0L
override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin<SimCpu>,
+ switch: SimResourceSwitchMaxMin,
requestedWork: Long,
grantedWork: Long,
overcommittedWork: Long,
@@ -112,11 +102,11 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
- val provider = switch.addOutput(SimCpu(3200.0))
+ val switch = SimResourceSwitchMaxMin(clock, listener)
+ val provider = switch.addOutput(3200.0)
try {
- switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler))
+ switch.addInput(SimResourceSource(3200.0, clock, scheduler))
provider.consume(workload)
yield()
} finally {
@@ -140,13 +130,13 @@ internal class SimResourceSwitchMaxMinTest {
val clock = DelayControllerClockAdapter(this)
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> {
+ val listener = object : SimResourceSwitchMaxMin.Listener {
var totalRequestedWork = 0L
var totalGrantedWork = 0L
var totalOvercommittedWork = 0L
override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin<SimCpu>,
+ switch: SimResourceSwitchMaxMin,
requestedWork: Long,
grantedWork: Long,
overcommittedWork: Long,
@@ -180,12 +170,12 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
- val providerA = switch.addOutput(SimCpu(3200.0))
- val providerB = switch.addOutput(SimCpu(3200.0))
+ val switch = SimResourceSwitchMaxMin(clock, listener)
+ val providerA = switch.addOutput(3200.0)
+ val providerB = switch.addOutput(3200.0)
try {
- switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler))
+ switch.addInput(SimResourceSource(3200.0, clock, scheduler))
coroutineScope {
launch { providerA.consume(workloadA) }
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
new file mode 100644
index 00000000..4d6b19ee
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimWorkConsumer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimWorkConsumerTest {
+ @Test
+ fun testSmoke() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = SimWorkConsumer(1.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(1000, currentTime)
+ } finally {
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testUtilization() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(2000, currentTime)
+ } finally {
+ provider.close()
+ }
+ }
+}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index 49964938..d4bc7b5c 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -93,7 +93,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
try {
timer()
} catch (e: Throwable) {
- Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e)
+ coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e)
}
}
}