summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-12-30 14:03:12 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-16 12:06:40 +0100
commit6a2a5423479696e8dc28885be27cc3e3252f28b0 (patch)
treee23dd1d7ab3a15969da5f7e02baf24a9434b9912
parentdf2f52780c08c5d108741d3746eaf03222c64841 (diff)
simulator: Add generic framework for resource consumption modeling
This change adds a generic framework for modeling resource consumptions and adapts opendc-simulator-compute to model machines and VMs on top of this framework. This framework anticipates the addition of additional resource types such as memory, disk and network to the OpenDC codebase.
-rw-r--r--simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts26
-rw-r--r--simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt12
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt16
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt12
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt16
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts1
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt276
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt263
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt)25
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt8
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt199
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt)9
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt)2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt)11
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt46
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt34
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt52
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt27
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt86
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt63
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts37
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt255
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt33
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt)14
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt45
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt46
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt45
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt133
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt74
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt285
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt2
-rw-r--r--simulator/settings.gradle.kts1
36 files changed, 1436 insertions, 747 deletions
diff --git a/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts
index 544e34bf..e0bc2ce4 100644
--- a/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts
+++ b/simulator/buildSrc/src/main/kotlin/jacoco-conventions.gradle.kts
@@ -1,29 +1,3 @@
-import org.gradle.kotlin.dsl.`java-library`
-import org.gradle.kotlin.dsl.jacoco
-import org.gradle.kotlin.dsl.kotlin
-
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
/*
* Copyright (c) 2021 AtLarge Research
*
diff --git a/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts b/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts
index 8d6420be..ab13215b 100644
--- a/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts
+++ b/simulator/buildSrc/src/main/kotlin/kotlin-library-conventions.gradle.kts
@@ -46,5 +46,6 @@ kotlin {
tasks.withType<KotlinCompile>().configureEach {
kotlinOptions.jvmTarget = Versions.jvmTarget.toString()
+ kotlinOptions.useIR = true
kotlinOptions.freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 19fa3e97..0da81152 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -35,7 +35,7 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
import org.opendc.simulator.failures.FailureDomain
import org.opendc.utils.flow.EventFlow
import java.time.Clock
@@ -216,7 +216,7 @@ public class SimHost(
val originalCpu = machine.model.cpus[0]
val processingNode = originalCpu.node.copy(coreCount = cpuCount)
val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
- val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+ val memoryUnits = listOf(SimMemoryUnit("Generic", "Generic", 3200.0, memorySize))
return SimMachineModel(processingUnits, memoryUnits)
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e1a1d87e..a45ab9fc 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -40,9 +40,9 @@ import org.opendc.compute.api.ServerWatcher
import org.opendc.compute.service.driver.HostEvent
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.time.Clock
@@ -62,11 +62,11 @@ internal class SimHostTest {
scope = TestCoroutineScope()
clock = DelayControllerClockAdapter(scope)
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 3da8d0b3..85a2e413 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import java.io.InputStream
import java.util.*
@@ -61,12 +61,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
val cores = machine.cpus.flatMap { id ->
when (id) {
1 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
+ val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
+ List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) }
}
2 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
+ val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
+ List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) }
}
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
@@ -75,7 +75,7 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
UUID(0L, counter++.toLong()),
"node-$counter",
emptyMap(),
- SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))),
+ SimMachineModel(cores, listOf(SimMemoryUnit("", "", 2300.0, 16000))),
ConstantPowerModel(0.0)
)
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 9a06a40f..094bc975 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -26,9 +26,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
@@ -88,8 +88,8 @@ public class Sc20ClusterEnvironmentReader(
memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L
coresPerHost = values[coresPerHostCol].trim().toInt()
- val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost)
- val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
+ val unknownProcessingNode = SimProcessingNode("unknown", "unknown", "unknown", coresPerHost)
+ val unknownMemoryUnit = SimMemoryUnit("unknown", "unknown", -1.0, memoryPerHost)
repeat(numberOfHosts) {
nodes.add(
@@ -99,7 +99,7 @@ public class Sc20ClusterEnvironmentReader(
mapOf("cluster" to clusterId),
SimMachineModel(
List(coresPerHost) { coreId ->
- ProcessingUnit(unknownProcessingNode, coreId, speed)
+ SimProcessingUnit(unknownProcessingNode, coreId, speed)
},
listOf(unknownMemoryUnit)
),
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index effd0286..87a49f49 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -29,9 +29,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import java.io.InputStream
import java.util.*
@@ -60,19 +60,19 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
val cores = machine.cpus.flatMap { id ->
when (id) {
1 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
- List(node.coreCount) { ProcessingUnit(node, it, 4100.0) }
+ val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4)
+ List(node.coreCount) { SimProcessingUnit(node, it, 4100.0) }
}
2 -> {
- val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
- List(node.coreCount) { ProcessingUnit(node, it, 3500.0) }
+ val node = SimProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2)
+ List(node.coreCount) { SimProcessingUnit(node, it, 3500.0) }
}
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
val memories = machine.memories.map { id ->
when (id) {
- 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L)
+ 1 -> SimMemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L)
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
index e7e99a3d..0ff40a28 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
@@ -34,9 +34,9 @@ import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import java.util.*
/**
@@ -56,13 +56,13 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p
val cores = cpu.getInteger("numberOfCores")
val speed = cpu.get("clockRateMhz", Number::class.java).toDouble()
// TODO Remove hardcoding of vendor
- val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
+ val node = SimProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
List(cores) { coreId ->
- ProcessingUnit(node, coreId, speed)
+ SimProcessingUnit(node, coreId, speed)
}
}
val memoryUnits = machine.getList("memories", Document::class.java).map { memory ->
- MemoryUnit(
+ SimMemoryUnit(
"Samsung",
memory.getString("name"),
memory.get("speedMbPerS", Number::class.java).toDouble(),
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index 19af6fe8..66d7d9e5 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/simulator/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -30,5 +30,6 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-simulator:opendc-simulator-core"))
+ api(project(":opendc-simulator:opendc-simulator-resources"))
implementation(project(":opendc-utils"))
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index f74c5697..b1d1c0b7 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -25,16 +25,16 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.*
import org.opendc.utils.TimerScheduler
import java.time.Clock
import java.util.*
import kotlin.coroutines.*
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
* A simulated bare-metal machine that is able to run a single workload.
@@ -52,11 +52,9 @@ public class SimBareMetalMachine(
private val clock: Clock,
override val model: SimMachineModel
) : SimMachine {
- /**
- * A [StateFlow] representing the CPU usage of the simulated machine.
- */
+ private val _usage = MutableStateFlow(0.0)
override val usage: StateFlow<Double>
- get() = usageState
+ get() = _usage
/**
* A flag to indicate that the machine is terminated.
@@ -64,249 +62,63 @@ public class SimBareMetalMachine(
private var isTerminated = false
/**
- * The [MutableStateFlow] containing the load of the server.
- */
- private val usageState = MutableStateFlow(0.0)
-
- /**
- * The current active workload.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
- * The active CPUs of this machine.
- */
- private var cpus: List<Cpu> = emptyList()
-
- /**
* The [TimerScheduler] to use for scheduling the interrupts.
*/
- private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock)
+ private val scheduler = TimerScheduler<Any>(coroutineScope, clock)
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * The execution context in which the workload runs.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- require(!isTerminated) { "Machine is terminated" }
- require(cont == null) { "Run should not be called concurrently" }
-
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = this@SimBareMetalMachine.model
-
- override val clock: Clock
- get() = this@SimBareMetalMachine.clock
-
- override val meta: Map<String, Any>
- get() = meta
-
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
- }
-
- workload.onStart(ctx)
+ private inner class Context(val map: Map<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>,
+ override val meta: Map<String, Any>) : SimMachineContext {
+ override val clock: Clock
+ get() = this@SimBareMetalMachine.clock
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
- this.cpus = model.cpus.map { Cpu(ctx, it, workload) }
+ override val cpus: List<SimProcessingUnit> = model.cpus
- for (cpu in cpus) {
- cpu.start()
- }
- }
- }
+ override val memory: List<SimMemoryUnit> = model.memory
- /**
- * Terminate the specified bare-metal machine.
- */
- override fun close() {
- isTerminated = true
- }
-
- /**
- * Update the usage of the machine.
- */
- private fun updateUsage() {
- usageState.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency }
- }
-
- /**
- * This method is invoked when one of the CPUs has exited.
- */
- private fun onCpuExit(cpu: Int) {
- // Check whether all other CPUs have finished
- if (cpus.all { it.hasExited }) {
- val cont = cont
- this.cont = null
- cont?.resume(Unit)
+ override fun interrupt(resource: SimResource) {
+ val context = map[resource]
+ checkNotNull(context) { "Invalid resource" }
+ context.interrupt()
}
}
/**
- * This method is invoked when one of the CPUs failed.
- */
- private fun onCpuFailure(e: Throwable) {
- // Make sure no other tasks will be resumed.
- scheduler.cancelAll()
-
- // In case the flush fails with an exception, immediately propagate to caller, cancelling all other
- // tasks.
- val cont = cont
- this.cont = null
- cont?.resumeWithException(e)
- }
-
- /**
- * A physical CPU of the machine.
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) {
- /**
- * The current command.
- */
- private var currentCommand: CommandWrapper? = null
-
- /**
- * The actual processing speed.
- */
- var speed: Double = 0.0
- set(value) {
- field = value
- updateUsage()
- }
-
- /**
- * A flag to indicate that the CPU is currently processing a command.
- */
- var isIntermediate: Boolean = false
-
- /**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
- /**
- * Process the specified [SimResourceCommand] for this CPU.
- */
- fun process(command: SimResourceCommand) {
- val timestamp = clock.millis()
-
- val task = when (command) {
- is SimResourceCommand.Idle -> {
- speed = 0.0
-
- val deadline = command.deadline
-
- require(deadline >= timestamp) { "Deadline already passed" }
-
- if (deadline != Long.MAX_VALUE) {
- scheduler.startSingleTimerTo(this, deadline) { flush() }
- } else {
- null
- }
- }
- is SimResourceCommand.Consume -> {
- val work = command.work
- val limit = command.limit
- val deadline = command.deadline
-
- require(deadline >= timestamp) { "Deadline already passed" }
-
- speed = min(model.frequency, limit)
-
- // The required duration to process all the work
- val finishedAt = timestamp + ceil(work / speed * 1000).toLong()
-
- scheduler.startSingleTimerTo(this, min(finishedAt, deadline)) { flush() }
- }
- is SimResourceCommand.Exit -> {
- speed = 0.0
- hasExited = true
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = coroutineScope {
+ require(!isTerminated) { "Machine is terminated" }
+ val map = mutableMapOf<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>()
+ val ctx = Context(map, meta)
+ val sources = model.cpus.map { SimResourceSource(it, clock, scheduler) }
+ val totalCapacity = model.cpus.sumByDouble { it.frequency }
- onCpuExit(model.id)
+ workload.onStart(ctx)
- null
+ for (source in sources) {
+ val consumer = workload.getConsumer(ctx, source.resource)
+ val job = source.speed
+ .onEach {
+ _usage.value = sources.sumByDouble { it.speed.value } / totalCapacity
}
- }
-
- assert(currentCommand == null) { "Concurrent access to current command" }
- currentCommand = CommandWrapper(timestamp, command)
- }
+ .launchIn(this)
- /**
- * Request the workload for more work.
- */
- private fun next(remainingWork: Double) {
- process(workload.onNext(ctx, model.id, remainingWork))
- }
-
- /**
- * Start the CPU.
- */
- fun start() {
- try {
- isIntermediate = true
-
- process(workload.onStart(ctx, model.id))
- } catch (e: Throwable) {
- onCpuFailure(e)
- } finally {
- isIntermediate = false
- }
- }
-
- /**
- * Flush the work performed by the CPU.
- */
- fun flush() {
- try {
- val (timestamp, command) = currentCommand ?: return
-
- isIntermediate = true
- currentCommand = null
-
- // Cancel the running task and flush the progress
- scheduler.cancel(this)
-
- when (command) {
- is SimResourceCommand.Idle -> next(remainingWork = 0.0)
- is SimResourceCommand.Consume -> {
- val duration = clock.millis() - timestamp
- val remainingWork = if (duration > 0L) {
- val processed = duration / 1000.0 * speed
- max(0.0, command.work - processed)
- } else {
- 0.0
- }
-
- next(remainingWork)
+ launch {
+ source.consume(object : SimResourceConsumer<SimProcessingUnit> by consumer {
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ map[ctx.resource] = ctx
+ return consumer.onStart(ctx)
}
- SimResourceCommand.Exit -> throw IllegalStateException()
- }
- } catch (e: Throwable) {
- onCpuFailure(e)
- } finally {
- isIntermediate = false
+ })
+ job.cancel()
}
}
-
- /**
- * Interrupt the CPU.
- */
- fun interrupt() {
- // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
- // to infinite recursion.
- if (isIntermediate) {
- return
- }
-
- flush()
- }
}
- /**
- * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
- */
- private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
+ override fun close() {
+ isTerminated = true
+ scheduler.close()
+ }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index bf6d8a5e..12b3b428 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -26,10 +26,11 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.suspendCancellableCoroutine
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.compute.workload.SimWorkloadBarrier
+import org.opendc.simulator.resources.*
import java.time.Clock
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
@@ -44,22 +45,22 @@ import kotlin.math.min
*
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor {
+public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> {
- override fun onStart(ctx: SimExecutionContext) {
- val model = ctx.machine
+ override fun onStart(ctx: SimMachineContext) {
this.ctx = ctx
- this.commands = Array(model.cpus.size) { SimResourceCommand.Idle() }
- this.pCpus = model.cpus.indices.sortedBy { model.cpus[it].frequency }.toIntArray()
- this.maxUsage = model.cpus.sumByDouble { it.frequency }
- this.barrier = SimWorkloadBarrier(model.cpus.size)
+ this.commands = Array(ctx.cpus.size) { SimResourceCommand.Idle() }
+ this.pCpus = ctx.cpus.indices.sortedBy { ctx.cpus[it].frequency }.toIntArray()
+ this.maxUsage = ctx.cpus.sumByDouble { it.frequency }
+ this.barrier = SimWorkloadBarrier(ctx.cpus.size)
}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return commands[cpu]
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return this
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ val cpu = ctx.resource.id
totalRemainingWork += remainingWork
val isLast = barrier.enter()
@@ -82,6 +83,10 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
}
}
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ return commands[ctx.resource.id]
+ }
+
override fun canFit(model: SimMachineModel): Boolean = true
override fun createMachine(
@@ -92,7 +97,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
/**
* The execution context in which the hypervisor runs.
*/
- private lateinit var ctx: SimExecutionContext
+ private lateinit var ctx: SimMachineContext
/**
* The commands to submit to the underlying host.
@@ -199,7 +204,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
val vcpu = vcpuIterator.next()
val availableShare = availableSpeed / remaining--
- when (val command = vcpu.command) {
+ when (val command = vcpu.activeCommand) {
is SimResourceCommand.Idle -> {
// Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
@@ -246,7 +251,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
// Divide the requests over the available capacity of the pCPUs fairly
for (i in pCpus) {
- val maxCpuUsage = ctx.machine.cpus[i].frequency
+ val maxCpuUsage = ctx.cpus[i].frequency
val fraction = maxCpuUsage / maxUsage
val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction)
val grantedWork = duration * grantedSpeed
@@ -275,7 +280,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
private fun flushGuests() {
// Flush all the vCPUs work
for (vcpu in vcpus) {
- vcpu.flush(interrupt = false)
+ vcpu.flush(isIntermediate = true)
}
// Report metrics
@@ -299,9 +304,9 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
/**
* Interrupt all host CPUs.
*/
- private fun SimExecutionContext.interruptAll() {
- for (i in machine.cpus.indices) {
- interrupt(i)
+ private fun SimMachineContext.interruptAll() {
+ for (cpu in ctx.cpus) {
+ interrupt(cpu)
}
}
@@ -336,33 +341,38 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
private var cpus: List<VCpu> = emptyList()
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * The execution context in which the workload runs.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- require(!isTerminated) { "Machine is terminated" }
- require(cont == null) { "Run should not be called concurrently" }
-
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = model
+ inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
+ override val cpus: List<SimProcessingUnit>
+ get() = model.cpus
- override val clock: Clock
- get() = this@SimFairShareHypervisor.ctx.clock
+ override val memory: List<SimMemoryUnit>
+ get() = model.memory
- override val meta: Map<String, Any>
- get() = meta
+ override val clock: Clock
+ get() = this@SimFairShareHypervisor.ctx.clock
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
+ override fun interrupt(resource: SimResource) {
+ TODO()
}
+ }
+
+ lateinit var ctx: SimMachineContext
+
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
+ require(!isTerminated) { "Machine is terminated" }
+ require(cont == null) { "Run should not be called concurrently" }
+ ctx = Context(meta)
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) }
+ this.cpus = model.cpus.map { VCpu(this, it, workload.getConsumer(ctx, it), ctx.clock) }
for (cpu in cpus) {
// Register vCPU to scheduler
@@ -387,13 +397,13 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
* Update the usage of the VM.
*/
fun updateUsage() {
- usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.model.frequency }
+ usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.resource.frequency }
}
/**
* This method is invoked when one of the CPUs has exited.
*/
- fun onCpuExit(cpu: Int) {
+ fun onCpuExit() {
// Check whether all other CPUs have finished
if (cpus.all { it.hasExited }) {
val cont = cont
@@ -419,19 +429,14 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
*/
private inner class VCpu(
val vm: SimVm,
- val ctx: SimExecutionContext,
- val model: ProcessingUnit,
- val workload: SimWorkload
- ) : Comparable<VCpu> {
+ resource: SimProcessingUnit,
+ consumer: SimResourceConsumer<SimProcessingUnit>,
+ clock: Clock
+ ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer), Comparable<VCpu> {
/**
- * The latest command processed by the CPU.
+ * The current command that is processed by the vCPU.
*/
- var command: SimResourceCommand = SimResourceCommand.Idle()
-
- /**
- * The latest timestamp at which the vCPU was flushed.
- */
- var latestFlush: Long = 0
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
/**
* The processing speed that is allowed by the model constraints.
@@ -448,148 +453,74 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
}
/**
- * A flag to indicate that the CPU is currently processing a command.
- */
- var isIntermediate: Boolean = false
-
- /**
* A flag to indicate that the CPU has exited.
*/
- val hasExited: Boolean
- get() = command is SimResourceCommand.Exit
-
- /**
- * Process the specified [SimResourceCommand] for this CPU.
- */
- fun process(command: SimResourceCommand) {
- // Assign command as the most recent executed command
- this.command = command
-
- when (command) {
- is SimResourceCommand.Idle -> {
- require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" }
+ var hasExited: Boolean = false
- allowedSpeed = 0.0
- }
- is SimResourceCommand.Consume -> {
- require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" }
+ override fun onIdle(deadline: Long) {
+ allowedSpeed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
+ }
- allowedSpeed = min(model.frequency, command.limit)
- }
- is SimResourceCommand.Exit -> {
- allowedSpeed = 0.0
- actualSpeed = 0.0
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ allowedSpeed = getSpeed(limit)
+ activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ }
- vm.onCpuExit(model.id)
- }
- }
+ override fun onFinish() {
+ hasExited = true
+ activeCommand = SimResourceCommand.Exit
+ vm.onCpuExit()
}
- /**
- * Start the CPU.
- */
- fun start() {
- try {
- isIntermediate = true
- latestFlush = ctx.clock.millis()
-
- process(workload.onStart(ctx, model.id))
- } catch (e: Throwable) {
- fail(e)
- } finally {
- isIntermediate = false
- }
+ override fun onFailure(cause: Throwable) {
+ hasExited = true
+ activeCommand = SimResourceCommand.Exit
+ vm.onCpuFailure(cause)
}
- /**
- * Flush the work performed by the CPU.
- */
- fun flush(interrupt: Boolean) {
- val now = ctx.clock.millis()
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ // Apply performance interference model
+ val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0
- // Fast path: if the CPU was already flushed at at the current instant, no need to flush the progress.
- if (latestFlush >= now) {
- return
+ // Compute the remaining amount of work
+ val remainingWork = if (work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+
+ totalInterferedWork += interferedWork
+
+ max(0.0, work - processed)
+ } else {
+ 0.0
}
- try {
- isIntermediate = true
- when (val command = command) {
- is SimResourceCommand.Idle -> {
- // Act like nothing has happened in case the vCPU did not reach its deadline or was not
- // interrupted by the user.
- if (interrupt || command.deadline <= now) {
- process(workload.onNext(ctx, model.id, 0.0))
- }
- }
- is SimResourceCommand.Consume -> {
- // Apply performance interference model
- val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0
-
- // Compute the remaining amount of work
- val remainingWork = if (command.work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = actualSpeed / totalAllocatedSpeed
-
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
- totalInterferedWork += interferedWork
-
- max(0.0, command.work - processed)
- } else {
- 0.0
- }
-
- // Act like nothing has happened in case the vCPU did not finish yet or was not interrupted by
- // the user.
- if (interrupt || remainingWork == 0.0 || command.deadline <= now) {
- if (!interrupt) {
- totalOvercommittedWork += remainingWork
- }
-
- process(workload.onNext(ctx, model.id, remainingWork))
- } else {
- process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
- }
- }
- SimResourceCommand.Exit ->
- throw IllegalStateException()
- }
- } catch (e: Throwable) {
- fail(e)
- } finally {
- latestFlush = now
- isIntermediate = false
+ if (!isInterrupted) {
+ totalOvercommittedWork += remainingWork
}
+
+ return remainingWork
}
- /**
- * Interrupt the CPU.
- */
- fun interrupt() {
+ override fun interrupt() {
// Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
// to infinite recursion.
- if (isIntermediate) {
+ if (isProcessing) {
return
}
- flush(interrupt = true)
+ super.interrupt()
// Force the scheduler to re-schedule
shouldSchedule()
}
- /**
- * Fail the CPU.
- */
- fun fail(e: Throwable) {
- command = SimResourceCommand.Exit
- vm.onCpuFailure(e)
- }
-
override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed)
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index 657dac66..5c67b990 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -22,6 +22,9 @@
package org.opendc.simulator.compute
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResource
import java.time.Clock
/**
@@ -29,27 +32,31 @@ import java.time.Clock
* firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on
* which the image runs.
*/
-public interface SimExecutionContext {
+public interface SimMachineContext {
/**
* The virtual clock tracking simulation time.
*/
public val clock: Clock
+
+ /**
+ * The metadata associated with the context.
+ */
+ public val meta: Map<String, Any>
/**
- * The machine model of the machine that is running the image.
+ * The CPUs available on the machine.
*/
- public val machine: SimMachineModel
+ public val cpus: List<SimProcessingUnit>
/**
- * The metadata associated with the context.
+ * The memory available on the machine
*/
- public val meta: Map<String, Any>
+ public val memory: List<SimMemoryUnit>
/**
- * Ask the host machine to interrupt the specified vCPU.
+ * Interrupt the specified [resource].
*
- * @param cpu The id of the vCPU to interrupt.
- * @throws IllegalArgumentException if the identifier points to a non-existing vCPU.
+ * @throws IllegalArgumentException if the resource does not belong to this execution context.
*/
- public fun interrupt(cpu: Int)
+ public fun interrupt(resource: SimResource)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
index c2988b11..d6bf0e99 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
/**
* A description of the physical or virtual machine on which a bootable image runs.
@@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.ProcessingUnit
* @property cpus The list of processing units available to the image.
* @property memory The list of memory units available to the image.
*/
-public data class SimMachineModel(public val cpus: List<ProcessingUnit>, public val memory: List<MemoryUnit>)
+public data class SimMachineModel(public val cpus: List<SimProcessingUnit>, public val memory: List<SimMemoryUnit>)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index 778b68ca..751873a5 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -26,26 +26,26 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.suspendCancellableCoroutine
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.*
import java.time.Clock
import java.util.ArrayDeque
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
-import kotlin.math.min
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*
* @param listener The hypervisor listener to use.
*/
-public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor {
+public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> {
/**
* The execution context in which the hypervisor runs.
*/
- private lateinit var ctx: SimExecutionContext
+ private lateinit var ctx: SimMachineContext
/**
* The mapping from pCPU to vCPU.
@@ -67,18 +67,36 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
return SimVm(model, performanceInterferenceModel)
}
- override fun onStart(ctx: SimExecutionContext) {
+ override fun onStart(ctx: SimMachineContext) {
this.ctx = ctx
- this.vcpus = arrayOfNulls(ctx.machine.cpus.size)
- this.availableCpus.addAll(ctx.machine.cpus.indices)
+ this.vcpus = arrayOfNulls(ctx.cpus.size)
+ this.availableCpus.addAll(ctx.cpus.indices)
}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return onNext(ctx, cpu, 0.0)
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return this
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return vcpus[cpu]?.next(0.0) ?: SimResourceCommand.Idle()
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ return onNext(ctx, 0.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ val vcpu = vcpus[ctx.resource.id] ?: return SimResourceCommand.Idle()
+
+ if (vcpu.isStarted) {
+ vcpu.remainingWork = remainingWork
+ vcpu.flush()
+ } else {
+ vcpu.isStarted = true
+ vcpu.start()
+ }
+
+ if (vcpu.hasExited && vcpu != vcpus[ctx.resource.id]) {
+ return onNext(ctx, remainingWork)
+ }
+
+ return vcpu.activeCommand
}
/**
@@ -117,36 +135,46 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
private var cpus: List<VCpu> = emptyList()
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * The execution context in which the workload runs.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- require(!isTerminated) { "Machine is terminated" }
- require(cont == null) { "Run should not be called concurrently" }
-
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = model
+ inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
+ override val cpus: List<SimProcessingUnit>
+ get() = model.cpus
- override val clock: Clock
- get() = this@SimSpaceSharedHypervisor.ctx.clock
+ override val memory: List<SimMemoryUnit>
+ get() = model.memory
- override val meta: Map<String, Any>
- get() = meta
+ override val clock: Clock
+ get() = this@SimSpaceSharedHypervisor.ctx.clock
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
+ override fun interrupt(resource: SimResource) {
+ TODO()
}
+ }
+
+ lateinit var ctx: SimMachineContext
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
+ require(!isTerminated) { "Machine is terminated" }
+ require(cont == null) { "Run should not be called concurrently" }
+
+ ctx = Context(meta)
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) }
+ try {
+ this.cpus = model.cpus.map { model -> VCpu(this, model, workload.getConsumer(ctx, model), ctx.clock) }
- for (cpu in cpus) {
- cpu.start()
+ for ((index, pCPU) in pCPUs.withIndex()) {
+ vcpus[pCPU] = cpus[index]
+ this@SimSpaceSharedHypervisor.ctx.interrupt(this@SimSpaceSharedHypervisor.ctx.cpus[pCPU])
+ }
+ } catch (e: Throwable) {
+ cont.resumeWithException(e)
}
}
}
@@ -157,19 +185,23 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
vcpus[pCPU] = null
availableCpus.add(pCPU)
}
+
+ val cont = cont
+ this.cont = null
+ cont?.resume(Unit)
}
/**
* Update the usage of the VM.
*/
fun updateUsage() {
- usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency }
+ usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.resource.frequency }
}
/**
* This method is invoked when one of the CPUs has exited.
*/
- fun onCpuExit(cpu: Int) {
+ fun onCpuExit() {
// Check whether all other CPUs have finished
if (cpus.all { it.hasExited }) {
val cont = cont
@@ -193,7 +225,22 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
/**
* A CPU of the virtual machine.
*/
- private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) {
+ private inner class VCpu(
+ val vm: SimVm,
+ resource: SimProcessingUnit,
+ consumer: SimResourceConsumer<SimProcessingUnit>,
+ clock: Clock
+ ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer) {
+ /**
+ * Indicates that the vCPU was started.
+ */
+ var isStarted: Boolean = false
+
+ /**
+ * The current command that is processed by the vCPU.
+ */
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+
/**
* The processing speed of the vCPU.
*/
@@ -204,81 +251,41 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
}
/**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
- /**
- * A flag to indicate that the CPU was started.
+ * The amount of work remaining from the previous consumption.
*/
- var hasStarted: Boolean = false
+ var remainingWork: Double = 0.0
/**
- * Process the specified [SimResourceCommand] for this CPU.
+ * A flag to indicate that the CPU has exited.
*/
- fun process(command: SimResourceCommand): SimResourceCommand {
- return when (command) {
- is SimResourceCommand.Idle -> {
- speed = 0.0
- command
- }
- is SimResourceCommand.Consume -> {
- speed = min(model.frequency, command.limit)
- command
- }
- is SimResourceCommand.Exit -> {
- speed = 0.0
- hasExited = true
-
- vm.onCpuExit(model.id)
-
- SimResourceCommand.Idle()
- }
- }
- }
+ var hasExited: Boolean = false
- /**
- * Start the CPU.
- */
- fun start() {
- vcpus[pCPU] = this
- interrupt()
+ override fun onIdle(deadline: Long) {
+ speed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
}
- /**
- * Request the workload for more work.
- */
- fun next(remainingWork: Double): SimResourceCommand {
- return try {
- val command =
- if (hasStarted) {
- workload.onNext(ctx, model.id, remainingWork)
- } else {
- hasStarted = true
- workload.onStart(ctx, model.id)
- }
- process(command)
- } catch (e: Throwable) {
- fail(e)
- }
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ speed = getSpeed(limit)
+ activeCommand = SimResourceCommand.Consume(work, speed, deadline)
}
- /**
- * Interrupt the CPU.
- */
- fun interrupt() {
- this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU)
+ override fun onFinish() {
+ speed = 0.0
+ hasExited = true
+ activeCommand = SimResourceCommand.Idle()
+ vm.onCpuExit()
}
- /**
- * Fail the CPU.
- */
- fun fail(e: Throwable): SimResourceCommand {
+ override fun onFailure(cause: Throwable) {
+ speed = 0.0
hasExited = true
+ activeCommand = SimResourceCommand.Idle()
+ vm.onCpuFailure(cause)
+ }
- vm.onCpuFailure(e)
-
- return SimResourceCommand.Idle()
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ return remainingWork
}
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt
index bcbde5b1..49745868 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.model
+import org.opendc.simulator.resources.SimResource
+
/**
* A memory unit of a compute resource, either virtual or physical.
*
@@ -30,9 +32,12 @@ package org.opendc.simulator.compute.model
* @property speed The access speed of the memory in MHz.
* @property size The size of the memory unit in MBs.
*/
-public data class MemoryUnit(
+public data class SimMemoryUnit(
public val vendor: String,
public val modelName: String,
public val speed: Double,
public val size: Long
-)
+) : SimResource {
+ override val capacity: Double
+ get() = speed
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt
index 58ed816c..4022ecb3 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt
@@ -30,7 +30,7 @@ package org.opendc.simulator.compute.model
* @property arch The micro-architecture of the processor node.
* @property coreCount The number of logical CPUs in the processor node.
*/
-public data class ProcessingNode(
+public data class SimProcessingNode(
public val vendor: String,
public val arch: String,
public val modelName: String,
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt
index 415e95e6..1c989254 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.model
+import org.opendc.simulator.resources.SimResource
+
/**
* A single logical compute unit of processor node, either virtual or physical.
*
@@ -29,8 +31,11 @@ package org.opendc.simulator.compute.model
* @property id The identifier of the CPU core within the processing node.
* @property frequency The clock rate of the CPU in MHz.
*/
-public data class ProcessingUnit(
- public val node: ProcessingNode,
+public data class SimProcessingUnit(
+ public val node: SimProcessingNode,
public val id: Int,
public val frequency: Double
-)
+) : SimResource {
+ override val capacity: Double
+ get() = frequency
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index c22fcc07..9b47821e 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -22,7 +22,11 @@
package org.opendc.simulator.compute.workload
-import org.opendc.simulator.compute.SimExecutionContext
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
/**
* A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on
@@ -36,31 +40,35 @@ public class SimFlopsWorkload(
public val utilization: Double = 0.8
) : SimWorkload {
init {
- require(flops >= 0) { "Negative number of flops" }
+ require(flops >= 0) { "Number of FLOPs must be positive" }
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimExecutionContext) {}
+ override fun onStart(ctx: SimMachineContext) {}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- val cores = ctx.machine.cpus.size
- val limit = ctx.machine.cpus[cpu].frequency * utilization
- val work = flops.toDouble() / cores
-
- return if (work > 0.0) {
- SimResourceCommand.Consume(work, limit)
- } else {
- SimResourceCommand.Exit
- }
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return CpuConsumer(ctx)
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.machine.cpus[cpu].frequency * utilization
+ private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> {
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ val limit = ctx.resource.frequency * utilization
+ val work = flops.toDouble() / machine.cpus.size
+
+ return if (work > 0.0) {
+ SimResourceCommand.Consume(work, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
- return SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ return if (remainingWork > 0.0) {
+ val limit = ctx.resource.frequency * utilization
+ return SimResourceCommand.Consume(remainingWork, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 00ebebce..313b6ed5 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -22,7 +22,11 @@
package org.opendc.simulator.compute.workload
-import org.opendc.simulator.compute.SimExecutionContext
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
/**
* A [SimWorkload] that models application execution as a single duration.
@@ -39,20 +43,26 @@ public class SimRuntimeWorkload(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimExecutionContext) {}
+ override fun onStart(ctx: SimMachineContext) {}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- val limit = ctx.machine.cpus[cpu].frequency * utilization
- val work = (limit / 1000) * duration
- return SimResourceCommand.Consume(work, limit)
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return CpuConsumer()
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return if (remainingWork > 0.0) {
- val limit = ctx.machine.cpus[cpu].frequency * utilization
- SimResourceCommand.Consume(remainingWork, limit)
- } else {
- SimResourceCommand.Exit
+ private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ val limit = ctx.resource.frequency * utilization
+ val work = (limit / 1000) * duration
+ return SimResourceCommand.Consume(work, limit)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ return if (remainingWork > 0.0) {
+ val limit = ctx.resource.frequency * utilization
+ SimResourceCommand.Consume(remainingWork, limit)
+ } else {
+ SimResourceCommand.Exit
+ }
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index deb10b98..edef3843 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -22,7 +22,11 @@
package org.opendc.simulator.compute.workload
-import org.opendc.simulator.compute.SimExecutionContext
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -34,36 +38,42 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
private var fragment: Fragment? = null
private lateinit var barrier: SimWorkloadBarrier
- override fun onStart(ctx: SimExecutionContext) {
- barrier = SimWorkloadBarrier(ctx.machine.cpus.size)
+ override fun onStart(ctx: SimMachineContext) {
+ barrier = SimWorkloadBarrier(ctx.cpus.size)
fragment = nextFragment()
offset = ctx.clock.millis()
}
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return onNext(ctx, cpu, 0.0)
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ return CpuConsumer()
}
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val work = (fragment.duration / 1000) * fragment.usage
- val deadline = offset + fragment.duration
+ private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> {
+ override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
+ return onNext(ctx, 0.0)
+ }
- assert(deadline >= now) { "Deadline already passed" }
+ override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
+ val now = ctx.clock.millis()
+ val fragment = fragment ?: return SimResourceCommand.Exit
+ val work = (fragment.duration / 1000) * fragment.usage
+ val deadline = offset + fragment.duration
- val cmd =
- if (cpu < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, fragment.usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ assert(deadline >= now) { "Deadline already passed" }
- if (barrier.enter()) {
- this.fragment = nextFragment()
- this.offset += fragment.duration
- }
+ val cmd =
+ if (ctx.resource.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, fragment.usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
- return cmd
+ if (barrier.enter()) {
+ this@SimTraceWorkload.fragment = nextFragment()
+ this@SimTraceWorkload.offset += fragment.duration
+ }
+
+ return cmd
+ }
}
override fun toString(): String = "SimTraceWorkload"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index 6fc78d56..60661e23 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -22,7 +22,9 @@
package org.opendc.simulator.compute.workload
-import org.opendc.simulator.compute.SimExecutionContext
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.resources.SimResourceConsumer
/**
* A model that characterizes the runtime behavior of some particular workload.
@@ -32,27 +34,12 @@ import org.opendc.simulator.compute.SimExecutionContext
*/
public interface SimWorkload {
/**
- * This method is invoked when the workload is started, before the (virtual) CPUs assigned to the workload will
- * start.
+ * This method is invoked when the workload is started.
*/
- public fun onStart(ctx: SimExecutionContext)
+ public fun onStart(ctx: SimMachineContext)
/**
- * This method is invoked when a (virtual) CPU assigned to the workload has started.
- *
- * @param ctx The execution context in which the workload runs.
- * @param cpu The index of the (virtual) CPU to start.
- * @return The command to perform on the CPU.
+ * Obtain the resource consumer for the specified processing unit.
*/
- public fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand
-
- /**
- * This method is invoked when a (virtual) CPU assigned to the workload was interrupted or reached its deadline.
- *
- * @param ctx The execution context in which the workload runs.
- * @param cpu The index of the (virtual) CPU to obtain the resource consumption of.
- * @param remainingWork The remaining work that was not yet completed.
- * @return The next command to perform on the CPU.
- */
- public fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand
+ public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit>
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index b8eee4f0..4b4d7eca 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -30,9 +30,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.time.Clock
@@ -51,10 +51,10 @@ internal class SimHypervisorTest {
scope = TestCoroutineScope()
clock = DelayControllerClockAdapter(scope)
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
+ val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 1036f1ac..00efba53 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -29,14 +29,10 @@ import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.simulator.compute.workload.SimResourceCommand
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
/**
@@ -48,11 +44,11 @@ class SimMachineTest {
@BeforeEach
fun setUp() {
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) },
+ memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
@@ -86,74 +82,4 @@ class SimMachineTest {
assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" }
}
}
-
- @Test
- fun testInterrupt() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val machine = SimBareMetalMachine(testScope, clock, machineModel)
-
- val workload = object : SimWorkload {
- override fun onStart(ctx: SimExecutionContext) {}
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- ctx.interrupt(cpu)
- return SimResourceCommand.Exit
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
-
- assertDoesNotThrow {
- testScope.runBlockingTest { machine.run(workload) }
- }
- }
-
- @Test
- fun testExceptionPropagationOnStart() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val machine = SimBareMetalMachine(testScope, clock, machineModel)
-
- val workload = object : SimWorkload {
- override fun onStart(ctx: SimExecutionContext) {}
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- throw IllegalStateException()
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
-
- assertThrows<IllegalStateException> {
- testScope.runBlockingTest { machine.run(workload) }
- }
- }
-
- @Test
- fun testExceptionPropagationOnNext() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val machine = SimBareMetalMachine(testScope, clock, machineModel)
-
- val workload = object : SimWorkload {
- override fun onStart(ctx: SimExecutionContext) {}
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return SimResourceCommand.Consume(1.0, 1.0)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- throw IllegalStateException()
- }
- }
-
- assertThrows<IllegalStateException> {
- testScope.runBlockingTest { machine.run(workload) }
- }
- }
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
index 1a9faf11..583d989c 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
@@ -31,9 +31,10 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingNode
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
@@ -53,10 +54,10 @@ internal class SimSpaceSharedHypervisorTest {
scope = TestCoroutineScope()
clock = DelayControllerClockAdapter(scope)
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
+ val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1)
machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
}
@@ -126,6 +127,56 @@ internal class SimSpaceSharedHypervisorTest {
}
/**
+ * Test FLOPs workload on hypervisor.
+ */
+ @Test
+ fun testFlopsWorkload() {
+ val duration = 5 * 60L * 1000
+ val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+ launch { hypervisor.createMachine(machineModel).run(workload) }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertEquals(duration, scope.currentTime) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() {
+ val duration = 5 * 60L * 1000
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+ launch {
+ val vm = hypervisor.createMachine(machineModel)
+ vm.run(SimRuntimeWorkload(duration))
+ vm.close()
+
+ val vm2 = hypervisor.createMachine(machineModel)
+ vm2.run(SimRuntimeWorkload(duration))
+ }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertEquals(duration * 2, scope.currentTime) { "Took enough time" }
+ }
+
+ /**
* Test concurrent workloads on the machine.
*/
@Test
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts
new file mode 100644
index 00000000..831ca3db
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Uniform resource consumption simulation model"
+
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(project(":opendc-platform")))
+ api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
+ implementation(project(":opendc-utils"))
+
+ testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
new file mode 100644
index 00000000..f9da74c7
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -0,0 +1,255 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ */
+public abstract class SimAbstractResourceContext<R : SimResource>(
+ override val resource: R,
+ override val clock: Clock,
+ private val consumer: SimResourceConsumer<R>
+) : SimResourceContext<R> {
+ /**
+ * This method is invoked when the resource will idle until the specified [deadline].
+ */
+ public abstract fun onIdle(deadline: Long)
+
+ /**
+ * This method is invoked when the resource will be consumed until the specified [work] was processed or the
+ * [deadline] was reached.
+ */
+ public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
+
+ /**
+ * This method is invoked when the resource consumer has finished.
+ */
+ public abstract fun onFinish()
+
+ /**
+ * This method is invoked when the resource consumer throws an exception.
+ */
+ public abstract fun onFailure(cause: Throwable)
+
+ /**
+ * Compute the duration that a resource consumption will take with the specified [speed].
+ */
+ protected open fun getDuration(work: Double, speed: Double): Long {
+ return ceil(work / speed * 1000).toLong()
+ }
+
+ /**
+ * Compute the speed at which the resource may be consumed.
+ */
+ protected open fun getSpeed(limit: Double): Double {
+ return min(limit, resource.capacity)
+ }
+
+ /**
+ * Get the remaining work to process after a resource consumption was flushed.
+ *
+ * @param work The size of the resource consumption.
+ * @param speed The speed of consumption.
+ * @param duration The duration from the start of the consumption until now.
+ * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to
+ * it being interrupted before it could finish or reach its deadline.
+ * @return The amount of work remaining.
+ */
+ protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ return if (duration > 0L) {
+ val processed = duration / 1000.0 * speed
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+
+ /**
+ * Start the consumer.
+ */
+ public fun start() {
+ try {
+ isProcessing = true
+ latestFlush = clock.millis()
+
+ interpret(consumer.onStart(this))
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Immediately stop the consumer.
+ */
+ public fun stop() {
+ try {
+ isProcessing = true
+ latestFlush = clock.millis()
+
+ flush(isIntermediate = true)
+ onFinish()
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ isProcessing = false
+ }
+ }
+
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public open fun flush(isIntermediate: Boolean = false) {
+ val now = clock.millis()
+
+ // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
+ if (isIntermediate && latestFlush >= now) {
+ return
+ }
+
+ try {
+ val (timestamp, command) = activeCommand ?: return
+
+ isProcessing = true
+ activeCommand = null
+
+ val duration = now - timestamp
+ assert(duration >= 0) { "Flush in the past" }
+
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ // We should only continue processing the next command if:
+ // 1. The resource consumer reached its deadline.
+ // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (command.deadline <= now || !isIntermediate) {
+ next(remainingWork = 0.0)
+ }
+ }
+ is SimResourceCommand.Consume -> {
+ val speed = min(resource.capacity, command.limit)
+ val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed)
+ val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted)
+
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource consumer reached its deadline.
+ // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
+ next(remainingWork)
+ } else {
+ interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
+ }
+ }
+ SimResourceCommand.Exit ->
+ // Flush may not be called when the resource consumer has finished
+ throw IllegalStateException()
+ }
+ } catch (e: Throwable) {
+ onFailure(e)
+ } finally {
+ latestFlush = now
+ isProcessing = false
+ }
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the resource while they are constructing their next command, as this will
+ // only lead to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun toString(): String = "SimAbstractResourceContext[resource=$resource]"
+
+ /**
+ * A flag to indicate that the resource is currently processing a command.
+ */
+ protected var isProcessing: Boolean = false
+
+ /**
+ * The current command that is being processed.
+ */
+ private var activeCommand: CommandWrapper? = null
+
+ /**
+ * The latest timestamp at which the resource was flushed.
+ */
+ private var latestFlush: Long = Long.MIN_VALUE
+
+ /**
+ * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ */
+ private fun interpret(command: SimResourceCommand) {
+ val now = clock.millis()
+
+ when (command) {
+ is SimResourceCommand.Idle -> {
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ onIdle(deadline)
+ }
+ is SimResourceCommand.Consume -> {
+ val work = command.work
+ val limit = command.limit
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ onConsume(work, limit, deadline)
+ }
+ is SimResourceCommand.Exit -> {
+ onFinish()
+ }
+ }
+
+ assert(activeCommand == null) { "Concurrent access to current command" }
+ activeCommand = CommandWrapper(now, command)
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ private fun next(remainingWork: Double) {
+ interpret(consumer.onNext(this, remainingWork))
+ }
+
+ /**
+ * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
+ */
+ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt
new file mode 100644
index 00000000..31b0a175
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A generic representation of resource that may be consumed.
+ */
+public interface SimResource {
+ /**
+ * The capacity of the resource.
+ */
+ public val capacity: Double
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
index 41a5028e..77c0a7a9 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
@@ -20,14 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload
+package org.opendc.simulator.resources
/**
- * A command that is sent to the host machine.
+ * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer].
*/
public sealed class SimResourceCommand {
/**
- * A request to the host to process the specified amount of [work] on a vCPU before the specified [deadline].
+ * A request to the resource to perform the specified amount of work before the given [deadline].
*
* @param work The amount of work to process on the CPU.
* @param limit The maximum amount of work to be processed per second.
@@ -35,18 +35,18 @@ public sealed class SimResourceCommand {
*/
public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() {
init {
- require(work > 0) { "The amount of work must be positive." }
- require(limit > 0) { "Limit must be positive." }
+ require(work > 0) { "Amount of work must be positive" }
+ require(limit > 0) { "Limit must be positive" }
}
}
/**
- * An indication to the host that the vCPU will idle until the specified [deadline] or is interrupted.
+ * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted.
*/
public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand()
/**
- * An indication to the host that the vCPU has finished processing.
+ * An indication to the resource that the consumer has finished.
*/
public object Exit : SimResourceCommand()
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
new file mode 100644
index 00000000..f516faa6
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A SimResourceConsumer characterizes how a [SimResource] is consumed.
+ */
+public interface SimResourceConsumer<in R : SimResource> {
+ /**
+ * This method is invoked when the consumer is started for a resource.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @return The next command that the resource should perform.
+ */
+ public fun onStart(ctx: SimResourceContext<R>): SimResourceCommand
+
+ /**
+ * This method is invoked when a resource was either interrupted or reached its deadline.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param remainingWork The remaining work that was not yet completed.
+ * @return The next command that the resource should perform.
+ */
+ public fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
new file mode 100644
index 00000000..dfb5e9ce
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
+ * resource and a resource consumer.
+ */
+public interface SimResourceContext<out R : SimResource> {
+ /**
+ * The resource that is managed by this context.
+ */
+ public val resource: R
+
+ /**
+ * The virtual clock tracking simulation time.
+ */
+ public val clock: Clock
+
+ /**
+ * Ask the resource provider to interrupt its resource.
+ */
+ public fun interrupt()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
new file mode 100644
index 00000000..91a745ab
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceProvider] provides some resource of type [R].
+ */
+public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
+ /**
+ * The resource that is managed by this provider.
+ */
+ public val resource: R
+
+ /**
+ * Consume the resource provided by this provider using the specified [consumer].
+ */
+ public suspend fun consume(consumer: SimResourceConsumer<R>)
+
+ /**
+ * End the lifetime of the resource.
+ *
+ * This operation terminates the existing resource consumer.
+ */
+ public override fun close()
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
new file mode 100644
index 00000000..4445df86
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.math.min
+
+/**
+ * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
+ *
+ * @param resource The resource to provide.
+ * @param clock The virtual clock to track simulation time.
+ */
+public class SimResourceSource<R : SimResource>(
+ override val resource: R,
+ private val clock: Clock,
+ private val scheduler: TimerScheduler<Any>
+) : SimResourceProvider<R> {
+ /**
+ * The resource processing speed over time.
+ */
+ public val speed: StateFlow<Double>
+ get() = _speed
+ private val _speed = MutableStateFlow(0.0)
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) {
+ check(!isClosed) { "Lifetime of resource has ended." }
+ check(cont == null) { "Run should not be called concurrently" }
+
+ try {
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+ val ctx = Context(consumer, cont)
+ ctx.start()
+ cont.invokeOnCancellation {
+ ctx.stop()
+ }
+ }
+ } finally {
+ cont = null
+ }
+ }
+
+ override fun close() {
+ isClosed = true
+ cont?.cancel()
+ cont = null
+ }
+
+ /**
+ * A flag to indicate that the resource was closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The current active consumer.
+ */
+ private var cont: CancellableContinuation<Unit>? = null
+
+ /**
+ * Internal implementation of [SimResourceContext] for this class.
+ */
+ private inner class Context(
+ consumer: SimResourceConsumer<R>,
+ val cont: Continuation<Unit>
+ ) : SimAbstractResourceContext<R>(resource, clock, consumer) {
+ /**
+ * The processing speed of the resource.
+ */
+ private var speed: Double = 0.0
+ set(value) {
+ field = value
+ _speed.value = field
+ }
+
+ override fun onIdle(deadline: Long) {
+ speed = 0.0
+
+ // Do not resume if deadline is "infinite"
+ if (deadline != Long.MAX_VALUE) {
+ scheduler.startSingleTimerTo(this, deadline) { flush() }
+ }
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ speed = getSpeed(limit)
+ val until = min(deadline, clock.millis() + getDuration(work, speed))
+
+ scheduler.startSingleTimerTo(this, until) { flush() }
+ }
+
+ override fun onFinish() {
+ speed = 0.0
+ scheduler.cancel(this)
+ cont.resume(Unit)
+ }
+
+ override fun onFailure(cause: Throwable) {
+ speed = 0.0
+ scheduler.cancel(this)
+ cont.resumeWithException(cause)
+ }
+
+ override fun toString(): String = "SimResourceSource.Context[resource=$resource]"
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
new file mode 100644
index 00000000..02d456ff
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for [SimResourceCommand].
+ */
+class SimResourceCommandTest {
+ @Test
+ fun testZeroWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(0.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testNegativeWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(-1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testZeroLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, 0.0)
+ }
+ }
+
+ @Test
+ fun testNegativeLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, -1.0, 1)
+ }
+ }
+
+ @Test
+ fun testConsumeCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Consume(1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testIdleCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Idle(1)
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
new file mode 100644
index 00000000..8b380efb
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -0,0 +1,285 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+
+/**
+ * A test suite for the [SimResourceScheduler] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceSourceTest {
+
+ private lateinit var scope: TestCoroutineScope
+ private lateinit var clock: Clock
+
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @BeforeEach
+ fun setUp() {
+ scope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(scope)
+ }
+
+ @Test
+ fun testSpeed() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1000 * ctx.resource.speed, ctx.resource.speed)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ scope.runBlockingTest {
+ val res = mutableListOf<Double>()
+ val job = launch { provider.speed.toList(res) }
+
+ provider.consume(consumer)
+
+ job.cancel()
+ assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" }
+ }
+ }
+
+ @Test
+ fun testSpeedLimit() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1000 * ctx.resource.speed, 2 * ctx.resource.speed)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ scope.runBlockingTest {
+ val res = mutableListOf<Double>()
+ val job = launch { provider.speed.toList(res) }
+
+ provider.consume(consumer)
+
+ job.cancel()
+ assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" }
+ }
+ }
+
+ @Test
+ fun testInterrupt() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ ctx.interrupt()
+ return SimResourceCommand.Exit
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertDoesNotThrow {
+ scope.runBlockingTest {
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testFailure() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ throw IllegalStateException()
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest {
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest { provider.consume(consumer) }
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest {
+ launch { provider.consume(consumer) }
+ launch { provider.consume(consumer) }
+ }
+ }
+ }
+
+ @Test
+ fun testClosedConsumption() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest {
+ provider.close()
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testCloseDuringConsumption() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ throw IllegalStateException()
+ }
+ }
+
+ scope.runBlockingTest {
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.close()
+ }
+
+ assertEquals(500, scope.currentTime)
+ }
+
+ @Test
+ fun testIdle() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle(ctx.clock.millis() + 500)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ scope.runBlockingTest {
+ provider.consume(consumer)
+ }
+
+ assertEquals(500, scope.currentTime)
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ val resource = SimCpu(4200.0)
+ val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle()
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ scope.runBlockingTest {
+ provider.consume(consumer)
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index ff116443..bb6f3299 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -51,7 +51,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
private val timers = mutableMapOf<T, Timer>()
/**
- * The channel to communicate with the
+ * The channel to communicate with the scheduling job.
*/
private val channel = Channel<Long?>(Channel.CONFLATED)
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index e87dd4d8..0e5a2711 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -32,6 +32,7 @@ include(":opendc-experiments:opendc-experiments-sc18")
include(":opendc-experiments:opendc-experiments-capelin")
include(":opendc-runner-web")
include(":opendc-simulator:opendc-simulator-core")
+include(":opendc-simulator:opendc-simulator-resources")
include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
include(":opendc-trace:opendc-trace-core")