summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-12-30 14:03:12 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-16 12:06:40 +0100
commit6a2a5423479696e8dc28885be27cc3e3252f28b0 (patch)
treee23dd1d7ab3a15969da5f7e02baf24a9434b9912 /simulator
parentdf2f52780c08c5d108741d3746eaf03222c64841 (diff)
simulator: Add generic framework for resource consumption modeling
This change adds a generic framework for modeling resource consumptions and adapts opendc-simulator-compute to model machines and VMs on top of this framework. This framework anticipates the addition of additional resource types such as memory, disk and network to the OpenDC codebase.
Diffstat (limited to 'simulator')
-rw-r--r--simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts26
-rw-r--r--simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt12
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt16
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt12
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt16
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts1
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt276
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt263
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt)25
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt8
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt199
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt)9
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt)2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt)11
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt46
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt34
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt52
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt27
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt86
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt63
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts37
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt255
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt33
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt)14
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt45
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt46
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt45
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt133
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt74
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt285
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt2
-rw-r--r--simulator/settings.gradle.kts1
36 files changed, 1436 insertions, 747 deletions
diff --git a/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts
index 544e34bf..e0bc2ce4 100644
--- a/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts
+++ b/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts
@@ -1,29 +1,3 @@
-import org.gradle.kotlin.dsl.`java-library`
-import org.gradle.kotlin.dsl.jacoco
-import org.gradle.kotlin.dsl.kotlin
-
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
/*
* Copyright (c) 2021 AtLarge Research
*
diff --git a/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts
index 8d6420be..ab13215b 100644
--- a/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts
+++ b/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts
@@ -46,5 +46,6 @@ kotlin {
tasks.withType<KotlinCompile>().configureEach {
kotlinOptions.jvmTarget = Versions.jvmTarget.toString()
+ kotlinOptions.useIR = true
kotlinOptions.freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 19fa3e97..0da81152 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -35,7 +35,7 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
import org.opendc.simulator.failures.FailureDomain
import org.opendc.utils.flow.EventFlow
import java.time.Clock
@@ -216,7 +216,7 @@ public class SimHost(
val originalCpu = machine.model.cpus[0]
val processingNode = originalCpu.node.copy(coreCount = cpuCount)
val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
- val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+ val memoryUnits = listOf(SimMemoryUnit("Generic", "Generic", 3200.0, memorySize))
return SimMachineModel(processingUnits, memoryUnits)
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e1a1d87e..a45ab9fc 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -40,9 +40,9 @@ import org.opendc.compute.api.ServerWatcher
import org.opendc.compute.service.driver.HostEvent
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.time.Clock
@@ -62,11 +62,11 @@ internal class SimHostTest {
scope = TestCoroutineScope()
clock = DelayControllerClockAdapter(scope)
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 3da8d0b3..85a2e413 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import java.io.InputStream
import java.util.*
@@ -61,12 +61,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
val cores = machine.cpus.flatMap { id ->
when (id) {
1 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
+ val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
+ List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) }
}
2 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
+ val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
+ List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) }
}
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
@@ -75,7 +75,7 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
UUID(0L, counter++.toLong()),
"node-$counter",
emptyMap(),
- SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))),
+ SimMachineModel(cores, listOf(SimMemoryUnit("", "", 2300.0, 16000))),
ConstantPowerModel(0.0)
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 9a06a40f..094bc975 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -26,9 +26,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
@@ -88,8 +88,8 @@ public class Sc20ClusterEnvironmentReader(
memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
coresPerHost = values[coresPerHostCol].trim().toInt()
- val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
- val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+ val unknownProcessingNode = SimProcessingNode("unknown", "unknown", "unknown", coresPerHost)
+ val unknownMemoryUnit = SimMemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
repeat(numberOfHosts) {
nodes.add(
@@ -99,7 +99,7 @@ public class Sc20ClusterEnvironmentReader(
mapOf("cluster" to clusterId),
SimMachineModel(
List(coresPerHost) { coreId ->
- ProcessingUnit(unknownProcessingNode, coreId, speed)
+ SimProcessingUnit(unknownProcessingNode, coreId, speed)
},
listOf(unknownMemoryUnit)
),
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index effd0286..87a49f49 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import java.io.InputStream
import java.util.*
@@ -60,19 +60,19 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
val cores = machine.cpus.flatMap { id ->
when (id) {
1 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
+ val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
+ List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) }
}
2 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
+ val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
+ List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) }
}
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
val memories = machine.memories.map { id ->
when (id) {
- 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L)
+ 1 -> SimMemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L)
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
index e7e99a3d..0ff40a28 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
@@ -34,9 +34,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import java.util.*
/**
@@ -56,13 +56,13 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p
val cores = cpu.getInteger("numberOfCores")
val speed = cpu.get("clockRateMhz", Number::class.java).toDouble()
// TODO Remove hardcoding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
+ val node = SimProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
+ SimProcessingUnit(node, coreId, speed)
}
}
val memoryUnits = machine.getList("memories", Document::class.java).map { memory ->
- MemoryUnit(
+ SimMemoryUnit(
"Samsung",
memory.getString("name"),
memory.get("speedMbPerS", Number::class.java).toDouble(),
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index 19af6fe8..66d7d9e5 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -30,5 +30,6 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-simulator:opendc-simulator-core"))
+ api(project(":opendc-simulator:opendc-simulator-resources"))
implementation(project(":opendc-utils"))
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index f74c5697..b1d1c0b7 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -25,16 +25,16 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.*
import org.opendc.utils.TimerScheduler
import java.time.Clock
import java.util.*
import kotlin.coroutines.*
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
* A simulated bare-metal machine that is able to run a single workload.
@@ -52,11 +52,9 @@ public class SimBareMetalMachine(
private val clock: Clock,
override val model: SimMachineModel
) : SimMachine {
- /**
- * A [StateFlow] representing the CPU usage of the simulated machine.
- */
+ private val _usage = MutableStateFlow(0.0)
override val usage: StateFlow<Double>
- get() = usageState
+ get() = _usage
/**
* A flag to indicate that the machine is terminated.
@@ -64,249 +62,63 @@ public class SimBareMetalMachine(
private var isTerminated = false
/**
- * The [MutableStateFlow] containing the load of the server.
- */
- private val usageState = MutableStateFlow(0.0)
-
- /**
- * The current active workload.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
- * The active CPUs of this machine.
- */
- private var cpus: List<Cpu> = emptyList()
-
- /**
* The [TimerScheduler] to use for scheduling the interrupts.
*/
- private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock)
+ private val scheduler = TimerScheduler<Any>(coroutineScope, clock)
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * The execution context in which the workload runs.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- require(!isTerminated) { "Machine is terminated" }
- require(cont == null) { "Run should not be called concurrently" }
-
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = this@SimBareMetalMachine.model
-
- override val clock: Clock
- get() = this@SimBareMetalMachine.clock
-
- override val meta: Map<String, Any>
- get() = meta
-
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
- }
-
- workload.onStart(ctx)
+ private inner class Context(val map: Map<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>,
+ override val meta: Map<String, Any>) : SimMachineContext {
+ override val clock: Clock
+ get() = this@SimBareMetalMachine.clock
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
- this.cpus = model.cpus.map { Cpu(ctx, it, workload) }
+ override val cpus: List<SimProcessingUnit> = model.cpus
- for (cpu in cpus) {
- cpu.start()
- }
- }
- }
+ override val memory: List<SimMemoryUnit> = model.memory
- /**
- * Terminate the specified bare-metal machine.
- */
- override fun close() {
- isTerminated = true
- }
-
- /**
- * Update the usage of the machine.
- */
- private fun updateUsage() {
- usageState.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency }
- }
-
- /**
- * This method is invoked when one of the CPUs has exited.
- */
- private fun onCpuExit(cpu: Int) {
- // Check whether all other CPUs have finished
- if (cpus.all { it.hasExited }) {
- val cont = cont
- this.cont = null
- cont?.resume(Unit)
+ override fun interrupt(resource: SimResource) {
+ val context = map[resource]
+ checkNotNull(context) { "Invalid resource" }
+ context.interrupt()
}
}
/**
- * This method is invoked when one of the CPUs failed.
- */
- private fun onCpuFailure(e: Throwable) {
- // Make sure no other tasks will be resumed.
- scheduler.cancelAll()
-
- // In case the flush fails with an exception, immediately propagate to caller, cancelling all other
- // tasks.
- val cont = cont
- this.cont = null
- cont?.resumeWithException(e)
- }
-
- /**
- * A physical CPU of the machine.
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) {
- /**
- * The current command.
- */
- private var currentCommand: CommandWrapper? = null
-
- /**
- * The actual processing speed.
- */
- var speed: Double = 0.0
- set(value) {
- field = value
- updateUsage()
- }
-
- /**
- * A flag to indicate that the CPU is currently processing a command.
- */
- var isIntermediate: Boolean = false
-
- /**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
- /**
- * Process the specified [SimResourceCommand] for this CPU.
- */
- fun process(command: SimResourceCommand) {
- val timestamp = clock.millis()
-
- val task = when (command) {
- is SimResourceCommand.Idle -> {
- speed = 0.0
-
- val deadline = command.deadline
-
- require(deadline >= timestamp) { "Deadline already passed" }
-
- if (deadline != Long.MAX_VALUE) {
- scheduler.startSingleTimerTo(this, deadline) { flush() }
- } else {
- null
- }
- }
- is SimResourceCommand.Consume -> {
- val work = command.work
- val limit = command.limit
- val deadline = command.deadline
-
- require(deadline >= timestamp) { "Deadline already passed" }
-
- speed = min(model.frequency, limit)
-
- // The required duration to process all the work
- val finishedAt = timestamp + ceil(work / speed * 1000).toLong()
-
- scheduler.startSingleTimerTo(this, min(finishedAt, deadline)) { flush() }
- }
- is SimResourceCommand.Exit -> {
- speed = 0.0
- hasExited = true
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = coroutineScope {
+ require(!isTerminated) { "Machine is terminated" }
+ val map = mutableMapOf<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>()
+ val ctx = Context(map, meta)
+ val sources = model.cpus.map { SimResourceSource(it, clock, scheduler) }
+ val totalCapacity = model.cpus.sumByDouble { it.frequency }
- onCpuExit(model.id)
+ workload.onStart(ctx)
- null
+ for (source in sources) {
+ val consumer = workload.getConsumer(ctx, source.resource)
+ val job = source.speed
+ .onEach {
+ _usage.value = sources.sumByDouble { it.speed.value } / totalCapacity
}
- }
-
- assert(currentCommand == null) { "Concurrent access to current command" }
- currentCommand = CommandWrapper(timestamp, command)
- }
+ .launchIn(this)
- /**
- * Request the workload for more work.
- */
- private fun next(remainingWork: Double) {
- process(workload.onNext(ctx, model.id, remainingWork))
- }
-
- /**
- * Start the CPU.
- */
- fun start() {
- try {
- isIntermediate = true
-
- process(workload.onStart(ctx, model.id))
- } catch (e: Throwable) {
- onCpuFailure(e)
- } finally {
- isIntermediate = false
- }
- }
-
- /**
- * Flush the work performed by the CPU.
- */
- fun flush() {
- try {
- val (timestamp, command) = currentCommand ?: return
-
- isIntermediate = true
- currentCommand = null
-
- // Cancel the running task and flush the progress
- scheduler.cancel(this)
-
- when (command) {
- is SimResourceCommand.Idle -> next(remainingWork = 0.0)
- is SimResourceCommand.Consume -> {
- val duration = clock.millis() - timestamp
- val remainingWork = if (duration > 0L) {
- val processed = duration / 1000.0 * speed
- max(0.0, command.work - processed)
- } else {
- 0.0
- }
-
- next(remainingWork)
+ launch {
+ source.consume(object : SimResourceConsumer<SimProcessingUnit> by consumer {
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ map[ctx.resource] = ctx
+ return consumer.onStart(ctx)
}
- SimResourceCommand.Exit -> throw IllegalStateException()
- }
- } catch (e: Throwable) {
- onCpuFailure(e)
- } finally {
- isIntermediate = false
+ })
+ job.cancel()
}
}
-
- /**
- * Interrupt the CPU.
- */
- fun interrupt() {
- // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
- // to infinite recursion.
- if (isIntermediate) {
- return
- }
-
- flush()
- }
}
- /**
- * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
- */
- private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
+ override fun close() {
+ isTerminated = true
+ scheduler.close()
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index bf6d8a5e..12b3b428 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -26,10 +26,11 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.suspendCancellableCoroutine
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.compute.workload.SimWorkloadBarrier
+import org.opendc.simulator.resources.*
import java.time.Clock
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
@@ -44,22 +45,22 @@ import kotlin.math.min
*
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor {
+public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimExecutionContext) {
- val model = ctx.machine
+ override fun onStart(ctx: SimMachineContext) {
this.ctx = ctx
- this.commands = Array(model.cpus.size) { SimResourceCommand.Idle() }
- this.pCpus = model.cpus.indices.sortedBy { model.cpus[it].frequency }.toIntArray()
- this.maxUsage = model.cpus.sumByDouble { it.frequency }
- this.barrier = SimWorkloadBarrier(model.cpus.size)
+ this.commands = Array(ctx.cpus.size) { SimResourceCommand.Idle() }
+ this.pCpus = ctx.cpus.indices.sortedBy { ctx.cpus[it].frequency }.toIntArray()
+ this.maxUsage = ctx.cpus.sumByDouble { it.frequency }
+ this.barrier = SimWorkloadBarrier(ctx.cpus.size)
}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return commands[cpu]
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return this
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ val cpu = ctx.resource.id
totalRemainingWork += remainingWork
val isLast = barrier.enter()
@@ -82,6 +83,10 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
}
}
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ return commands[ctx.resource.id]
+ }
+
override fun canFit(model: SimMachineModel): Boolean = true
override fun createMachine(
@@ -92,7 +97,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
/**
* The execution context in which the hypervisor runs.
*/
- private lateinit var ctx: SimExecutionContext
+ private lateinit var ctx: SimMachineContext
/**
* The commands to submit to the underlying host.
@@ -199,7 +204,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
val vcpu = vcpuIterator.next()
val availableShare = availableSpeed / remaining--
- when (val command = vcpu.command) {
+ when (val command = vcpu.activeCommand) {
is SimResourceCommand.Idle -> {
// Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
@@ -246,7 +251,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
// Divide the requests over the available capacity of the pCPUs fairly
for (i in pCpus) {
- val maxCpuUsage = ctx.machine.cpus[i].frequency
+ val maxCpuUsage = ctx.cpus[i].frequency
val fraction = maxCpuUsage / maxUsage
val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction)
val grantedWork = duration * grantedSpeed
@@ -275,7 +280,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
private fun flushGuests() {
// Flush all the vCPUs work
for (vcpu in vcpus) {
- vcpu.flush(interrupt = false)
+ vcpu.flush(isIntermediate = true)
}
// Report metrics
@@ -299,9 +304,9 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
/**
* Interrupt all host CPUs.
*/
- private fun SimExecutionContext.interruptAll() {
- for (i in machine.cpus.indices) {
- interrupt(i)
+ private fun SimMachineContext.interruptAll() {
+ for (cpu in ctx.cpus) {
+ interrupt(cpu)
}
}
@@ -336,33 +341,38 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
private var cpus: List<VCpu> = emptyList()
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * The execution context in which the workload runs.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- require(!isTerminated) { "Machine is terminated" }
- require(cont == null) { "Run should not be called concurrently" }
-
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = model
+ inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
+ override val cpus: List<SimProcessingUnit>
+ get() = model.cpus
- override val clock: Clock
- get() = this@SimFairShareHypervisor.ctx.clock
+ override val memory: List<SimMemoryUnit>
+ get() = model.memory
- override val meta: Map<String, Any>
- get() = meta
+ override val clock: Clock
+ get() = this@SimFairShareHypervisor.ctx.clock
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
+ override fun interrupt(resource: SimResource) {
+ TODO()
}
+ }
+
+ lateinit var ctx: SimMachineContext
+
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
+ require(!isTerminated) { "Machine is terminated" }
+ require(cont == null) { "Run should not be called concurrently" }
+ ctx = Context(meta)
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) }
+ this.cpus = model.cpus.map { VCpu(this, it, workload.getConsumer(ctx, it), ctx.clock) }
for (cpu in cpus) {
// Register vCPU to scheduler
@@ -387,13 +397,13 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
* Update the usage of the VM.
*/
fun updateUsage() {
- usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.model.frequency }
+ usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.resource.frequency }
}
/**
* This method is invoked when one of the CPUs has exited.
*/
- fun onCpuExit(cpu: Int) {
+ fun onCpuExit() {
// Check whether all other CPUs have finished
if (cpus.all { it.hasExited }) {
val cont = cont
@@ -419,19 +429,14 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
*/
private inner class VCpu(
val vm: SimVm,
- val ctx: SimExecutionContext,
- val model: ProcessingUnit,
- val workload: SimWorkload
- ) : Comparable<VCpu> {
+ resource: SimProcessingUnit,
+ consumer: SimResourceConsumer<SimProcessingUnit>,
+ clock: Clock
+ ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer), Comparable<VCpu> {
/**
- * The latest command processed by the CPU.
+ * The current command that is processed by the vCPU.
*/
- var command: SimResourceCommand = SimResourceCommand.Idle()
-
- /**
- * The latest timestamp at which the vCPU was flushed.
- */
- var latestFlush: Long = 0
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
/**
* The processing speed that is allowed by the model constraints.
@@ -448,148 +453,74 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
}
/**
- * A flag to indicate that the CPU is currently processing a command.
- */
- var isIntermediate: Boolean = false
-
- /**
* A flag to indicate that the CPU has exited.
*/
- val hasExited: Boolean
- get() = command is SimResourceCommand.Exit
-
- /**
- * Process the specified [SimResourceCommand] for this CPU.
- */
- fun process(command: SimResourceCommand) {
- // Assign command as the most recent executed command
- this.command = command
-
- when (command) {
- is SimResourceCommand.Idle -> {
- require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" }
+ var hasExited: Boolean = false
- allowedSpeed = 0.0
- }
- is SimResourceCommand.Consume -> {
- require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" }
+ override fun onIdle(deadline: Long) {
+ allowedSpeed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
+ }
- allowedSpeed = min(model.frequency, command.limit)
- }
- is SimResourceCommand.Exit -> {
- allowedSpeed = 0.0
- actualSpeed = 0.0
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ allowedSpeed = getSpeed(limit)
+ activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ }
- vm.onCpuExit(model.id)
- }
- }
+ override fun onFinish() {
+ hasExited = true
+ activeCommand = SimResourceCommand.Exit
+ vm.onCpuExit()
}
- /**
- * Start the CPU.
- */
- fun start() {
- try {
- isIntermediate = true
- latestFlush = ctx.clock.millis()
-
- process(workload.onStart(ctx, model.id))
- } catch (e: Throwable) {
- fail(e)
- } finally {
- isIntermediate = false
- }
+ override fun onFailure(cause: Throwable) {
+ hasExited = true
+ activeCommand = SimResourceCommand.Exit
+ vm.onCpuFailure(cause)
}
- /**
- * Flush the work performed by the CPU.
- */
- fun flush(interrupt: Boolean) {
- val now = ctx.clock.millis()
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ // Apply performance interference model
+ val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0
- // Fast path: if the CPU was already flushed at at the current instant, no need to flush the progress.
- if (latestFlush >= now) {
- return
+ // Compute the remaining amount of work
+ val remainingWork = if (work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+
+ totalInterferedWork += interferedWork
+
+ max(0.0, work - processed)
+ } else {
+ 0.0
}
- try {
- isIntermediate = true
- when (val command = command) {
- is SimResourceCommand.Idle -> {
- // Act like nothing has happened in case the vCPU did not reach its deadline or was not
- // interrupted by the user.
- if (interrupt || command.deadline <= now) {
- process(workload.onNext(ctx, model.id, 0.0))
- }
- }
- is SimResourceCommand.Consume -> {
- // Apply performance interference model
- val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0
-
- // Compute the remaining amount of work
- val remainingWork = if (command.work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = actualSpeed / totalAllocatedSpeed
-
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
- totalInterferedWork += interferedWork
-
- max(0.0, command.work - processed)
- } else {
- 0.0
- }
-
- // Act like nothing has happened in case the vCPU did not finish yet or was not interrupted by
- // the user.
- if (interrupt || remainingWork == 0.0 || command.deadline <= now) {
- if (!interrupt) {
- totalOvercommittedWork += remainingWork
- }
-
- process(workload.onNext(ctx, model.id, remainingWork))
- } else {
- process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
- }
- }
- SimResourceCommand.Exit ->
- throw IllegalStateException()
- }
- } catch (e: Throwable) {
- fail(e)
- } finally {
- latestFlush = now
- isIntermediate = false
+ if (!isInterrupted) {
+ totalOvercommittedWork += remainingWork
}
+
+ return remainingWork
}
- /**
- * Interrupt the CPU.
- */
- fun interrupt() {
+ override fun interrupt() {
// Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
// to infinite recursion.
- if (isIntermediate) {
+ if (isProcessing) {
return
}
- flush(interrupt = true)
+ super.interrupt()
// Force the scheduler to re-schedule
shouldSchedule()
}
- /**
- * Fail the CPU.
- */
- fun fail(e: Throwable) {
- command = SimResourceCommand.Exit
- vm.onCpuFailure(e)
- }
-
override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed)
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index 657dac66..5c67b990 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -22,6 +22,9 @@
package org.opendc.simulator.compute
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResource
import java.time.Clock
/**
@@ -29,27 +32,31 @@ import java.time.Clock
* firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on
* which the image runs.
*/
-public interface SimExecutionContext {
+public interface SimMachineContext {
/**
* The virtual clock tracking simulation time.
*/
public val clock: Clock
+
+ /**
+ * The metadata associated with the context.
+ */
+ public val meta: Map<String, Any>
/**
- * The machine model of the machine that is running the image.
+ * The CPUs available on the machine.
*/
- public val machine: SimMachineModel
+ public val cpus: List<SimProcessingUnit>
/**
- * The metadata associated with the context.
+ * The memory available on the machine
*/
- public val meta: Map<String, Any>
+ public val memory: List<SimMemoryUnit>
/**
- * Ask the host machine to interrupt the specified vCPU.
+ * Interrupt the specified [resource].
*
- * @param cpu The id of the vCPU to interrupt.
- * @throws IllegalArgumentException if the identifier points to a non-existing vCPU.
+ * @throws IllegalArgumentException if the resource does not belong to this execution context.
*/
- public fun interrupt(cpu: Int)
+ public fun interrupt(resource: SimResource)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
index c2988b11..d6bf0e99 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
/**
* A description of the physical or virtual machine on which a bootable image runs.
@@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.ProcessingUnit
* @property cpus The list of processing units available to the image.
* @property memory The list of memory units available to the image.
*/
-public data class SimMachineModel(public val cpus: List<ProcessingUnit>, public val memory: List<MemoryUnit>)
+public data class SimMachineModel(public val cpus: List<SimProcessingUnit>, public val memory: List<SimMemoryUnit>)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index 778b68ca..751873a5 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -26,26 +26,26 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.suspendCancellableCoroutine
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.*
import java.time.Clock
import java.util.ArrayDeque
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
-import kotlin.math.min
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*
* @param listener The hypervisor listener to use.
*/
-public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor {
+public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> {
/**
* The execution context in which the hypervisor runs.
*/
- private lateinit var ctx: SimExecutionContext
+ private lateinit var ctx: SimMachineContext
/**
* The mapping from pCPU to vCPU.
@@ -67,18 +67,36 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
return SimVm(model, performanceInterferenceModel)
}
- override fun onStart(ctx: SimExecutionContext) {
+ override fun onStart(ctx: SimMachineContext) {
this.ctx = ctx
- this.vcpus = arrayOfNulls(ctx.machine.cpus.size)
- this.availableCpus.addAll(ctx.machine.cpus.indices)
+ this.vcpus = arrayOfNulls(ctx.cpus.size)
+ this.availableCpus.addAll(ctx.cpus.indices)
}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return onNext(ctx, cpu, 0.0)
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return this
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return vcpus[cpu]?.next(0.0) ?: SimResourceCommand.Idle()
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ return onNext(ctx, 0.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ val vcpu = vcpus[ctx.resource.id] ?: return SimResourceCommand.Idle()
+
+ if (vcpu.isStarted) {
+ vcpu.remainingWork = remainingWork
+ vcpu.flush()
+ } else {
+ vcpu.isStarted = true
+ vcpu.start()
+ }
+
+ if (vcpu.hasExited && vcpu != vcpus[ctx.resource.id]) {
+ return onNext(ctx, remainingWork)
+ }
+
+ return vcpu.activeCommand
}
/**
@@ -117,36 +135,46 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
private var cpus: List<VCpu> = emptyList()
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * The execution context in which the workload runs.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- require(!isTerminated) { "Machine is terminated" }
- require(cont == null) { "Run should not be called concurrently" }
-
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = model
+ inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
+ override val cpus: List<SimProcessingUnit>
+ get() = model.cpus
- override val clock: Clock
- get() = this@SimSpaceSharedHypervisor.ctx.clock
+ override val memory: List<SimMemoryUnit>
+ get() = model.memory
- override val meta: Map<String, Any>
- get() = meta
+ override val clock: Clock
+ get() = this@SimSpaceSharedHypervisor.ctx.clock
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
+ override fun interrupt(resource: SimResource) {
+ TODO()
}
+ }
+
+ lateinit var ctx: SimMachineContext
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
+ require(!isTerminated) { "Machine is terminated" }
+ require(cont == null) { "Run should not be called concurrently" }
+
+ ctx = Context(meta)
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) }
+ try {
+ this.cpus = model.cpus.map { model -> VCpu(this, model, workload.getConsumer(ctx, model), ctx.clock) }
- for (cpu in cpus) {
- cpu.start()
+ for ((index, pCPU) in pCPUs.withIndex()) {
+ vcpus[pCPU] = cpus[index]
+ this@SimSpaceSharedHypervisor.ctx.interrupt(this@SimSpaceSharedHypervisor.ctx.cpus[pCPU])
+ }
+ } catch (e: Throwable) {
+ cont.resumeWithException(e)
}
}
}
@@ -157,19 +185,23 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
vcpus[pCPU] = null
availableCpus.add(pCPU)
}
+
+ val cont = cont
+ this.cont = null
+ cont?.resume(Unit)
}
/**
* Update the usage of the VM.
*/
fun updateUsage() {
- usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency }
+ usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.resource.frequency }
}
/**
* This method is invoked when one of the CPUs has exited.
*/
- fun onCpuExit(cpu: Int) {
+ fun onCpuExit() {
// Check whether all other CPUs have finished
if (cpus.all { it.hasExited }) {
val cont = cont
@@ -193,7 +225,22 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
/**
* A CPU of the virtual machine.
*/
- private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) {
+ private inner class VCpu(
+ val vm: SimVm,
+ resource: SimProcessingUnit,
+ consumer: SimResourceConsumer<SimProcessingUnit>,
+ clock: Clock
+ ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer) {
+ /**
+ * Indicates that the vCPU was started.
+ */
+ var isStarted: Boolean = false
+
+ /**
+ * The current command that is processed by the vCPU.
+ */
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+
/**
* The processing speed of the vCPU.
*/
@@ -204,81 +251,41 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
}
/**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
- /**
- * A flag to indicate that the CPU was started.
+ * The amount of work remaining from the previous consumption.
*/
- var hasStarted: Boolean = false
+ var remainingWork: Double = 0.0
/**
- * Process the specified [SimResourceCommand] for this CPU.
+ * A flag to indicate that the CPU has exited.
*/
- fun process(command: SimResourceCommand): SimResourceCommand {
- return when (command) {
- is SimResourceCommand.Idle -> {
- speed = 0.0
- command
- }
- is SimResourceCommand.Consume -> {
- speed = min(model.frequency, command.limit)
- command
- }
- is SimResourceCommand.Exit -> {
- speed = 0.0
- hasExited = true
-
- vm.onCpuExit(model.id)
-
- SimResourceCommand.Idle()
- }
- }
- }
+ var hasExited: Boolean = false
- /**
- * Start the CPU.
- */
- fun start() {
- vcpus[pCPU] = this
- interrupt()
+ override fun onIdle(deadline: Long) {
+ speed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
}
- /**
- * Request the workload for more work.
- */
- fun next(remainingWork: Double): SimResourceCommand {
- return try {
- val command =
- if (hasStarted) {
- workload.onNext(ctx, model.id, remainingWork)
- } else {
- hasStarted = true
- workload.onStart(ctx, model.id)
- }
- process(command)
- } catch (e: Throwable) {
- fail(e)
- }
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ speed = getSpeed(limit)
+ activeCommand = SimResourceCommand.Consume(work, speed, deadline)
}
- /**
- * Interrupt the CPU.
- */
- fun interrupt() {
- this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU)
+ override fun onFinish() {
+ speed = 0.0
+ hasExited = true
+ activeCommand = SimResourceCommand.Idle()
+ vm.onCpuExit()
}
- /**
- * Fail the CPU.
- */
- fun fail(e: Throwable): SimResourceCommand {
+ override fun onFailure(cause: Throwable) {
+ speed = 0.0
hasExited = true
+ activeCommand = SimResourceCommand.Idle()
+ vm.onCpuFailure(cause)
+ }
- vm.onCpuFailure(e)
-
- return SimResourceCommand.Idle()
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ return remainingWork
}
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt
index bcbde5b1..49745868 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.model
+import org.opendc.simulator.resources.SimResource
+
/**
* A memory unit of a compute resource, either virtual or physical.
*
@@ -30,9 +32,12 @@ package org.opendc.simulator.compute.model
* @property speed The access speed of the memory in MHz.
* @property size The size of the memory unit in MBs.
*/
-public data class MemoryUnit(
+public data class SimMemoryUnit(
public val vendor: String,
public val modelName: String,
public val speed: Double,
public val size: Long
-)
+) : SimResource {
+ override val capacity: Double
+ get() = speed
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt
index 58ed816c..4022ecb3 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt
@@ -30,7 +30,7 @@ package org.opendc.simulator.compute.model
* @property arch The micro-architecture of the processor node.
* @property coreCount The number of logical CPUs in the processor node.
*/
-public data class ProcessingNode(
+public data class SimProcessingNode(
public val vendor: String,
public val arch: String,
public val modelName: String,
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt
index 415e95e6..1c989254 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.model
+import org.opendc.simulator.resources.SimResource
+
/**
* A single logical compute unit of processor node, either virtual or physical.
*
@@ -29,8 +31,11 @@ package org.opendc.simulator.compute.model
* @property id The identifier of the CPU core within the processing node.
* @property frequency The clock rate of the CPU in MHz.
*/
-public data class ProcessingUnit(
- public val node: ProcessingNode,
+public data class SimProcessingUnit(
+ public val node: SimProcessingNode,
public val id: Int,
public val frequency: Double
-)
+) : SimResource {
+ override val capacity: Double
+ get() = frequency
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index c22fcc07..9b47821e 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -22,7 +22,11 @@
package org.opendc.simulator.compute.workload
-import org.opendc.simulator.compute.SimExecutionContext
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
/**
* A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on
@@ -36,31 +40,35 @@ public class SimFlopsWorkload(
public val utilization: Double = 0.8
) : SimWorkload {
init {
- require(flops >= 0) { "Negative number of flops" }
+ require(flops >= 0) { "Number of FLOPs must be positive" }
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimExecutionContext) {}
+ override fun onStart(ctx: SimMachineContext) {}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- val cores = ctx.machine.cpus.size
- val limit = ctx.machine.cpus[cpu].frequency * utilization
- val work = flops.toDouble() / cores
-
- return if (work > 0.0) {
- SimResourceCommand.Consume(work, limit)
- } else {
- SimResourceCommand.Exit
- }
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return CpuConsumer(ctx)
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.machine.cpus[cpu].frequency * utilization
+ private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> {
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ val limit = ctx.resource.frequency * utilization
+ val work = flops.toDouble() / machine.cpus.size
+
+ return if (work > 0.0) {
+ SimResourceCommand.Consume(work, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
- return SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ return if (remainingWork > 0.0) {
+ val limit = ctx.resource.frequency * utilization
+ return SimResourceCommand.Consume(remainingWork, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 00ebebce..313b6ed5 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -22,7 +22,11 @@
package org.opendc.simulator.compute.workload
-import org.opendc.simulator.compute.SimExecutionContext
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
/**
* A [SimWorkload] that models application execution as a single duration.
@@ -39,20 +43,26 @@ public class SimRuntimeWorkload(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimExecutionContext) {}
+ override fun onStart(ctx: SimMachineContext) {}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- val limit = ctx.machine.cpus[cpu].frequency * utilization
- val work = (limit / 1000) * duration
- return SimResourceCommand.Consume(work, limit)
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return CpuConsumer()
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.machine.cpus[cpu].frequency * utilization
- SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
+ private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ val limit = ctx.resource.frequency * utilization
+ val work = (limit / 1000) * duration
+ return SimResourceCommand.Consume(work, limit)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ return if (remainingWork > 0.0) {
+ val limit = ctx.resource.frequency * utilization
+ SimResourceCommand.Consume(remainingWork, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index deb10b98..edef3843 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -22,7 +22,11 @@
package org.opendc.simulator.compute.workload
-import org.opendc.simulator.compute.SimExecutionContext
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -34,36 +38,42 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
private var fragment: Fragment? = null
private lateinit var barrier: SimWorkloadBarrier
- override fun onStart(ctx: SimExecutionContext) {
- barrier = SimWorkloadBarrier(ctx.machine.cpus.size)
+ override fun onStart(ctx: SimMachineContext) {
+ barrier = SimWorkloadBarrier(ctx.cpus.size)
fragment = nextFragment()
offset = ctx.clock.millis()
}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return onNext(ctx, cpu, 0.0)
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return CpuConsumer()
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val work = (fragment.duration / 1000) * fragment.usage
- val deadline = offset + fragment.duration
+ private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ return onNext(ctx, 0.0)
+ }
- assert(deadline >= now) { "Deadline already passed" }
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ val now = ctx.clock.millis()
+ val fragment = fragment ?: return SimResourceCommand.Exit
+ val work = (fragment.duration / 1000) * fragment.usage
+ val deadline = offset + fragment.duration
- val cmd =
- if (cpu < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, fragment.usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ assert(deadline >= now) { "Deadline already passed" }
- if (barrier.enter()) {
- this.fragment = nextFragment()
- this.offset += fragment.duration
- }
+ val cmd =
+ if (ctx.resource.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, fragment.usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
- return cmd
+ if (barrier.enter()) {
+ this@SimTraceWorkload.fragment = nextFragment()
+ this@SimTraceWorkload.offset += fragment.duration
+ }
+
+ return cmd
+ }
}
override fun toString(): String = "SimTraceWorkload"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index 6fc78d56..60661e23 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -22,7 +22,9 @@
package org.opendc.simulator.compute.workload
-import org.opendc.simulator.compute.SimExecutionContext
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResourceConsumer
/**
* A model that characterizes the runtime behavior of some particular workload.
@@ -32,27 +34,12 @@ import org.opendc.simulator.compute.SimExecutionContext
*/
public interface SimWorkload {
/**
- * This method is invoked when the workload is started, before the (virtual) CPUs assigned to the workload will
- * start.
+ * This method is invoked when the workload is started.
*/
- public fun onStart(ctx: SimExecutionContext)
+ public fun onStart(ctx: SimMachineContext)
/**
- * This method is invoked when a (virtual) CPU assigned to the workload has started.
- *
- * @param ctx The execution context in which the workload runs.
- * @param cpu The index of the (virtual) CPU to start.
- * @return The command to perform on the CPU.
+ * Obtain the resource consumer for the specified processing unit.
*/
- public fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand
-
- /**
- * This method is invoked when a (virtual) CPU assigned to the workload was interrupted or reached its deadline.
- *
- * @param ctx The execution context in which the workload runs.
- * @param cpu The index of the (virtual) CPU to obtain the resource consumption of.
- * @param remainingWork The remaining work that was not yet completed.
- * @return The next command to perform on the CPU.
- */
- public fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand
+ public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit>
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index b8eee4f0..4b4d7eca 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -30,9 +30,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.time.Clock
@@ -51,10 +51,10 @@ internal class SimHypervisorTest {
scope = TestCoroutineScope()
clock = DelayControllerClockAdapter(scope)
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
+ val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 1036f1ac..00efba53 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -29,14 +29,10 @@ import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.simulator.compute.workload.SimResourceCommand
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
/**
@@ -48,11 +44,11 @@ class SimMachineTest {
@BeforeEach
fun setUp() {
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) },
+ memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
@@ -86,74 +82,4 @@ class SimMachineTest {
assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" }
}
}
-
- @Test
- fun testInterrupt() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val machine = SimBareMetalMachine(testScope, clock, machineModel)
-
- val workload = object : SimWorkload {
- override fun onStart(ctx: SimExecutionContext) {}
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- ctx.interrupt(cpu)
- return SimResourceCommand.Exit
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
-
- assertDoesNotThrow {
- testScope.runBlockingTest { machine.run(workload) }
- }
- }
-
- @Test
- fun testExceptionPropagationOnStart() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val machine = SimBareMetalMachine(testScope, clock, machineModel)
-
- val workload = object : SimWorkload {
- override fun onStart(ctx: SimExecutionContext) {}
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- throw IllegalStateException()
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
-
- assertThrows<IllegalStateException> {
- testScope.runBlockingTest { machine.run(workload) }
- }
- }
-
- @Test
- fun testExceptionPropagationOnNext() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val machine = SimBareMetalMachine(testScope, clock, machineModel)
-
- val workload = object : SimWorkload {
- override fun onStart(ctx: SimExecutionContext) {}
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
-
- assertThrows<IllegalStateException> {
- testScope.runBlockingTest { machine.run(workload) }
- }
- }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
index 1a9faf11..583d989c 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
@@ -31,9 +31,10 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
@@ -53,10 +54,10 @@ internal class SimSpaceSharedHypervisorTest {
scope = TestCoroutineScope()
clock = DelayControllerClockAdapter(scope)
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
+ val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
@@ -126,6 +127,56 @@ internal class SimSpaceSharedHypervisorTest {
}
/**
+ * Test FLOPs workload on hypervisor.
+ */
+ @Test
+ fun testFlopsWorkload() {
+ val duration = 5 * 60L * 1000
+ val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+ launch { hypervisor.createMachine(machineModel).run(workload) }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertEquals(duration, scope.currentTime) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() {
+ val duration = 5 * 60L * 1000
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+ launch {
+ val vm = hypervisor.createMachine(machineModel)
+ vm.run(SimRuntimeWorkload(duration))
+ vm.close()
+
+ val vm2 = hypervisor.createMachine(machineModel)
+ vm2.run(SimRuntimeWorkload(duration))
+ }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertEquals(duration * 2, scope.currentTime) { "Took enough time" }
+ }
+
+ /**
* Test concurrent workloads on the machine.
*/
@Test
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts
new file mode 100644
index 00000000..831ca3db
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Uniform resource consumption simulation model"
+
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(project(":opendc-platform")))
+ api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
+ implementation(project(":opendc-utils"))
+
+ testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
new file mode 100644
index 00000000..f9da74c7
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -0,0 +1,255 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ */
+public abstract class SimAbstractResourceContext<R : SimResource>(
+ override val resource: R,
+ override val clock: Clock,
+ private val consumer: SimResourceConsumer<R>
+) : SimResourceContext<R> {
+ /**
+ * This method is invoked when the resource will idle until the specified [deadline].
+ */
+ public abstract fun onIdle(deadline: Long)
+
+ /**
+ * This method is invoked when the resource will be consumed until the specified [work] was processed or the
+ * [deadline] was reached.
+ */
+ public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer has finished.
+ */
+ public abstract fun onFinish()
+
+ /**
+ * This method is invoked when the resource consumer throws an exception.
+ */
+ public abstract fun onFailure(cause: Throwable)
+
+ /**
+ * Compute the duration that a resource consumption will take with the specified [speed].
+ */
+ protected open fun getDuration(work: Double, speed: Double): Long {
+ return ceil(work / speed * 1000).toLong()
+ }
+
+ /**
+ * Compute the speed at which the resource may be consumed.
+ */
+ protected open fun getSpeed(limit: Double): Double {
+ return min(limit, resource.capacity)
+ }
+
+ /**
+ * Get the remaining work to process after a resource consumption was flushed.
+ *
+ * @param work The size of the resource consumption.
+ * @param speed The speed of consumption.
+ * @param duration The duration from the start of the consumption until now.
+ * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to
+ * it being interrupted before it could finish or reach its deadline.
+ * @return The amount of work remaining.
+ */
+ protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ return if (duration > 0L) {
+ val processed = duration / 1000.0 * speed
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ /**
+ * Start the consumer.
+ */
+ public fun start() {
+ try {
+ isProcessing = true
+ latestFlush = clock.millis()
+
+ interpret(consumer.onStart(this))
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Immediately stop the consumer.
+ */
+ public fun stop() {
+ try {
+ isProcessing = true
+ latestFlush = clock.millis()
+
+ flush(isIntermediate = true)
+ onFinish()
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public open fun flush(isIntermediate: Boolean = false) {
+ val now = clock.millis()
+
+ // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
+ if (isIntermediate && latestFlush >= now) {
+ return
+ }
+
+ try {
+ val (timestamp, command) = activeCommand ?: return
+
+ isProcessing = true
+ activeCommand = null
+
+ val duration = now - timestamp
+ assert(duration >= 0) { "Flush in the past" }
+
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ // We should only continue processing the next command if:
+ // 1. The resource consumer reached its deadline.
+ // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (command.deadline <= now || !isIntermediate) {
+ next(remainingWork = 0.0)
+ }
+ }
+ is SimResourceCommand.Consume -> {
+ val speed = min(resource.capacity, command.limit)
+ val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed)
+ val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted)
+
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource consumer reached its deadline.
+ // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
+ next(remainingWork)
+ } else {
+ interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
+ }
+ }
+ SimResourceCommand.Exit ->
+ // Flush may not be called when the resource consumer has finished
+ throw IllegalStateException()
+ }
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ latestFlush = now
+ isProcessing = false
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun toString(): String = "SimAbstractResourceContext[resource=$resource]"
+
+ /**
+ * A flag to indicate that the resource is currently processing a command.
+ */
+ protected var isProcessing: Boolean = false
+
+ /**
+ * The current command that is being processed.
+ */
+ private var activeCommand: CommandWrapper? = null
+
+ /**
+ * The latest timestamp at which the resource was flushed.
+ */
+ private var latestFlush: Long = Long.MIN_VALUE
+
+ /**
+ * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ */
+ private fun interpret(command: SimResourceCommand) {
+ val now = clock.millis()
+
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ onIdle(deadline)
+ }
+ is SimResourceCommand.Consume -> {
+ val work = command.work
+ val limit = command.limit
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ onConsume(work, limit, deadline)
+ }
+ is SimResourceCommand.Exit -> {
+ onFinish()
+ }
+ }
+
+ assert(activeCommand == null) { "Concurrent access to current command" }
+ activeCommand = CommandWrapper(now, command)
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ private fun next(remainingWork: Double) {
+ interpret(consumer.onNext(this, remainingWork))
+ }
+
+ /**
+ * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
+ */
+ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt
new file mode 100644
index 00000000..31b0a175
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A generic representation of resource that may be consumed.
+ */
+public interface SimResource {
+ /**
+ * The capacity of the resource.
+ */
+ public val capacity: Double
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
index 41a5028e..77c0a7a9 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -20,14 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload
+package org.opendc.simulator.resources
/**
- * A command that is sent to the host machine.
+ * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer].
*/
public sealed class SimResourceCommand {
/**
- * A request to the host to process the specified amount of [work] on a vCPU before the specified [deadline].
+ * A request to the resource to perform the specified amount of work before the given [deadline].
*
* @param work The amount of work to process on the CPU.
* @param limit The maximum amount of work to be processed per second.
@@ -35,18 +35,18 @@ public sealed class SimResourceCommand {
*/
public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() {
init {
- require(work > 0) { "The amount of work must be positive." }
- require(limit > 0) { "Limit must be positive." }
+ require(work > 0) { "Amount of work must be positive" }
+ require(limit > 0) { "Limit must be positive" }
}
}
/**
- * An indication to the host that the vCPU will idle until the specified [deadline] or is interrupted.
+ * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted.
*/
public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand()
/**
- * An indication to the host that the vCPU has finished processing.
+ * An indication to the resource that the consumer has finished.
*/
public object Exit : SimResourceCommand()
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
new file mode 100644
index 00000000..f516faa6
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A SimResourceConsumer characterizes how a [SimResource] is consumed.
+ */
+public interface SimResourceConsumer<in R : SimResource> {
+ /**
+ * This method is invoked when the consumer is started for a resource.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @return The next command that the resource should perform.
+ */
+ public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand
+
+ /**
+ * This method is invoked when a resource was either interrupted or reached its deadline.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param remainingWork The remaining work that was not yet completed.
+ * @return The next command that the resource should perform.
+ */
+ public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
new file mode 100644
index 00000000..dfb5e9ce
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
+ * resource and a resource consumer.
+ */
+public interface SimResourceContext<out R : SimResource> {
+ /**
+ * The resource that is managed by this context.
+ */
+ public val resource: R
+
+ /**
+ * The virtual clock tracking simulation time.
+ */
+ public val clock: Clock
+
+ /**
+ * Ask the resource provider to interrupt its resource.
+ */
+ public fun interrupt()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
new file mode 100644
index 00000000..91a745ab
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceProvider] provides some resource of type [R].
+ */
+public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
+ /**
+ * The resource that is managed by this provider.
+ */
+ public val resource: R
+
+ /**
+ * Consume the resource provided by this provider using the specified [consumer].
+ */
+ public suspend fun consume(consumer: SimResourceConsumer<R>)
+
+ /**
+ * End the lifetime of the resource.
+ *
+ * This operation terminates the existing resource consumer.
+ */
+ public override fun close()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
new file mode 100644
index 00000000..4445df86
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.math.min
+
+/**
+ * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
+ *
+ * @param resource The resource to provide.
+ * @param clock The virtual clock to track simulation time.
+ */
+public class SimResourceSource<R : SimResource>(
+ override val resource: R,
+ private val clock: Clock,
+ private val scheduler: TimerScheduler<Any>
+) : SimResourceProvider<R> {
+ /**
+ * The resource processing speed over time.
+ */
+ public val speed: StateFlow<Double>
+ get() = _speed
+ private val _speed = MutableStateFlow(0.0)
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) {
+ check(!isClosed) { "Lifetime of resource has ended." }
+ check(cont == null) { "Run should not be called concurrently" }
+
+ try {
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+ val ctx = Context(consumer, cont)
+ ctx.start()
+ cont.invokeOnCancellation {
+ ctx.stop()
+ }
+ }
+ } finally {
+ cont = null
+ }
+ }
+
+ override fun close() {
+ isClosed = true
+ cont?.cancel()
+ cont = null
+ }
+
+ /**
+ * A flag to indicate that the resource was closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The current active consumer.
+ */
+ private var cont: CancellableContinuation<Unit>? = null
+
+ /**
+ * Internal implementation of [SimResourceContext] for this class.
+ */
+ private inner class Context(
+ consumer: SimResourceConsumer<R>,
+ val cont: Continuation<Unit>
+ ) : SimAbstractResourceContext<R>(resource, clock, consumer) {
+ /**
+ * The processing speed of the resource.
+ */
+ private var speed: Double = 0.0
+ set(value) {
+ field = value
+ _speed.value = field
+ }
+
+ override fun onIdle(deadline: Long) {
+ speed = 0.0
+
+ // Do not resume if deadline is "infinite"
+ if (deadline != Long.MAX_VALUE) {
+ scheduler.startSingleTimerTo(this, deadline) { flush() }
+ }
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ speed = getSpeed(limit)
+ val until = min(deadline, clock.millis() + getDuration(work, speed))
+
+ scheduler.startSingleTimerTo(this, until) { flush() }
+ }
+
+ override fun onFinish() {
+ speed = 0.0
+ scheduler.cancel(this)
+ cont.resume(Unit)
+ }
+
+ override fun onFailure(cause: Throwable) {
+ speed = 0.0
+ scheduler.cancel(this)
+ cont.resumeWithException(cause)
+ }
+
+ override fun toString(): String = "SimResourceSource.Context[resource=$resource]"
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
new file mode 100644
index 00000000..02d456ff
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for [SimResourceCommand].
+ */
+class SimResourceCommandTest {
+ @Test
+ fun testZeroWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(0.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testNegativeWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(-1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testZeroLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, 0.0)
+ }
+ }
+
+ @Test
+ fun testNegativeLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, -1.0, 1)
+ }
+ }
+
+ @Test
+ fun testConsumeCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Consume(1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testIdleCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Idle(1)
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
new file mode 100644
index 00000000..8b380efb
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -0,0 +1,285 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+
+/**
+ * A test suite for the [SimResourceScheduler] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceSourceTest {
+
+ private lateinit var scope: TestCoroutineScope
+ private lateinit var clock: Clock
+
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @BeforeEach
+ fun setUp() {
+ scope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(scope)
+ }
+
+ @Test
+ fun testSpeed() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ scope.runBlockingTest {
+ val res = mutableListOf<Double>()
+ val job = launch { provider.speed.toList(res) }
+
+ provider.consume(consumer)
+
+ job.cancel()
+ assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" }
+ }
+ }
+
+ @Test
+ fun testSpeedLimit() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ scope.runBlockingTest {
+ val res = mutableListOf<Double>()
+ val job = launch { provider.speed.toList(res) }
+
+ provider.consume(consumer)
+
+ job.cancel()
+ assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" }
+ }
+ }
+
+ @Test
+ fun testInterrupt() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ ctx.interrupt()
+ return SimResourceCommand.Exit
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertDoesNotThrow {
+ scope.runBlockingTest {
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testFailure() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ throw IllegalStateException()
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest {
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest { provider.consume(consumer) }
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest {
+ launch { provider.consume(consumer) }
+ launch { provider.consume(consumer) }
+ }
+ }
+ }
+
+ @Test
+ fun testClosedConsumption() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest {
+ provider.close()
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testCloseDuringConsumption() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ scope.runBlockingTest {
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.close()
+ }
+
+ assertEquals(500, scope.currentTime)
+ }
+
+ @Test
+ fun testIdle() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle(ctx.clock.millis() + 500)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ scope.runBlockingTest {
+ provider.consume(consumer)
+ }
+
+ assertEquals(500, scope.currentTime)
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle()
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest {
+ provider.consume(consumer)
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index ff116443..bb6f3299 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -51,7 +51,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
private val timers = mutableMapOf<T, Timer>()
/**
- * The channel to communicate with the
+ * The channel to communicate with the scheduling job.
*/
private val channel = Channel<Long?>(Channel.CONFLATED)
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index e87dd4d8..0e5a2711 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -32,6 +32,7 @@ include(":opendc-experiments:opendc-experiments-sc18")
include(":opendc-experiments:opendc-experiments-capelin")
include(":opendc-runner-web")
include(":opendc-simulator:opendc-simulator-core")
+include(":opendc-simulator:opendc-simulator-resources")
include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
include(":opendc-trace:opendc-trace-core")