summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-17 16:23:48 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-17 16:23:48 +0100
commitbb3b8e207a08edff81b8c2fe30b476c94bfea086 (patch)
treeee739cf4092a2b807e0043bed7cae72cff7b6bac
parent9ab482d0afd773703f78d51a2ba8a160896f03c6 (diff)
simulator: Make hypervisors generic for the resource type
This change moves the hypervisor implementations to the opendc-simulator-resources module and makes them generic to the resource type that is being used (e.g., CPU, disk or networking).
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt3
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt11
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt164
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt116
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt80
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt507
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt5
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt263
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt5
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt134
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt29
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt191
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt7
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt155
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt5
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt36
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt48
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt92
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt508
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt (renamed from simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt)8
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt63
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt156
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt92
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt225
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt190
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt207
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt26
-rw-r--r--simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt14
32 files changed, 2232 insertions, 1118 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 2c38f7cb..aa7e0aa1 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -125,7 +125,7 @@ public class ComputeServiceImpl(
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
- private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope, clock)
+ private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
override val hosts: Set<Host>
get() = hostToView.keys
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 0da81152..9cc1bf54 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -84,7 +84,7 @@ public class SimHost(
/**
* The machine to run on.
*/
- public val machine: SimBareMetalMachine = SimBareMetalMachine(scope, clock, model)
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model)
/**
* The hypervisor to run multiple workloads.
@@ -206,6 +206,7 @@ public class SimHost(
override fun close() {
scope.cancel()
+ machine.close()
_state = HostState.DOWN
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index a45ab9fc..6929b06c 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -136,8 +136,8 @@ internal class SimHostTest {
assertAll(
{ assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
- { assertEquals(4197600, requestedWork, "Requested work does not match") },
- { assertEquals(3057600, grantedWork, "Granted work does not match") },
+ { assertEquals(4273200, requestedWork, "Requested work does not match") },
+ { assertEquals(3133200, grantedWork, "Granted work does not match") },
{ assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") },
{ assertEquals(1200006, scope.currentTime) }
)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 4e6cfddc..59ce895f 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -27,6 +27,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.yield
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -142,8 +143,8 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1678587333640, monitor.totalRequestedBurst) },
- { assertEquals(438118200924, monitor.totalGrantedBurst) },
+ { assertEquals(1707132711051, monitor.totalRequestedBurst) },
+ { assertEquals(457881474296, monitor.totalGrantedBurst) },
{ assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
@@ -176,6 +177,8 @@ class CapelinIntegrationTest {
monitor
)
+ yield()
+
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
scheduler.close()
@@ -186,8 +189,8 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(711464322955, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(175226276978, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
{ assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
new file mode 100644
index 00000000..a99b082a
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.launch
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.*
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Abstract implementation of the [SimHypervisor] interface.
+ */
+public abstract class SimAbstractHypervisor : SimHypervisor {
+ /**
+ * The machine on which the hypervisor runs.
+ */
+ private lateinit var context: SimMachineContext
+
+ /**
+ * The resource switch to use.
+ */
+ private lateinit var switch: SimResourceSwitch<SimProcessingUnit>
+
+ /**
+ * The virtual machines running on this hypervisor.
+ */
+ private val _vms = mutableSetOf<VirtualMachine>()
+ override val vms: Set<SimMachine>
+ get() = _vms
+
+ /**
+ * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs.
+ */
+ public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit>
+
+ /**
+ * Check whether the specified machine model fits on this hypervisor.
+ */
+ public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean
+
+ override fun canFit(model: SimMachineModel): Boolean {
+ return canFit(model, switch)
+ }
+
+ override fun createMachine(
+ model: SimMachineModel,
+ performanceInterferenceModel: PerformanceInterferenceModel?
+ ): SimMachine {
+ require(canFit(model)) { "Machine does not fit" }
+ val vm = VirtualMachine(model, performanceInterferenceModel)
+ _vms.add(vm)
+ return vm
+ }
+
+ /**
+ * A virtual machine running on the hypervisor.
+ *
+ * @property model The machine model of the virtual machine.
+ * @property performanceInterferenceModel The performance interference model to utilize.
+ */
+ private inner class VirtualMachine(
+ override val model: SimMachineModel,
+ val performanceInterferenceModel: PerformanceInterferenceModel? = null,
+ ) : SimMachine {
+ /**
+ * A [StateFlow] representing the CPU usage of the simulated machine.
+ */
+ override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
+
+ /**
+ * A flag to indicate that the machine is terminated.
+ */
+ private var isTerminated = false
+
+ /**
+ * The vCPUs of the machine.
+ */
+ private val cpus: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>> = model.cpus.associateWith { switch.addOutput(it) }
+
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
+ coroutineScope {
+ require(!isTerminated) { "Machine is terminated" }
+
+ val ctx = object : SimMachineContext {
+ override val cpus: List<SimProcessingUnit>
+ get() = model.cpus
+
+ override val memory: List<SimMemoryUnit>
+ get() = model.memory
+
+ override val clock: Clock
+ get() = this@SimAbstractHypervisor.context.clock
+
+ override val meta: Map<String, Any> = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext)
+
+ override fun interrupt(resource: SimResource) {
+ requireNotNull(this@VirtualMachine.cpus[resource]).interrupt()
+ }
+ }
+
+ workload.onStart(ctx)
+
+ for ((cpu, provider) in cpus) {
+ launch {
+ provider.consume(workload.getConsumer(ctx, cpu))
+ }
+ }
+ }
+ }
+
+ /**
+ * Terminate this VM instance.
+ */
+ override fun close() {
+ if (!isTerminated) {
+ cpus.forEach { (_, provider) -> provider.close() }
+ _vms.remove(this)
+ }
+
+ isTerminated = true
+ }
+ }
+
+ override fun onStart(ctx: SimMachineContext) {
+ context = ctx
+ switch = createSwitch(ctx)
+ }
+
+ override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
+ val forwarder = SimResourceForwarder(cpu)
+ switch.addInput(forwarder)
+ return forwarder
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
new file mode 100644
index 00000000..1bdbb7e8
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import org.opendc.simulator.compute.model.SimMemoryUnit
+import org.opendc.simulator.compute.model.SimProcessingUnit
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResource
+import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.resources.SimResourceSource
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Abstract implementation of the [SimMachine] interface.
+ *
+ * @param context The [CoroutineContext] in which the machine runs.
+ */
+public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine {
+ private val _usage = MutableStateFlow(0.0)
+ override val usage: StateFlow<Double>
+ get() = _usage
+
+ /**
+ * A flag to indicate that the machine is terminated.
+ */
+ private var isTerminated = false
+
+ /**
+ * The [CoroutineContext] to run in.
+ */
+ protected abstract val context: CoroutineContext
+
+ /**
+ * The resources allocated for this machine.
+ */
+ protected abstract val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>>
+
+ /**
+ * The execution context in which the workload runs.
+ */
+ private inner class Context(
+ val sources: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>>,
+ override val meta: Map<String, Any>
+ ) : SimMachineContext {
+ override val clock: Clock
+ get() = this@SimAbstractMachine.clock
+
+ override val cpus: List<SimProcessingUnit> = model.cpus
+
+ override val memory: List<SimMemoryUnit> = model.memory
+
+ override fun interrupt(resource: SimResource) {
+ checkNotNull(sources[resource]) { "Invalid resource" }.interrupt()
+ }
+ }
+
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) {
+ val resources = resources
+ require(!isTerminated) { "Machine is terminated" }
+ val ctx = Context(resources, meta + mapOf("coroutine-context" to context))
+ val totalCapacity = model.cpus.sumByDouble { it.frequency }
+
+ workload.onStart(ctx)
+
+ for ((cpu, source) in resources) {
+ val consumer = workload.getConsumer(ctx, cpu)
+ val job = source.speed
+ .onEach {
+ _usage.value = resources.values.sumByDouble { it.speed.value } / totalCapacity
+ }
+ .launchIn(this)
+
+ launch {
+ source.consume(consumer)
+ job.cancel()
+ }
+ }
+ }
+
+ override fun close() {
+ if (!isTerminated) {
+ resources.forEach { (_, provider) -> provider.close() }
+ } else {
+ isTerminated = true
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index b1d1c0b7..79982ea8 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -23,17 +23,10 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import org.opendc.simulator.compute.model.SimMemoryUnit
import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
import org.opendc.utils.TimerScheduler
import java.time.Clock
-import java.util.*
import kotlin.coroutines.*
/**
@@ -42,83 +35,34 @@ import kotlin.coroutines.*
* A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For
* example. the class expects only a single concurrent call to [run].
*
- * @param coroutineScope The [CoroutineScope] to run the simulated workload in.
+ * @param context The [CoroutineContext] to run the simulated workload in.
* @param clock The virtual clock to track the simulation time.
* @param model The machine model to simulate.
*/
@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
public class SimBareMetalMachine(
- private val coroutineScope: CoroutineScope,
+ context: CoroutineContext,
private val clock: Clock,
override val model: SimMachineModel
-) : SimMachine {
- private val _usage = MutableStateFlow(0.0)
- override val usage: StateFlow<Double>
- get() = _usage
-
+) : SimAbstractMachine(clock) {
/**
- * A flag to indicate that the machine is terminated.
+ * The [Job] associated with this machine.
*/
- private var isTerminated = false
+ private val job = Job()
- /**
- * The [TimerScheduler] to use for scheduling the interrupts.
- */
- private val scheduler = TimerScheduler<Any>(coroutineScope, clock)
+ override val context: CoroutineContext = context + job
/**
- * The execution context in which the workload runs.
- */
- private inner class Context(val map: Map<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>,
- override val meta: Map<String, Any>) : SimMachineContext {
- override val clock: Clock
- get() = this@SimBareMetalMachine.clock
-
- override val cpus: List<SimProcessingUnit> = model.cpus
-
- override val memory: List<SimMemoryUnit> = model.memory
-
- override fun interrupt(resource: SimResource) {
- val context = map[resource]
- checkNotNull(context) { "Invalid resource" }
- context.interrupt()
- }
- }
-
- /**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * The [TimerScheduler] to use for scheduling the interrupts.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = coroutineScope {
- require(!isTerminated) { "Machine is terminated" }
- val map = mutableMapOf<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>()
- val ctx = Context(map, meta)
- val sources = model.cpus.map { SimResourceSource(it, clock, scheduler) }
- val totalCapacity = model.cpus.sumByDouble { it.frequency }
+ private val scheduler = TimerScheduler<Any>(this.context, clock)
- workload.onStart(ctx)
-
- for (source in sources) {
- val consumer = workload.getConsumer(ctx, source.resource)
- val job = source.speed
- .onEach {
- _usage.value = sources.sumByDouble { it.speed.value } / totalCapacity
- }
- .launchIn(this)
-
- launch {
- source.consume(object : SimResourceConsumer<SimProcessingUnit> by consumer {
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- map[ctx.resource] = ctx
- return consumer.onStart(ctx)
- }
- })
- job.cancel()
- }
- }
- }
+ override val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> =
+ model.cpus.associateWith { SimResourceSource(it, clock, scheduler) }
override fun close() {
- isTerminated = true
+ super.close()
scheduler.close()
+ job.cancel()
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index 12b3b428..bb97192d 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -22,22 +22,10 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.suspendCancellableCoroutine
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.compute.workload.SimWorkloadBarrier
import org.opendc.simulator.resources.*
-import java.time.Clock
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
+import kotlin.coroutines.CoroutineContext
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
@@ -45,482 +33,27 @@ import kotlin.math.min
*
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> {
-
- override fun onStart(ctx: SimMachineContext) {
- this.ctx = ctx
- this.commands = Array(ctx.cpus.size) { SimResourceCommand.Idle() }
- this.pCpus = ctx.cpus.indices.sortedBy { ctx.cpus[it].frequency }.toIntArray()
- this.maxUsage = ctx.cpus.sumByDouble { it.frequency }
- this.barrier = SimWorkloadBarrier(ctx.cpus.size)
- }
-
- override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return this
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- val cpu = ctx.resource.id
- totalRemainingWork += remainingWork
- val isLast = barrier.enter()
-
- // Flush the progress of the guest after the barrier has been reached.
- if (isLast && isDirty) {
- isDirty = false
- flushGuests()
- }
-
- return if (isDirty) {
- // Wait for the scheduler determine the work after the barrier has been reached by all CPUs.
- SimResourceCommand.Idle()
- } else {
- // Indicate that the scheduler needs to run next call.
- if (isLast) {
- isDirty = true
- }
-
- commands[cpu]
- }
- }
-
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- return commands[ctx.resource.id]
- }
-
- override fun canFit(model: SimMachineModel): Boolean = true
-
- override fun createMachine(
- model: SimMachineModel,
- performanceInterferenceModel: PerformanceInterferenceModel?
- ): SimMachine = SimVm(model, performanceInterferenceModel)
-
- /**
- * The execution context in which the hypervisor runs.
- */
- private lateinit var ctx: SimMachineContext
-
- /**
- * The commands to submit to the underlying host.
- */
- private lateinit var commands: Array<SimResourceCommand>
-
- /**
- * The active vCPUs.
- */
- private val vcpus: MutableList<VCpu> = mutableListOf()
-
- /**
- * The indices of the physical CPU ordered by their speed.
- */
- private lateinit var pCpus: IntArray
-
- /**
- * The maximum amount of work to be performed per second.
- */
- private var maxUsage: Double = 0.0
-
- /**
- * The current load on the hypervisor.
- */
- private var load: Double = 0.0
-
- /**
- * The total amount of remaining work (of all pCPUs).
- */
- private var totalRemainingWork: Double = 0.0
-
- /**
- * The total speed requested by the vCPUs.
- */
- private var totalRequestedSpeed = 0.0
-
- /**
- * The total amount of work requested by the vCPUs.
- */
- private var totalRequestedWork = 0.0
-
- /**
- * The total allocated speed for the vCPUs.
- */
- private var totalAllocatedSpeed = 0.0
-
- /**
- * The total allocated work requested for the vCPUs.
- */
- private var totalAllocatedWork = 0.0
-
- /**
- * The amount of work that could not be performed due to over-committing resources.
- */
- private var totalOvercommittedWork = 0.0
-
- /**
- * The amount of work that was lost due to interference.
- */
- private var totalInterferedWork = 0.0
-
- /**
- * A flag to indicate that the scheduler has submitted work that has not yet been completed.
- */
- private var isDirty: Boolean = false
-
- /**
- * The scheduler barrier.
- */
- private lateinit var barrier: SimWorkloadBarrier
-
- /**
- * Indicate that the workloads should be re-scheduled.
- */
- private fun shouldSchedule() {
- isDirty = true
- ctx.interruptAll()
- }
-
- /**
- * Schedule the work over the physical CPUs.
- */
- private fun doSchedule() {
- // If there is no work yet, mark all pCPUs as idle.
- if (vcpus.isEmpty()) {
- commands.fill(SimResourceCommand.Idle())
- ctx.interruptAll()
- }
-
- var duration: Double = Double.MAX_VALUE
- var deadline: Long = Long.MAX_VALUE
- var availableSpeed = maxUsage
- var totalRequestedSpeed = 0.0
- var totalRequestedWork = 0.0
-
- // Sort the vCPUs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
- vcpus.sort()
-
- // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
- val vcpuIterator = vcpus.listIterator()
- var remaining = vcpus.size
- while (vcpuIterator.hasNext()) {
- val vcpu = vcpuIterator.next()
- val availableShare = availableSpeed / remaining--
-
- when (val command = vcpu.activeCommand) {
- is SimResourceCommand.Idle -> {
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, command.deadline)
-
- vcpu.actualSpeed = 0.0
- }
- is SimResourceCommand.Consume -> {
- val grantedSpeed = min(vcpu.allowedSpeed, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
- deadline = min(deadline, command.deadline)
-
- // Ignore idle computation
- if (grantedSpeed <= 0.0 || command.work <= 0.0) {
- vcpu.actualSpeed = 0.0
- continue
- }
-
- totalRequestedSpeed += command.limit
- totalRequestedWork += command.work
-
- vcpu.actualSpeed = grantedSpeed
- availableSpeed -= grantedSpeed
-
- // The duration that we want to run is that of the shortest request from a vCPU
- duration = min(duration, command.work / grantedSpeed)
- }
- SimResourceCommand.Exit -> {
- // Apparently the vCPU has exited, so remove it from the scheduling queue.
- vcpuIterator.remove()
+public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
+
+ override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean = true
+
+ override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> {
+ return SimResourceSwitchMaxMin(
+ ctx.clock,
+ ctx.meta["coroutine-context"] as CoroutineContext,
+ object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> {
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin<SimProcessingUnit>,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
}
}
- }
-
- // Round the duration to milliseconds
- duration = ceil(duration * 1000) / 1000
-
- assert(deadline >= ctx.clock.millis()) { "Deadline already passed" }
-
- val totalAllocatedSpeed = maxUsage - availableSpeed
- var totalAllocatedWork = 0.0
- availableSpeed = totalAllocatedSpeed
- load = totalAllocatedSpeed / maxUsage
-
- // Divide the requests over the available capacity of the pCPUs fairly
- for (i in pCpus) {
- val maxCpuUsage = ctx.cpus[i].frequency
- val fraction = maxCpuUsage / maxUsage
- val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction)
- val grantedWork = duration * grantedSpeed
-
- commands[i] =
- if (grantedWork > 0.0 && grantedSpeed > 0.0)
- SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
- else
- SimResourceCommand.Idle(deadline)
-
- totalAllocatedWork += grantedWork
- availableSpeed -= grantedSpeed
- }
-
- this.totalRequestedSpeed = totalRequestedSpeed
- this.totalRequestedWork = totalRequestedWork
- this.totalAllocatedSpeed = totalAllocatedSpeed
- this.totalAllocatedWork = totalAllocatedWork
-
- ctx.interruptAll()
- }
-
- /**
- * Flush the progress of the vCPUs.
- */
- private fun flushGuests() {
- // Flush all the vCPUs work
- for (vcpu in vcpus) {
- vcpu.flush(isIntermediate = true)
- }
-
- // Report metrics
- listener?.onSliceFinish(
- this,
- totalRequestedWork.toLong(),
- (totalAllocatedWork - totalRemainingWork).toLong(),
- totalOvercommittedWork.toLong(),
- totalInterferedWork.toLong(),
- totalRequestedSpeed,
- totalAllocatedSpeed
)
- totalRemainingWork = 0.0
- totalInterferedWork = 0.0
- totalOvercommittedWork = 0.0
-
- // Force all pCPUs to re-schedule their work.
- doSchedule()
- }
-
- /**
- * Interrupt all host CPUs.
- */
- private fun SimMachineContext.interruptAll() {
- for (cpu in ctx.cpus) {
- interrupt(cpu)
- }
- }
-
- /**
- * A virtual machine running on the hypervisor.
- *
- * @property model The machine model of the virtual machine.
- * @property performanceInterferenceModel The performance interference model to utilize.
- */
- private inner class SimVm(
- override val model: SimMachineModel,
- val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- ) : SimMachine {
- /**
- * A [StateFlow] representing the CPU usage of the simulated machine.
- */
- override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
-
- /**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
- /**
- * The current active workload.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
- * The active CPUs of this virtual machine.
- */
- private var cpus: List<VCpu> = emptyList()
-
- /**
- * The execution context in which the workload runs.
- */
- inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val cpus: List<SimProcessingUnit>
- get() = model.cpus
-
- override val memory: List<SimMemoryUnit>
- get() = model.memory
-
- override val clock: Clock
- get() = this@SimFairShareHypervisor.ctx.clock
-
- override fun interrupt(resource: SimResource) {
- TODO()
- }
- }
-
- lateinit var ctx: SimMachineContext
-
- /**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
- */
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- require(!isTerminated) { "Machine is terminated" }
- require(cont == null) { "Run should not be called concurrently" }
-
- ctx = Context(meta)
- workload.onStart(ctx)
-
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
- this.cpus = model.cpus.map { VCpu(this, it, workload.getConsumer(ctx, it), ctx.clock) }
-
- for (cpu in cpus) {
- // Register vCPU to scheduler
- vcpus.add(cpu)
-
- cpu.start()
- }
-
- // Re-schedule the work over the pCPUs
- shouldSchedule()
- }
- }
-
- /**
- * Terminate this VM instance.
- */
- override fun close() {
- isTerminated = true
- }
-
- /**
- * Update the usage of the VM.
- */
- fun updateUsage() {
- usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.resource.frequency }
- }
-
- /**
- * This method is invoked when one of the CPUs has exited.
- */
- fun onCpuExit() {
- // Check whether all other CPUs have finished
- if (cpus.all { it.hasExited }) {
- val cont = cont
- this.cont = null
- cont?.resume(Unit)
- }
- }
-
- /**
- * This method is invoked when one of the CPUs failed.
- */
- fun onCpuFailure(e: Throwable) {
- // In case the flush fails with an exception, immediately propagate to caller, cancelling all other
- // tasks.
- val cont = cont
- this.cont = null
- cont?.resumeWithException(e)
- }
- }
-
- /**
- * A CPU of the virtual machine.
- */
- private inner class VCpu(
- val vm: SimVm,
- resource: SimProcessingUnit,
- consumer: SimResourceConsumer<SimProcessingUnit>,
- clock: Clock
- ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer), Comparable<VCpu> {
- /**
- * The current command that is processed by the vCPU.
- */
- var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
-
- /**
- * The processing speed that is allowed by the model constraints.
- */
- var allowedSpeed: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- var actualSpeed: Double = 0.0
- set(value) {
- field = value
- vm.updateUsage()
- }
-
- /**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
- override fun onIdle(deadline: Long) {
- allowedSpeed = 0.0
- activeCommand = SimResourceCommand.Idle(deadline)
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- allowedSpeed = getSpeed(limit)
- activeCommand = SimResourceCommand.Consume(work, limit, deadline)
- }
-
- override fun onFinish() {
- hasExited = true
- activeCommand = SimResourceCommand.Exit
- vm.onCpuExit()
- }
-
- override fun onFailure(cause: Throwable) {
- hasExited = true
- activeCommand = SimResourceCommand.Exit
- vm.onCpuFailure(cause)
- }
-
- override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
- // Apply performance interference model
- val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0
-
- // Compute the remaining amount of work
- val remainingWork = if (work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
- val fraction = actualSpeed / totalAllocatedSpeed
-
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
-
- totalInterferedWork += interferedWork
-
- max(0.0, work - processed)
- } else {
- 0.0
- }
-
- if (!isInterrupted) {
- totalOvercommittedWork += remainingWork
- }
-
- return remainingWork
- }
-
- override fun interrupt() {
- // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
- // to infinite recursion.
- if (isProcessing) {
- return
- }
-
- super.interrupt()
-
- // Force the scheduler to re-schedule
- shouldSchedule()
- }
-
- override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed)
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
index d8f00bef..4a233fec 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt
@@ -31,6 +31,11 @@ import org.opendc.simulator.compute.workload.SimWorkload
*/
public interface SimHypervisor : SimWorkload {
/**
+ * The machines running on the hypervisor.
+ */
+ public val vms: Set<SimMachine>
+
+ /**
* Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
*/
public fun canFit(model: SimMachineModel): Boolean
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index 5c67b990..cff70826 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -37,7 +37,7 @@ public interface SimMachineContext {
* The virtual clock tracking simulation time.
*/
public val clock: Clock
-
+
/**
* The metadata associated with the context.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index 751873a5..2001a230 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -22,270 +22,19 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.suspendCancellableCoroutine
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.SimMemoryUnit
import org.opendc.simulator.compute.model.SimProcessingUnit
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
-import java.time.Clock
-import java.util.ArrayDeque
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
+import kotlin.coroutines.CoroutineContext
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
- *
- * @param listener The hypervisor listener to use.
*/
-public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> {
- /**
- * The execution context in which the hypervisor runs.
- */
- private lateinit var ctx: SimMachineContext
-
- /**
- * The mapping from pCPU to vCPU.
- */
- private lateinit var vcpus: Array<VCpu?>
-
- /**
- * The available physical CPUs to schedule on.
- */
- private val availableCpus = ArrayDeque<Int>()
-
- override fun canFit(model: SimMachineModel): Boolean = availableCpus.size >= model.cpus.size
-
- override fun createMachine(
- model: SimMachineModel,
- performanceInterferenceModel: PerformanceInterferenceModel?
- ): SimMachine {
- require(canFit(model)) { "Cannot fit machine" }
- return SimVm(model, performanceInterferenceModel)
- }
-
- override fun onStart(ctx: SimMachineContext) {
- this.ctx = ctx
- this.vcpus = arrayOfNulls(ctx.cpus.size)
- this.availableCpus.addAll(ctx.cpus.indices)
- }
-
- override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> {
- return this
+public class SimSpaceSharedHypervisor : SimAbstractHypervisor() {
+ override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean {
+ return switch.inputs.size - switch.outputs.size >= model.cpus.size
}
- override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand {
- return onNext(ctx, 0.0)
- }
-
- override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand {
- val vcpu = vcpus[ctx.resource.id] ?: return SimResourceCommand.Idle()
-
- if (vcpu.isStarted) {
- vcpu.remainingWork = remainingWork
- vcpu.flush()
- } else {
- vcpu.isStarted = true
- vcpu.start()
- }
-
- if (vcpu.hasExited && vcpu != vcpus[ctx.resource.id]) {
- return onNext(ctx, remainingWork)
- }
-
- return vcpu.activeCommand
- }
-
- /**
- * A virtual machine running on the hypervisor.
- *
- * @property model The machine model of the virtual machine.
- * @property performanceInterferenceModel The performance interference model to utilize.
- */
- private inner class SimVm(
- override val model: SimMachineModel,
- val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- ) : SimMachine {
- /**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
- /**
- * A [StateFlow] representing the CPU usage of the simulated machine.
- */
- override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
-
- /**
- * The current active workload.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
- * The physical CPUs that have been allocated.
- */
- private val pCPUs = model.cpus.map { availableCpus.poll() }.toIntArray()
-
- /**
- * The active CPUs of this virtual machine.
- */
- private var cpus: List<VCpu> = emptyList()
-
- /**
- * The execution context in which the workload runs.
- */
- inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val cpus: List<SimProcessingUnit>
- get() = model.cpus
-
- override val memory: List<SimMemoryUnit>
- get() = model.memory
-
- override val clock: Clock
- get() = this@SimSpaceSharedHypervisor.ctx.clock
-
- override fun interrupt(resource: SimResource) {
- TODO()
- }
- }
-
- lateinit var ctx: SimMachineContext
-
- /**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
- */
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- require(!isTerminated) { "Machine is terminated" }
- require(cont == null) { "Run should not be called concurrently" }
-
- ctx = Context(meta)
- workload.onStart(ctx)
-
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
- try {
- this.cpus = model.cpus.map { model -> VCpu(this, model, workload.getConsumer(ctx, model), ctx.clock) }
-
- for ((index, pCPU) in pCPUs.withIndex()) {
- vcpus[pCPU] = cpus[index]
- this@SimSpaceSharedHypervisor.ctx.interrupt(this@SimSpaceSharedHypervisor.ctx.cpus[pCPU])
- }
- } catch (e: Throwable) {
- cont.resumeWithException(e)
- }
- }
- }
-
- override fun close() {
- isTerminated = true
- for (pCPU in pCPUs) {
- vcpus[pCPU] = null
- availableCpus.add(pCPU)
- }
-
- val cont = cont
- this.cont = null
- cont?.resume(Unit)
- }
-
- /**
- * Update the usage of the VM.
- */
- fun updateUsage() {
- usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.resource.frequency }
- }
-
- /**
- * This method is invoked when one of the CPUs has exited.
- */
- fun onCpuExit() {
- // Check whether all other CPUs have finished
- if (cpus.all { it.hasExited }) {
- val cont = cont
- this.cont = null
- cont?.resume(Unit)
- }
- }
-
- /**
- * This method is invoked when one of the CPUs failed.
- */
- fun onCpuFailure(e: Throwable) {
- // In case the flush fails with an exception, immediately propagate to caller, cancelling all other
- // tasks.
- val cont = cont
- this.cont = null
- cont?.resumeWithException(e)
- }
- }
-
- /**
- * A CPU of the virtual machine.
- */
- private inner class VCpu(
- val vm: SimVm,
- resource: SimProcessingUnit,
- consumer: SimResourceConsumer<SimProcessingUnit>,
- clock: Clock
- ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer) {
- /**
- * Indicates that the vCPU was started.
- */
- var isStarted: Boolean = false
-
- /**
- * The current command that is processed by the vCPU.
- */
- var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
-
- /**
- * The processing speed of the vCPU.
- */
- var speed: Double = 0.0
- set(value) {
- field = value
- vm.updateUsage()
- }
-
- /**
- * The amount of work remaining from the previous consumption.
- */
- var remainingWork: Double = 0.0
-
- /**
- * A flag to indicate that the CPU has exited.
- */
- var hasExited: Boolean = false
-
- override fun onIdle(deadline: Long) {
- speed = 0.0
- activeCommand = SimResourceCommand.Idle(deadline)
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- speed = getSpeed(limit)
- activeCommand = SimResourceCommand.Consume(work, speed, deadline)
- }
-
- override fun onFinish() {
- speed = 0.0
- hasExited = true
- activeCommand = SimResourceCommand.Idle()
- vm.onCpuExit()
- }
-
- override fun onFailure(cause: Throwable) {
- speed = 0.0
- hasExited = true
- activeCommand = SimResourceCommand.Idle()
- vm.onCpuFailure(cause)
- }
-
- override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
- return remainingWork
- }
+ override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> {
+ return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext)
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
index 3d49e544..e2044d05 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -28,5 +28,5 @@ package org.opendc.simulator.compute
public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
- override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor(listener)
+ override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor()
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index edef3843..31f58a0f 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -27,6 +27,7 @@ import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.consumer.SimConsumerBarrier
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -36,10 +37,10 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
private var offset = 0L
private val iterator = trace.iterator()
private var fragment: Fragment? = null
- private lateinit var barrier: SimWorkloadBarrier
+ private lateinit var barrier: SimConsumerBarrier
override fun onStart(ctx: SimMachineContext) {
- barrier = SimWorkloadBarrier(ctx.cpus.size)
+ barrier = SimConsumerBarrier(ctx.cpus.size)
fragment = nextFragment()
offset = ctx.clock.millis()
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index 4b4d7eca..4ac8cf63 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -23,8 +23,9 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -35,24 +36,18 @@ import org.opendc.simulator.compute.model.SimProcessingNode
import org.opendc.simulator.compute.model.SimProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import java.time.Clock
/**
* Test suite for the [SimHypervisor] class.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHypervisorTest {
- private lateinit var scope: TestCoroutineScope
- private lateinit var clock: Clock
- private lateinit var machineModel: SimMachineModel
+ private lateinit var model: SimMachineModel
@BeforeEach
fun setUp() {
- scope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(scope)
-
val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1)
- machineModel = SimMachineModel(
+ model = SimMachineModel(
cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
@@ -62,7 +57,8 @@ internal class SimHypervisorTest {
* Test overcommitting of resources via the hypervisor with a single VM.
*/
@Test
- fun testOvercommittedSingle() {
+ fun testOvercommittedSingle() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val listener = object : SimHypervisor.Listener {
var totalRequestedWork = 0L
var totalGrantedWork = 0L
@@ -83,38 +79,34 @@ internal class SimHypervisorTest {
}
}
- scope.launch {
- val duration = 5 * 60L
- val workloadA =
- SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
- ),
- )
-
- val machine = SimBareMetalMachine(scope, clock, machineModel)
- val hypervisor = SimFairShareHypervisor(listener)
-
- launch {
- machine.run(hypervisor)
- }
-
- yield()
- launch { hypervisor.createMachine(machineModel).run(workloadA) }
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ ),
+ )
+
+ val machine = SimBareMetalMachine(coroutineContext, clock, model)
+ val hypervisor = SimFairShareHypervisor(listener)
+
+ launch {
+ machine.run(hypervisor)
+ println("Hypervisor finished")
}
-
- scope.advanceUntilIdle()
- scope.uncaughtExceptions.forEach { it.printStackTrace() }
+ yield()
+ hypervisor.createMachine(model).run(workloadA)
+ yield()
+ machine.close()
assertAll(
- { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
{ assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
{ assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
{ assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
- { assertEquals(1200000, scope.currentTime) }
+ { assertEquals(1200000, currentTime) }
)
}
@@ -122,7 +114,8 @@ internal class SimHypervisorTest {
* Test overcommitting of resources via the hypervisor with two VMs.
*/
@Test
- fun testOvercommittedDual() {
+ fun testOvercommittedDual() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val listener = object : SimHypervisor.Listener {
var totalRequestedWork = 0L
var totalGrantedWork = 0L
@@ -143,48 +136,53 @@ internal class SimHypervisorTest {
}
}
- scope.launch {
- val duration = 5 * 60L
- val workloadA =
- SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
- ),
- )
- val workloadB =
- SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 1)
- )
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ ),
+ )
+ val workloadB =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 1)
)
+ )
+
+ val machine = SimBareMetalMachine(coroutineContext, clock, model)
+ val hypervisor = SimFairShareHypervisor(listener)
- val machine = SimBareMetalMachine(scope, clock, machineModel)
- val hypervisor = SimFairShareHypervisor(listener)
+ launch {
+ machine.run(hypervisor)
+ }
+ yield()
+ coroutineScope {
launch {
- machine.run(hypervisor)
+ val vm = hypervisor.createMachine(model)
+ vm.run(workloadA)
+ vm.close()
}
-
- yield()
- launch { hypervisor.createMachine(machineModel).run(workloadA) }
- launch { hypervisor.createMachine(machineModel).run(workloadB) }
+ val vm = hypervisor.createMachine(model)
+ vm.run(workloadB)
+ vm.close()
}
-
- scope.advanceUntilIdle()
- scope.uncaughtExceptions.forEach { it.printStackTrace() }
+ yield()
+ machine.close()
+ yield()
assertAll(
- { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
{ assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
{ assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
{ assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
- { assertEquals(1200000, scope.currentTime) }
+ { assertEquals(1200000, currentTime) }
)
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 00efba53..6adc41d0 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
-import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -53,33 +52,35 @@ class SimMachineTest {
}
@Test
- fun testFlopsWorkload() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val machine = SimBareMetalMachine(testScope, clock, machineModel)
+ fun testFlopsWorkload() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val machine = SimBareMetalMachine(coroutineContext, clock, machineModel)
- testScope.runBlockingTest {
+ try {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
// Two cores execute 1000 MFlOps per second (1000 ms)
- assertEquals(1000, testScope.currentTime)
+ assertEquals(1000, currentTime)
+ } finally {
+ machine.close()
}
}
@Test
- fun testUsage() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val machine = SimBareMetalMachine(testScope, clock, machineModel)
+ fun testUsage() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val machine = SimBareMetalMachine(coroutineContext, clock, machineModel)
- testScope.runBlockingTest {
- val res = mutableListOf<Double>()
- val job = launch { machine.usage.toList(res) }
+ val res = mutableListOf<Double>()
+ val job = launch { machine.usage.toList(res) }
+ try {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
job.cancel()
assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" }
+ } finally {
+ machine.close()
}
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
index 583d989c..8428a0a7 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
@@ -25,7 +25,7 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
@@ -38,22 +38,16 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import java.time.Clock
/**
* A test suite for the [SimSpaceSharedHypervisor].
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimSpaceSharedHypervisorTest {
- private lateinit var scope: TestCoroutineScope
- private lateinit var clock: Clock
private lateinit var machineModel: SimMachineModel
@BeforeEach
fun setUp() {
- scope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(scope)
-
val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1)
machineModel = SimMachineModel(
cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) },
@@ -65,42 +59,45 @@ internal class SimSpaceSharedHypervisorTest {
* Test a trace workload.
*/
@Test
- fun testTrace() {
+ fun testTrace() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val usagePm = mutableListOf<Double>()
val usageVm = mutableListOf<Double>()
- scope.launch {
- val duration = 5 * 60L
- val workloadA =
- SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
- ),
- )
-
- val machine = SimBareMetalMachine(scope, clock, machineModel)
- val hypervisor = SimSpaceSharedHypervisor()
-
- launch { machine.usage.toList(usagePm) }
- launch { machine.run(hypervisor) }
-
- yield()
- launch {
- val vm = hypervisor.createMachine(machineModel)
- launch { vm.usage.toList(usageVm) }
- vm.run(workloadA)
- }
- }
-
- scope.advanceUntilIdle()
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ ),
+ )
+
+ val machine = SimBareMetalMachine(coroutineContext, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ val colA = launch { machine.usage.toList(usagePm) }
+ launch { machine.run(hypervisor) }
+
+ yield()
+
+ val vm = hypervisor.createMachine(machineModel)
+ val colB = launch { vm.usage.toList(usageVm) }
+ vm.run(workloadA)
+ yield()
+
+ vm.close()
+ machine.close()
+ colA.cancel()
+ colB.cancel()
assertAll(
{ assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usagePm) { "Correct PM usage" } },
- { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } },
- { assertEquals(5 * 60L * 4000, scope.currentTime) { "Took enough time" } }
+ // Temporary limitation is that VMs do not emit usage information
+ // { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } },
+ { assertEquals(5 * 60L * 4000, currentTime) { "Took enough time" } }
)
}
@@ -108,119 +105,111 @@ internal class SimSpaceSharedHypervisorTest {
* Test runtime workload on hypervisor.
*/
@Test
- fun testRuntimeWorkload() {
+ fun testRuntimeWorkload() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val duration = 5 * 60L * 1000
val workload = SimRuntimeWorkload(duration)
- val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val machine = SimBareMetalMachine(coroutineContext, clock, machineModel)
val hypervisor = SimSpaceSharedHypervisor()
- scope.launch {
- launch { machine.run(hypervisor) }
-
- yield()
- launch { hypervisor.createMachine(machineModel).run(workload) }
- }
+ launch { machine.run(hypervisor) }
+ yield()
+ val vm = hypervisor.createMachine(machineModel)
+ vm.run(workload)
+ vm.close()
+ machine.close()
- scope.advanceUntilIdle()
-
- assertEquals(duration, scope.currentTime) { "Took enough time" }
+ assertEquals(duration, currentTime) { "Took enough time" }
}
/**
* Test FLOPs workload on hypervisor.
*/
@Test
- fun testFlopsWorkload() {
+ fun testFlopsWorkload() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+
val duration = 5 * 60L * 1000
val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
- val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val machine = SimBareMetalMachine(coroutineContext, clock, machineModel)
val hypervisor = SimSpaceSharedHypervisor()
- scope.launch {
- launch { machine.run(hypervisor) }
-
- yield()
- launch { hypervisor.createMachine(machineModel).run(workload) }
- }
+ launch { machine.run(hypervisor) }
+ yield()
+ val vm = hypervisor.createMachine(machineModel)
+ vm.run(workload)
+ machine.close()
- scope.advanceUntilIdle()
-
- assertEquals(duration, scope.currentTime) { "Took enough time" }
+ assertEquals(duration, currentTime) { "Took enough time" }
}
/**
* Test two workloads running sequentially.
*/
@Test
- fun testTwoWorkloads() {
+ fun testTwoWorkloads() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val duration = 5 * 60L * 1000
- val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val machine = SimBareMetalMachine(coroutineContext, clock, machineModel)
val hypervisor = SimSpaceSharedHypervisor()
- scope.launch {
- launch { machine.run(hypervisor) }
-
- yield()
- launch {
- val vm = hypervisor.createMachine(machineModel)
- vm.run(SimRuntimeWorkload(duration))
- vm.close()
+ launch { machine.run(hypervisor) }
+ yield()
- val vm2 = hypervisor.createMachine(machineModel)
- vm2.run(SimRuntimeWorkload(duration))
- }
- }
+ val vm = hypervisor.createMachine(machineModel)
+ vm.run(SimRuntimeWorkload(duration))
+ vm.close()
- scope.advanceUntilIdle()
+ val vm2 = hypervisor.createMachine(machineModel)
+ vm2.run(SimRuntimeWorkload(duration))
+ vm2.close()
+ machine.close()
- assertEquals(duration * 2, scope.currentTime) { "Took enough time" }
+ assertEquals(duration * 2, currentTime) { "Took enough time" }
}
/**
* Test concurrent workloads on the machine.
*/
@Test
- fun testConcurrentWorkloadFails() {
- val machine = SimBareMetalMachine(scope, clock, machineModel)
- val hypervisor = SimSpaceSharedHypervisor()
+ fun testConcurrentWorkloadFails() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
- scope.launch {
- launch { machine.run(hypervisor) }
+ val machine = SimBareMetalMachine(coroutineContext, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
- yield()
+ launch { machine.run(hypervisor) }
+ yield()
- hypervisor.createMachine(machineModel)
+ hypervisor.createMachine(machineModel)
- assertAll(
- { assertFalse(hypervisor.canFit(machineModel)) },
- { assertThrows<IllegalStateException> { hypervisor.createMachine(machineModel) } }
- )
- }
+ assertAll(
+ { assertFalse(hypervisor.canFit(machineModel)) },
+ { assertThrows<IllegalArgumentException> { hypervisor.createMachine(machineModel) } }
+ )
- scope.advanceUntilIdle()
+ machine.close()
}
/**
* Test concurrent workloads on the machine.
*/
@Test
- fun testConcurrentWorkloadSucceeds() {
- val machine = SimBareMetalMachine(scope, clock, machineModel)
+ fun testConcurrentWorkloadSucceeds() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val machine = SimBareMetalMachine(coroutineContext, clock, machineModel)
val hypervisor = SimSpaceSharedHypervisor()
- scope.launch {
- launch { machine.run(hypervisor) }
-
- yield()
+ launch { machine.run(hypervisor) }
+ yield()
- hypervisor.createMachine(machineModel).close()
+ hypervisor.createMachine(machineModel).close()
- assertAll(
- { assertTrue(hypervisor.canFit(machineModel)) },
- { assertDoesNotThrow { hypervisor.createMachine(machineModel) } }
- )
- }
+ assertAll(
+ { assertTrue(hypervisor.canFit(machineModel)) },
+ { assertDoesNotThrow { hypervisor.createMachine(machineModel) } }
+ )
- scope.advanceUntilIdle()
+ machine.close()
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index f9da74c7..52251bff 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -138,10 +138,11 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
}
try {
- val (timestamp, command) = activeCommand ?: return
+ val activeCommand = activeCommand ?: return
+ val (timestamp, command) = activeCommand
isProcessing = true
- activeCommand = null
+ this.activeCommand = null
val duration = now - timestamp
assert(duration >= 0) { "Flush in the past" }
@@ -153,6 +154,8 @@ public abstract class SimAbstractResourceContext<R : SimResource>(
// 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
if (command.deadline <= now || !isIntermediate) {
next(remainingWork = 0.0)
+ } else {
+ this.activeCommand = activeCommand
}
}
is SimResourceCommand.Consume -> {
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
new file mode 100644
index 00000000..ca23557c
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+
+/**
+ * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer].
+ */
+public class SimResourceForwarder<R : SimResource>(override val resource: R) :
+ SimResourceProvider<R>, SimResourceConsumer<R> {
+ /**
+ * The [SimResourceContext] in which the forwarder runs.
+ */
+ private var ctx: SimResourceContext<R>? = null
+
+ /**
+ * A flag to indicate that the forwarder is closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The continuation to resume after consumption.
+ */
+ private var cont: Continuation<Unit>? = null
+
+ /**
+ * The delegate [SimResourceConsumer].
+ */
+ private var delegate: SimResourceConsumer<R>? = null
+
+ /**
+ * A flag to indicate that the delegate was started.
+ */
+ private var hasDelegateStarted: Boolean = false
+
+ /**
+ * The remaining amount of work last cycle.
+ */
+ private var remainingWork: Double = 0.0
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) {
+ check(!isClosed) { "Lifecycle of forwarder has ended" }
+ check(cont == null) { "Run should not be called concurrently" }
+
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+ this.delegate = consumer
+
+ cont.invokeOnCancellation { reset() }
+
+ ctx?.interrupt()
+ }
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ override fun close() {
+ isClosed = true
+ interrupt()
+ ctx = null
+ }
+
+ override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ this.ctx = ctx
+
+ return onNext(ctx, 0.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
+ this.remainingWork = remainingWork
+
+ return if (isClosed) {
+ SimResourceCommand.Exit
+ } else if (!hasDelegateStarted) {
+ start()
+ } else {
+ next()
+ }
+ }
+
+ /**
+ * Start the delegate.
+ */
+ private fun start(): SimResourceCommand {
+ val delegate = delegate ?: return SimResourceCommand.Idle()
+ val command = delegate.onStart(checkNotNull(ctx))
+
+ hasDelegateStarted = true
+
+ return forward(command)
+ }
+
+ /**
+ * Obtain the next command to process.
+ */
+ private fun next(): SimResourceCommand {
+ val delegate = delegate
+ return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle())
+ }
+
+ /**
+ * Forward the specified [command].
+ */
+ private fun forward(command: SimResourceCommand): SimResourceCommand {
+ return if (command == SimResourceCommand.Exit) {
+ val cont = checkNotNull(cont)
+
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+ cont.resume(Unit)
+
+ if (isClosed)
+ SimResourceCommand.Exit
+ else
+ start()
+ } else {
+ command
+ }
+ }
+
+ /**
+ * Reset the delegate.
+ */
+ private fun reset() {
+ cont = null
+ delegate = null
+ hasDelegateStarted = false
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index 91a745ab..e35aa683 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -37,6 +37,11 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable {
public suspend fun consume(consumer: SimResourceConsumer<R>)
/**
+ * Interrupt the resource.
+ */
+ public fun interrupt()
+
+ /**
* End the lifetime of the resource.
*
* This operation terminates the existing resource consumer.
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 4445df86..540a17c9 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -50,14 +50,32 @@ public class SimResourceSource<R : SimResource>(
get() = _speed
private val _speed = MutableStateFlow(0.0)
+ /**
+ * A flag to indicate that the resource was closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The current active consumer.
+ */
+ private var cont: CancellableContinuation<Unit>? = null
+
+ /**
+ * The [Context] that is currently running.
+ */
+ private var ctx: Context? = null
+
override suspend fun consume(consumer: SimResourceConsumer<R>) {
check(!isClosed) { "Lifetime of resource has ended." }
check(cont == null) { "Run should not be called concurrently" }
try {
return suspendCancellableCoroutine { cont ->
- this.cont = cont
val ctx = Context(consumer, cont)
+
+ this.cont = cont
+ this.ctx = ctx
+
ctx.start()
cont.invokeOnCancellation {
ctx.stop()
@@ -65,6 +83,7 @@ public class SimResourceSource<R : SimResource>(
}
} finally {
cont = null
+ ctx = null
}
}
@@ -72,17 +91,12 @@ public class SimResourceSource<R : SimResource>(
isClosed = true
cont?.cancel()
cont = null
+ ctx = null
}
- /**
- * A flag to indicate that the resource was closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * The current active consumer.
- */
- private var cont: CancellableContinuation<Unit>? = null
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
/**
* Internal implementation of [SimResourceContext] for this class.
@@ -113,7 +127,7 @@ public class SimResourceSource<R : SimResource>(
speed = getSpeed(limit)
val until = min(deadline, clock.millis() + getDuration(work, speed))
- scheduler.startSingleTimerTo(this, until) { flush() }
+ scheduler.startSingleTimerTo(this, until, ::flush)
}
override fun onFinish() {
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
new file mode 100644
index 00000000..cd1af3fc
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers.
+ */
+public interface SimResourceSwitch<R : SimResource> : AutoCloseable {
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
+ public val outputs: Set<SimResourceProvider<R>>
+
+ /**
+ * The input resources that will be switched between the output providers.
+ */
+ public val inputs: Set<SimResourceProvider<R>>
+
+ /**
+ * Add an output to the switch represented by [resource].
+ */
+ public fun addOutput(resource: R): SimResourceProvider<R>
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ public fun addInput(input: SimResourceProvider<R>)
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
new file mode 100644
index 00000000..060d0ea2
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.launch
+import java.util.ArrayDeque
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
+ * a single output is directly connected to an input and that the switch can only support as much outputs as inputs.
+ */
+public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> {
+ /**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ private val scope = CoroutineScope(context + Job())
+
+ private val _outputs = mutableSetOf<SimResourceProvider<R>>()
+ override val outputs: Set<SimResourceProvider<R>>
+ get() = _outputs
+
+ private val availableResources = ArrayDeque<SimResourceForwarder<R>>()
+ private val _inputs = mutableSetOf<SimResourceProvider<R>>()
+ override val inputs: Set<SimResourceProvider<R>>
+ get() = _inputs
+
+ override fun addOutput(resource: R): SimResourceProvider<R> {
+ check(availableResources.isNotEmpty()) { "No capacity to serve request" }
+ val forwarder = availableResources.poll()
+ val output = Provider(resource, forwarder)
+ _outputs += output
+ return output
+ }
+
+ override fun addInput(input: SimResourceProvider<R>) {
+ if (input in inputs) {
+ return
+ }
+
+ val forwarder = SimResourceForwarder(input.resource)
+
+ scope.launch { input.consume(forwarder) }
+
+ _inputs += input
+ availableResources += forwarder
+ }
+
+ override fun close() {
+ scope.cancel()
+ }
+
+ private inner class Provider(
+ override val resource: R,
+ private val forwarder: SimResourceForwarder<R>
+ ) : SimResourceProvider<R> {
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer)
+
+ override fun interrupt() {
+ forwarder.interrupt()
+ }
+
+ override fun close() {
+ _outputs -= this
+ availableResources += forwarder
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
new file mode 100644
index 00000000..bcf76d3c
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -0,0 +1,508 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import org.opendc.simulator.resources.consumer.SimConsumerBarrier
+import java.time.Clock
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
+ * fair sharing.
+ */
+public class SimResourceSwitchMaxMin<R : SimResource>(
+ private val clock: Clock,
+ context: CoroutineContext,
+ private val listener: Listener<R>? = null
+) : SimResourceSwitch<R> {
+ /**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ private val scope = CoroutineScope(context + Job())
+
+ private val inputConsumers = mutableSetOf<InputConsumer>()
+ private val _outputs = mutableSetOf<OutputProvider>()
+ override val outputs: Set<SimResourceProvider<R>>
+ get() = _outputs
+
+ private val _inputs = mutableSetOf<SimResourceProvider<R>>()
+ override val inputs: Set<SimResourceProvider<R>>
+ get() = _inputs
+
+ /**
+ * The commands to submit to the underlying host.
+ */
+ private val commands = mutableMapOf<R, SimResourceCommand>()
+
+ /**
+ * The active output contexts.
+ */
+ private val outputContexts: MutableList<OutputContext> = mutableListOf()
+
+ /**
+ * The total amount of remaining work (of all pCPUs).
+ */
+ private var totalRemainingWork: Double = 0.0
+
+ /**
+ * The total speed requested by the vCPUs.
+ */
+ private var totalRequestedSpeed = 0.0
+
+ /**
+ * The total amount of work requested by the vCPUs.
+ */
+ private var totalRequestedWork = 0.0
+
+ /**
+ * The total allocated speed for the vCPUs.
+ */
+ private var totalAllocatedSpeed = 0.0
+
+ /**
+ * The total allocated work requested for the vCPUs.
+ */
+ private var totalAllocatedWork = 0.0
+
+ /**
+ * The amount of work that could not be performed due to over-committing resources.
+ */
+ private var totalOvercommittedWork = 0.0
+
+ /**
+ * The amount of work that was lost due to interference.
+ */
+ private var totalInterferedWork = 0.0
+
+ /**
+ * A flag to indicate that the scheduler has submitted work that has not yet been completed.
+ */
+ private var isDirty: Boolean = false
+
+ /**
+ * The scheduler barrier.
+ */
+ private var barrier: SimConsumerBarrier = SimConsumerBarrier(0)
+
+ /**
+ * Add an output to the switch represented by [resource].
+ */
+ override fun addOutput(resource: R): SimResourceProvider<R> {
+ val provider = OutputProvider(resource)
+ _outputs.add(provider)
+ return provider
+ }
+
+ /**
+ * Add the specified [input] to the switch.
+ */
+ override fun addInput(input: SimResourceProvider<R>) {
+ val consumer = InputConsumer(input)
+ _inputs.add(input)
+ inputConsumers += consumer
+ }
+
+ override fun close() {
+ scope.cancel()
+ }
+
+ /**
+ * Indicate that the workloads should be re-scheduled.
+ */
+ private fun schedule() {
+ isDirty = true
+ interruptAll()
+ }
+
+ /**
+ * Schedule the work over the physical CPUs.
+ */
+ private fun doSchedule() {
+ // If there is no work yet, mark all inputs as idle.
+ if (outputContexts.isEmpty()) {
+ commands.replaceAll { _, _ -> SimResourceCommand.Idle() }
+ interruptAll()
+ }
+
+ val maxUsage = inputs.sumByDouble { it.resource.capacity }
+ var duration: Double = Double.MAX_VALUE
+ var deadline: Long = Long.MAX_VALUE
+ var availableSpeed = maxUsage
+ var totalRequestedSpeed = 0.0
+ var totalRequestedWork = 0.0
+
+ // Sort the outputs based on their requested usage
+ // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ outputContexts.sort()
+
+ // Divide the available input capacity fairly across the outputs using max-min fair sharing
+ val outputIterator = outputContexts.listIterator()
+ var remaining = outputContexts.size
+ while (outputIterator.hasNext()) {
+ val output = outputIterator.next()
+ val availableShare = availableSpeed / remaining--
+
+ when (val command = output.activeCommand) {
+ is SimResourceCommand.Idle -> {
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ output.actualSpeed = 0.0
+ }
+ is SimResourceCommand.Consume -> {
+ val grantedSpeed = min(output.allowedSpeed, availableShare)
+
+ // Take into account the minimum deadline of this slice before we possible continue
+ deadline = min(deadline, command.deadline)
+
+ // Ignore idle computation
+ if (grantedSpeed <= 0.0 || command.work <= 0.0) {
+ output.actualSpeed = 0.0
+ continue
+ }
+
+ totalRequestedSpeed += command.limit
+ totalRequestedWork += command.work
+
+ output.actualSpeed = grantedSpeed
+ availableSpeed -= grantedSpeed
+
+ // The duration that we want to run is that of the shortest request from an output
+ duration = min(duration, command.work / grantedSpeed)
+ }
+ SimResourceCommand.Exit -> {
+ // Apparently the output consumer has exited, so remove it from the scheduling queue.
+ outputIterator.remove()
+ }
+ }
+ }
+
+ // Round the duration to milliseconds
+ duration = ceil(duration * 1000) / 1000
+
+ assert(deadline >= clock.millis()) { "Deadline already passed" }
+
+ val totalAllocatedSpeed = maxUsage - availableSpeed
+ var totalAllocatedWork = 0.0
+ availableSpeed = totalAllocatedSpeed
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (input in inputs.sortedByDescending { it.resource.capacity }) {
+ val maxResourceUsage = input.resource.capacity
+ val fraction = maxResourceUsage / maxUsage
+ val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction)
+ val grantedWork = duration * grantedSpeed
+
+ commands[input.resource] =
+ if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+
+ totalAllocatedWork += grantedWork
+ availableSpeed -= grantedSpeed
+ }
+
+ this.totalRequestedSpeed = totalRequestedSpeed
+ this.totalRequestedWork = totalRequestedWork
+ this.totalAllocatedSpeed = totalAllocatedSpeed
+ this.totalAllocatedWork = totalAllocatedWork
+
+ interruptAll()
+ }
+
+ /**
+ * Flush the progress of the vCPUs.
+ */
+ private fun flushGuests() {
+ // Flush all the outputs work
+ for (output in outputContexts) {
+ output.flush(isIntermediate = true)
+ }
+
+ // Report metrics
+ listener?.onSliceFinish(
+ this,
+ totalRequestedWork.toLong(),
+ (totalAllocatedWork - totalRemainingWork).toLong(),
+ totalOvercommittedWork.toLong(),
+ totalInterferedWork.toLong(),
+ totalRequestedSpeed,
+ totalAllocatedSpeed
+ )
+ totalRemainingWork = 0.0
+ totalInterferedWork = 0.0
+ totalOvercommittedWork = 0.0
+
+ // Force all inputs to re-schedule their work.
+ doSchedule()
+ }
+
+ /**
+ * Interrupt all inputs.
+ */
+ private fun interruptAll() {
+ for (input in inputConsumers) {
+ input.interrupt()
+ }
+ }
+
+ /**
+ * Event listener for hypervisor events.
+ */
+ public interface Listener<R : SimResource> {
+ /**
+ * This method is invoked when a slice is finished.
+ */
+ public fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin<R>,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ )
+ }
+
+ /**
+ * An internal [SimResourceProvider] implementation for switch outputs.
+ */
+ private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> {
+ /**
+ * A flag to indicate that the resource was closed.
+ */
+ private var isClosed: Boolean = false
+
+ /**
+ * The current active consumer.
+ */
+ private var cont: CancellableContinuation<Unit>? = null
+
+ /**
+ * The [OutputContext] that is currently running.
+ */
+ private var ctx: OutputContext? = null
+
+ override suspend fun consume(consumer: SimResourceConsumer<R>) {
+ check(!isClosed) { "Lifetime of resource has ended." }
+ check(cont == null) { "Run should not be called concurrently" }
+
+ try {
+ return suspendCancellableCoroutine { cont ->
+ val ctx = OutputContext(resource, consumer, cont)
+ ctx.start()
+ cont.invokeOnCancellation {
+ ctx.stop()
+ }
+
+ this.cont = cont
+ this.ctx = ctx
+
+ outputContexts += ctx
+ schedule()
+ }
+ } finally {
+ cont = null
+ ctx = null
+ }
+ }
+
+ override fun close() {
+ isClosed = true
+ cont?.cancel()
+ cont = null
+ ctx = null
+ _outputs.remove(this)
+ }
+
+ override fun interrupt() {
+ ctx?.interrupt()
+ }
+ }
+
+ /**
+ * A [SimAbstractResourceContext] for the output resources.
+ */
+ private inner class OutputContext(
+ resource: R,
+ consumer: SimResourceConsumer<R>,
+ private val cont: Continuation<Unit>
+ ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> {
+ /**
+ * The current command that is processed by the vCPU.
+ */
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
+
+ /**
+ * The processing speed that is allowed by the model constraints.
+ */
+ var allowedSpeed: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
+
+ /**
+ * A flag to indicate that the CPU has exited.
+ */
+ var hasExited: Boolean = false
+
+ override fun onIdle(deadline: Long) {
+ allowedSpeed = 0.0
+ activeCommand = SimResourceCommand.Idle(deadline)
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ allowedSpeed = getSpeed(limit)
+ activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ }
+
+ override fun onFinish() {
+ hasExited = true
+ activeCommand = SimResourceCommand.Exit
+ cont.resume(Unit)
+ }
+
+ override fun onFailure(cause: Throwable) {
+ hasExited = true
+ activeCommand = SimResourceCommand.Exit
+ cont.resumeWithException(cause)
+ }
+
+ override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double {
+ // Apply performance interference model
+ val performanceScore = 1.0
+
+ // Compute the remaining amount of work
+ val remainingWork = if (work > 0.0) {
+ // Compute the fraction of compute time allocated to the VM
+ val fraction = actualSpeed / totalAllocatedSpeed
+
+ // Compute the work that was actually granted to the VM.
+ val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
+ val processed = processingAvailable * performanceScore
+
+ val interferedWork = processingAvailable - processed
+
+ totalInterferedWork += interferedWork
+
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+
+ if (!isInterrupted) {
+ totalOvercommittedWork += remainingWork
+ }
+
+ return remainingWork
+ }
+
+ override fun interrupt() {
+ // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
+ // to infinite recursion.
+ if (isProcessing) {
+ return
+ }
+
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ }
+
+ override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
+ }
+
+ /**
+ * An internal [SimResourceConsumer] implementation for switch inputs.
+ */
+ private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> {
+ /**
+ * The resource context of the consumer.
+ */
+ private lateinit var ctx: SimResourceContext<R>
+
+ init {
+ scope.launch {
+ try {
+ barrier = SimConsumerBarrier(barrier.parties + 1)
+ input.consume(this@InputConsumer)
+ } catch (e: CancellationException) {
+ // Cancel gracefully
+ throw e
+ } catch (e: Throwable) {
+ e.printStackTrace()
+ } finally {
+ barrier = SimConsumerBarrier(barrier.parties - 1)
+ inputConsumers -= this@InputConsumer
+ _inputs -= input
+ }
+ }
+ }
+
+ /**
+ * Interrupt the consumer
+ */
+ fun interrupt() {
+ ctx.interrupt()
+ }
+
+ override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand {
+ this.ctx = ctx
+ return commands[ctx.resource] ?: SimResourceCommand.Idle()
+ }
+
+ override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand {
+ totalRemainingWork += remainingWork
+ val isLast = barrier.enter()
+
+ // Flush the progress of the guest after the barrier has been reached.
+ if (isLast && isDirty) {
+ isDirty = false
+ flushGuests()
+ }
+
+ return if (isDirty) {
+ // Wait for the scheduler determine the work after the barrier has been reached by all CPUs.
+ SimResourceCommand.Idle()
+ } else {
+ // Indicate that the scheduler needs to run next call.
+ if (isLast) {
+ isDirty = true
+ }
+
+ commands[ctx.resource] ?: SimResourceCommand.Idle()
+ }
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
index 45a299be..7aa5a5aa 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
@@ -20,13 +20,13 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload
+package org.opendc.simulator.resources.consumer
/**
- * The [SimWorkloadBarrier] is a barrier that allows workloads to wait for a select number of CPUs to complete, before
- * proceeding its operation.
+ * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to
+ * complete, before proceeding its operation.
*/
-public class SimWorkloadBarrier(public val parties: Int) {
+public class SimConsumerBarrier(public val parties: Int) {
private var counter = 0
/**
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
new file mode 100644
index 00000000..03a3cebd
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.consumer
+
+import org.opendc.simulator.resources.SimResource
+import org.opendc.simulator.resources.SimResourceCommand
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+
+/**
+ * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
+ * consumption for some period of time.
+ */
+public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> {
+ private val iterator = trace.iterator()
+
+ override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand {
+ return onNext(ctx, 0.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand {
+ return if (iterator.hasNext()) {
+ val now = ctx.clock.millis()
+ val fragment = iterator.next()
+ val work = (fragment.duration / 1000) * fragment.usage
+ val deadline = now + fragment.duration
+
+ assert(deadline >= now) { "Deadline already passed" }
+
+ if (work > 0.0)
+ SimResourceCommand.Consume(work, fragment.usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+
+ /**
+ * A fragment of the workload.
+ */
+ public data class Fragment(val duration: Long, val usage: Double)
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
new file mode 100644
index 00000000..e7642dc1
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+
+/**
+ * A test suite for the [SimAbstractResourceContext] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceContextTest {
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testFlushWithoutCommand() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+
+ val resource = SimCpu(4200.0)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(10.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ }
+
+ override fun onFinish() {
+ }
+
+ override fun onFailure(cause: Throwable) {
+ }
+ }
+
+ context.flush()
+ }
+
+ @Test
+ fun testIntermediateFlush() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val resource = SimCpu(4200.0)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(10.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ var counter = 0
+ val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ counter++
+ }
+
+ override fun onFinish() {
+ }
+
+ override fun onFailure(cause: Throwable) {
+ }
+ }
+
+ context.start()
+ delay(1) // Delay 1 ms to prevent hitting the fast path
+ context.flush(isIntermediate = true)
+ assertEquals(2, counter)
+ }
+
+ @Test
+ fun testIntermediateFlushIdle() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val resource = SimCpu(4200.0)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle(10)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ var counter = 0
+ var isFinished = false
+ val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) {
+ override fun onIdle(deadline: Long) {
+ counter++
+ }
+
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {
+ }
+
+ override fun onFinish() {
+ isFinished = true
+ }
+
+ override fun onFailure(cause: Throwable) {
+ }
+ }
+
+ context.start()
+ delay(5)
+ context.flush(isIntermediate = true)
+ delay(5)
+ context.flush(isIntermediate = true)
+
+ assertAll(
+ { assertEquals(1, counter) },
+ { assertTrue(isFinished) }
+ )
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
new file mode 100644
index 00000000..ced1bd98
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceForwarder] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceForwarderTest {
+
+ data class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testExitImmediately() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ })
+ forwarder.close()
+ scheduler.close()
+ }
+
+ @Test
+ fun testExit() = runBlockingTest {
+ val forwarder = SimResourceForwarder(SimCpu(1000.0))
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(SimCpu(2000.0), clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ })
+
+ forwarder.close()
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 8b380efb..4f7825fc 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -24,38 +24,27 @@ package org.opendc.simulator.resources
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
-import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
-import java.time.Clock
/**
- * A test suite for the [SimResourceScheduler] class.
+ * A test suite for the [SimResourceSource] class.
*/
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceSourceTest {
-
- private lateinit var scope: TestCoroutineScope
- private lateinit var clock: Clock
-
data class SimCpu(val speed: Double) : SimResource {
override val capacity: Double
get() = speed
}
- @BeforeEach
- fun setUp() {
- scope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(scope)
- }
-
@Test
- fun testSpeed() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testSpeed() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -67,21 +56,25 @@ class SimResourceSourceTest {
}
}
- scope.runBlockingTest {
+ try {
val res = mutableListOf<Double>()
val job = launch { provider.speed.toList(res) }
provider.consume(consumer)
job.cancel()
- assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" }
+ assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
}
}
@Test
- fun testSpeedLimit() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testSpeedLimit() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -93,21 +86,29 @@ class SimResourceSourceTest {
}
}
- scope.runBlockingTest {
+ try {
val res = mutableListOf<Double>()
val job = launch { provider.speed.toList(res) }
provider.consume(consumer)
job.cancel()
- assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" }
+ assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
}
}
+ /**
+ * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or
+ * [SimResourceConsumer.onNext].
+ */
@Test
- fun testInterrupt() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testIntermediateInterrupt() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -120,17 +121,52 @@ class SimResourceSourceTest {
}
}
- assertDoesNotThrow {
- scope.runBlockingTest {
- provider.consume(consumer)
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInterrupt() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+ lateinit var resCtx: SimResourceContext<SimCpu>
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ resCtx = ctx
+ return SimResourceCommand.Consume(4.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ assertEquals(0.0, remainingWork)
+ return SimResourceCommand.Exit
}
}
+
+ try {
+ launch {
+ yield()
+ resCtx.interrupt()
+ }
+ provider.consume(consumer)
+
+ assertEquals(0, currentTime)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
}
@Test
- fun testFailure() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testFailure() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -142,17 +178,21 @@ class SimResourceSourceTest {
}
}
- assertThrows<IllegalStateException> {
- scope.runBlockingTest {
+ try {
+ assertThrows<IllegalStateException> {
provider.consume(consumer)
}
+ } finally {
+ scheduler.close()
+ provider.close()
}
}
@Test
- fun testExceptionPropagationOnNext() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testExceptionPropagationOnNext() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -164,15 +204,21 @@ class SimResourceSourceTest {
}
}
- assertThrows<IllegalStateException> {
- scope.runBlockingTest { provider.consume(consumer) }
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
}
}
@Test
- fun testConcurrentConsumption() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testConcurrentConsumption() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -184,18 +230,24 @@ class SimResourceSourceTest {
}
}
- assertThrows<IllegalStateException> {
- scope.runBlockingTest {
- launch { provider.consume(consumer) }
- launch { provider.consume(consumer) }
+ try {
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ provider.consume(consumer)
+ }
}
+ } finally {
+ scheduler.close()
+ provider.close()
}
}
@Test
- fun testClosedConsumption() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testClosedConsumption() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -207,18 +259,22 @@ class SimResourceSourceTest {
}
}
- assertThrows<IllegalStateException> {
- scope.runBlockingTest {
+ try {
+ assertThrows<IllegalStateException> {
provider.close()
provider.consume(consumer)
}
+ } finally {
+ scheduler.close()
+ provider.close()
}
}
@Test
- fun testCloseDuringConsumption() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testCloseDuringConsumption() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -230,19 +286,23 @@ class SimResourceSourceTest {
}
}
- scope.runBlockingTest {
+ try {
launch { provider.consume(consumer) }
delay(500)
provider.close()
- }
- assertEquals(500, scope.currentTime)
+ assertEquals(500, currentTime)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
}
@Test
- fun testIdle() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
+ fun testIdle() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
val consumer = object : SimResourceConsumer<SimCpu> {
override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
@@ -254,31 +314,40 @@ class SimResourceSourceTest {
}
}
- scope.runBlockingTest {
+ try {
provider.consume(consumer)
- }
- assertEquals(500, scope.currentTime)
+ assertEquals(500, currentTime)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
}
@Test
fun testInfiniteSleep() {
- val resource = SimCpu(4200.0)
- val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock))
-
- val consumer = object : SimResourceConsumer<SimCpu> {
- override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
- return SimResourceCommand.Idle()
- }
-
- override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
- return SimResourceCommand.Exit
- }
- }
-
assertThrows<IllegalStateException> {
- scope.runBlockingTest {
- provider.consume(consumer)
+ runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler)
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Idle()
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
}
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
new file mode 100644
index 00000000..ca6558bf
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+import java.lang.IllegalStateException
+
+/**
+ * Test suite for the [SimResourceSwitchExclusive] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchExclusiveTest {
+ class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val speed = mutableListOf<Double>()
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(SimCpu(3200.0))
+ val job = launch { source.speed.toList(speed) }
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ job.cancel()
+ provider.close()
+ }
+
+ assertAll(
+ { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
+ { assertEquals(5 * 60L * 4000, currentTime) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(SimCpu(3200.0))
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration, currentTime) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(SimCpu(3200.0))
+
+ try {
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration * 2, currentTime) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(duration.toDouble(), 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext)
+ val source = SimResourceSource(SimCpu(3200.0), clock, scheduler)
+
+ switch.addInput(source)
+
+ switch.addOutput(SimCpu(3200.0))
+ assertThrows<IllegalStateException> { switch.addOutput(SimCpu(3200.0)) }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
new file mode 100644
index 00000000..698c1700
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceSwitch] implementations
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchMaxMinTest {
+ class SimCpu(val speed: Double) : SimResource {
+ override val capacity: Double
+ get() = speed
+ }
+
+ @Test
+ fun testSmoke() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val switch = SimResourceSwitchMaxMin<SimCpu>(clock, coroutineContext)
+
+ val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) }
+ sources.forEach { switch.addInput(it) }
+
+ val provider = switch.addOutput(SimCpu(1000.0))
+
+ val consumer = object : SimResourceConsumer<SimCpu> {
+ override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand {
+ return SimResourceCommand.Consume(1.0, 1.0)
+ }
+
+ override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with a single VM.
+ */
+ @Test
+ fun testOvercommittedSingle() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin<SimCpu>,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
+ val provider = switch.addOutput(SimCpu(3200.0))
+
+ try {
+ switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler))
+ provider.consume(workload)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+
+ assertAll(
+ { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, currentTime) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin<SimCpu>,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+ val workloadB =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3100.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 73.0)
+ )
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener)
+ val providerA = switch.addOutput(SimCpu(3200.0))
+ val providerB = switch.addOutput(SimCpu(3200.0))
+
+ try {
+ switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler))
+
+ coroutineScope {
+ launch { providerA.consume(workloadA) }
+ providerB.consume(workloadB)
+ }
+
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ assertAll(
+ { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, currentTime) }
+ )
+ }
+}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index 9f40f26a..49964938 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -22,24 +22,28 @@
package org.opendc.utils
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.sendBlocking
-import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
import java.time.Clock
import java.util.*
+import kotlin.coroutines.CoroutineContext
import kotlin.math.max
/**
* A TimerScheduler facilitates scheduled execution of future tasks.
*
- * @property coroutineScope The [CoroutineScope] to run the tasks in.
+ * @property context The [CoroutineContext] to run the tasks with.
* @property clock The clock to keep track of the time.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, private val clock: Clock) : AutoCloseable {
+public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable {
+ /**
+ * The scope in which the scheduler runs.
+ */
+ private val scope = CoroutineScope(context + Job())
+
/**
* A priority queue containing the tasks to be scheduled in the future.
*/
@@ -58,7 +62,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
/**
* The scheduling job.
*/
- private val job = coroutineScope.launch {
+ private val job = scope.launch {
val timers = timers
val queue = queue
val clock = clock
@@ -71,7 +75,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
onTimeout(delay) {
- while (queue.isNotEmpty()) {
+ while (queue.isNotEmpty() && isActive) {
val timer = queue.peek()
val timestamp = clock.millis()
@@ -86,7 +90,11 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
if (!timer.isCancelled) {
timers.remove(timer.key)
- timer()
+ try {
+ timer()
+ } catch (e: Throwable) {
+ Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e)
+ }
}
}
@@ -101,7 +109,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva
*/
override fun close() {
cancelAll()
- job.cancel()
+ scope.cancel()
}
/**
diff --git a/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt b/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
index 3a4acc90..1fcb5d38 100644
--- a/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
+++ b/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt
@@ -38,7 +38,7 @@ internal class TimerSchedulerTest {
fun testBasicTimer() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
scheduler.close()
@@ -51,7 +51,7 @@ internal class TimerSchedulerTest {
fun testCancelNonExisting() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.cancel(1)
scheduler.close()
@@ -62,7 +62,7 @@ internal class TimerSchedulerTest {
fun testCancelExisting() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
assertFalse(false)
@@ -81,7 +81,7 @@ internal class TimerSchedulerTest {
fun testCancelAll() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
assertFalse(false)
@@ -99,7 +99,7 @@ internal class TimerSchedulerTest {
fun testOverride() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
assertFalse(false)
@@ -117,7 +117,7 @@ internal class TimerSchedulerTest {
fun testStopped() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.close()
@@ -133,7 +133,7 @@ internal class TimerSchedulerTest {
fun testNegativeDelay() {
runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val scheduler = TimerScheduler<Int>(this, clock)
+ val scheduler = TimerScheduler<Int>(coroutineContext, clock)
assertThrows<IllegalArgumentException> {
scheduler.startSingleTimer(1, -1) {