diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-17 16:23:48 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-17 16:23:48 +0100 |
| commit | bb3b8e207a08edff81b8c2fe30b476c94bfea086 (patch) | |
| tree | ee739cf4092a2b807e0043bed7cae72cff7b6bac /simulator | |
| parent | 9ab482d0afd773703f78d51a2ba8a160896f03c6 (diff) | |
simulator: Make hypervisors generic for the resource type
This change moves the hypervisor implementations to the
opendc-simulator-resources module and makes them generic to the resource
type that is being used (e.g., CPU, disk or networking).
Diffstat (limited to 'simulator')
32 files changed, 2232 insertions, 1118 deletions
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 2c38f7cb..aa7e0aa1 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -125,7 +125,7 @@ public class ComputeServiceImpl( /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ - private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope, clock) + private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) override val hosts: Set<Host> get() = hostToView.keys diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 0da81152..9cc1bf54 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -84,7 +84,7 @@ public class SimHost( /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(scope, clock, model) + public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model) /** * The hypervisor to run multiple workloads. @@ -206,6 +206,7 @@ public class SimHost( override fun close() { scope.cancel() + machine.close() _state = HostState.DOWN } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a45ab9fc..6929b06c 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -136,8 +136,8 @@ internal class SimHostTest { assertAll( { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, - { assertEquals(4197600, requestedWork, "Requested work does not match") }, - { assertEquals(3057600, grantedWork, "Granted work does not match") }, + { assertEquals(4273200, requestedWork, "Requested work does not match") }, + { assertEquals(3133200, grantedWork, "Granted work does not match") }, { assertEquals(1140000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1200006, scope.currentTime) } ) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 4e6cfddc..59ce895f 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.yield import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -142,8 +143,8 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1678587333640, monitor.totalRequestedBurst) }, - { assertEquals(438118200924, monitor.totalGrantedBurst) }, + { assertEquals(1707132711051, monitor.totalRequestedBurst) }, + { assertEquals(457881474296, monitor.totalGrantedBurst) }, { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) @@ -176,6 +177,8 @@ class CapelinIntegrationTest { monitor ) + yield() + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") scheduler.close() @@ -186,8 +189,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(711464322955, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(175226276978, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt new file mode 100644 index 00000000..a99b082a --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.* +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Abstract implementation of the [SimHypervisor] interface. + */ +public abstract class SimAbstractHypervisor : SimHypervisor { + /** + * The machine on which the hypervisor runs. + */ + private lateinit var context: SimMachineContext + + /** + * The resource switch to use. + */ + private lateinit var switch: SimResourceSwitch<SimProcessingUnit> + + /** + * The virtual machines running on this hypervisor. + */ + private val _vms = mutableSetOf<VirtualMachine>() + override val vms: Set<SimMachine> + get() = _vms + + /** + * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. + */ + public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> + + /** + * Check whether the specified machine model fits on this hypervisor. + */ + public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean + + override fun canFit(model: SimMachineModel): Boolean { + return canFit(model, switch) + } + + override fun createMachine( + model: SimMachineModel, + performanceInterferenceModel: PerformanceInterferenceModel? + ): SimMachine { + require(canFit(model)) { "Machine does not fit" } + val vm = VirtualMachine(model, performanceInterferenceModel) + _vms.add(vm) + return vm + } + + /** + * A virtual machine running on the hypervisor. + * + * @property model The machine model of the virtual machine. + * @property performanceInterferenceModel The performance interference model to utilize. + */ + private inner class VirtualMachine( + override val model: SimMachineModel, + val performanceInterferenceModel: PerformanceInterferenceModel? = null, + ) : SimMachine { + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ + override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) + + /** + * A flag to indicate that the machine is terminated. + */ + private var isTerminated = false + + /** + * The vCPUs of the machine. + */ + private val cpus: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>> = model.cpus.associateWith { switch.addOutput(it) } + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { + coroutineScope { + require(!isTerminated) { "Machine is terminated" } + + val ctx = object : SimMachineContext { + override val cpus: List<SimProcessingUnit> + get() = model.cpus + + override val memory: List<SimMemoryUnit> + get() = model.memory + + override val clock: Clock + get() = this@SimAbstractHypervisor.context.clock + + override val meta: Map<String, Any> = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext) + + override fun interrupt(resource: SimResource) { + requireNotNull(this@VirtualMachine.cpus[resource]).interrupt() + } + } + + workload.onStart(ctx) + + for ((cpu, provider) in cpus) { + launch { + provider.consume(workload.getConsumer(ctx, cpu)) + } + } + } + } + + /** + * Terminate this VM instance. + */ + override fun close() { + if (!isTerminated) { + cpus.forEach { (_, provider) -> provider.close() } + _vms.remove(this) + } + + isTerminated = true + } + } + + override fun onStart(ctx: SimMachineContext) { + context = ctx + switch = createSwitch(ctx) + } + + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + val forwarder = SimResourceForwarder(cpu) + switch.addInput(forwarder) + return forwarder + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt new file mode 100644 index 00000000..1bdbb7e8 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.resources.SimResourceSource +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Abstract implementation of the [SimMachine] interface. + * + * @param context The [CoroutineContext] in which the machine runs. + */ +public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine { + private val _usage = MutableStateFlow(0.0) + override val usage: StateFlow<Double> + get() = _usage + + /** + * A flag to indicate that the machine is terminated. + */ + private var isTerminated = false + + /** + * The [CoroutineContext] to run in. + */ + protected abstract val context: CoroutineContext + + /** + * The resources allocated for this machine. + */ + protected abstract val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> + + /** + * The execution context in which the workload runs. + */ + private inner class Context( + val sources: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>>, + override val meta: Map<String, Any> + ) : SimMachineContext { + override val clock: Clock + get() = this@SimAbstractMachine.clock + + override val cpus: List<SimProcessingUnit> = model.cpus + + override val memory: List<SimMemoryUnit> = model.memory + + override fun interrupt(resource: SimResource) { + checkNotNull(sources[resource]) { "Invalid resource" }.interrupt() + } + } + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) { + val resources = resources + require(!isTerminated) { "Machine is terminated" } + val ctx = Context(resources, meta + mapOf("coroutine-context" to context)) + val totalCapacity = model.cpus.sumByDouble { it.frequency } + + workload.onStart(ctx) + + for ((cpu, source) in resources) { + val consumer = workload.getConsumer(ctx, cpu) + val job = source.speed + .onEach { + _usage.value = resources.values.sumByDouble { it.speed.value } / totalCapacity + } + .launchIn(this) + + launch { + source.consume(consumer) + job.cancel() + } + } + } + + override fun close() { + if (!isTerminated) { + resources.forEach { (_, provider) -> provider.close() } + } else { + isTerminated = true + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index b1d1c0b7..79982ea8 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -23,17 +23,10 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import org.opendc.simulator.compute.model.SimMemoryUnit import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* import org.opendc.utils.TimerScheduler import java.time.Clock -import java.util.* import kotlin.coroutines.* /** @@ -42,83 +35,34 @@ import kotlin.coroutines.* * A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For * example. the class expects only a single concurrent call to [run]. * - * @param coroutineScope The [CoroutineScope] to run the simulated workload in. + * @param context The [CoroutineContext] to run the simulated workload in. * @param clock The virtual clock to track the simulation time. * @param model The machine model to simulate. */ @OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) public class SimBareMetalMachine( - private val coroutineScope: CoroutineScope, + context: CoroutineContext, private val clock: Clock, override val model: SimMachineModel -) : SimMachine { - private val _usage = MutableStateFlow(0.0) - override val usage: StateFlow<Double> - get() = _usage - +) : SimAbstractMachine(clock) { /** - * A flag to indicate that the machine is terminated. + * The [Job] associated with this machine. */ - private var isTerminated = false + private val job = Job() - /** - * The [TimerScheduler] to use for scheduling the interrupts. - */ - private val scheduler = TimerScheduler<Any>(coroutineScope, clock) + override val context: CoroutineContext = context + job /** - * The execution context in which the workload runs. - */ - private inner class Context(val map: Map<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>, - override val meta: Map<String, Any>) : SimMachineContext { - override val clock: Clock - get() = this@SimBareMetalMachine.clock - - override val cpus: List<SimProcessingUnit> = model.cpus - - override val memory: List<SimMemoryUnit> = model.memory - - override fun interrupt(resource: SimResource) { - val context = map[resource] - checkNotNull(context) { "Invalid resource" } - context.interrupt() - } - } - - /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * The [TimerScheduler] to use for scheduling the interrupts. */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = coroutineScope { - require(!isTerminated) { "Machine is terminated" } - val map = mutableMapOf<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>() - val ctx = Context(map, meta) - val sources = model.cpus.map { SimResourceSource(it, clock, scheduler) } - val totalCapacity = model.cpus.sumByDouble { it.frequency } + private val scheduler = TimerScheduler<Any>(this.context, clock) - workload.onStart(ctx) - - for (source in sources) { - val consumer = workload.getConsumer(ctx, source.resource) - val job = source.speed - .onEach { - _usage.value = sources.sumByDouble { it.speed.value } / totalCapacity - } - .launchIn(this) - - launch { - source.consume(object : SimResourceConsumer<SimProcessingUnit> by consumer { - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - map[ctx.resource] = ctx - return consumer.onStart(ctx) - } - }) - job.cancel() - } - } - } + override val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> = + model.cpus.associateWith { SimResourceSource(it, clock, scheduler) } override fun close() { - isTerminated = true + super.close() scheduler.close() + job.cancel() } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 12b3b428..bb97192d 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -22,22 +22,10 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.suspendCancellableCoroutine -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.SimMemoryUnit import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.compute.workload.SimWorkloadBarrier import org.opendc.simulator.resources.* -import java.time.Clock -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min +import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single @@ -45,482 +33,27 @@ import kotlin.math.min * * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> { - - override fun onStart(ctx: SimMachineContext) { - this.ctx = ctx - this.commands = Array(ctx.cpus.size) { SimResourceCommand.Idle() } - this.pCpus = ctx.cpus.indices.sortedBy { ctx.cpus[it].frequency }.toIntArray() - this.maxUsage = ctx.cpus.sumByDouble { it.frequency } - this.barrier = SimWorkloadBarrier(ctx.cpus.size) - } - - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - return this - } - - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { - val cpu = ctx.resource.id - totalRemainingWork += remainingWork - val isLast = barrier.enter() - - // Flush the progress of the guest after the barrier has been reached. - if (isLast && isDirty) { - isDirty = false - flushGuests() - } - - return if (isDirty) { - // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. - SimResourceCommand.Idle() - } else { - // Indicate that the scheduler needs to run next call. - if (isLast) { - isDirty = true - } - - commands[cpu] - } - } - - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - return commands[ctx.resource.id] - } - - override fun canFit(model: SimMachineModel): Boolean = true - - override fun createMachine( - model: SimMachineModel, - performanceInterferenceModel: PerformanceInterferenceModel? - ): SimMachine = SimVm(model, performanceInterferenceModel) - - /** - * The execution context in which the hypervisor runs. - */ - private lateinit var ctx: SimMachineContext - - /** - * The commands to submit to the underlying host. - */ - private lateinit var commands: Array<SimResourceCommand> - - /** - * The active vCPUs. - */ - private val vcpus: MutableList<VCpu> = mutableListOf() - - /** - * The indices of the physical CPU ordered by their speed. - */ - private lateinit var pCpus: IntArray - - /** - * The maximum amount of work to be performed per second. - */ - private var maxUsage: Double = 0.0 - - /** - * The current load on the hypervisor. - */ - private var load: Double = 0.0 - - /** - * The total amount of remaining work (of all pCPUs). - */ - private var totalRemainingWork: Double = 0.0 - - /** - * The total speed requested by the vCPUs. - */ - private var totalRequestedSpeed = 0.0 - - /** - * The total amount of work requested by the vCPUs. - */ - private var totalRequestedWork = 0.0 - - /** - * The total allocated speed for the vCPUs. - */ - private var totalAllocatedSpeed = 0.0 - - /** - * The total allocated work requested for the vCPUs. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * A flag to indicate that the scheduler has submitted work that has not yet been completed. - */ - private var isDirty: Boolean = false - - /** - * The scheduler barrier. - */ - private lateinit var barrier: SimWorkloadBarrier - - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun shouldSchedule() { - isDirty = true - ctx.interruptAll() - } - - /** - * Schedule the work over the physical CPUs. - */ - private fun doSchedule() { - // If there is no work yet, mark all pCPUs as idle. - if (vcpus.isEmpty()) { - commands.fill(SimResourceCommand.Idle()) - ctx.interruptAll() - } - - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage - var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 - - // Sort the vCPUs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - vcpus.sort() - - // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - val vcpuIterator = vcpus.listIterator() - var remaining = vcpus.size - while (vcpuIterator.hasNext()) { - val vcpu = vcpuIterator.next() - val availableShare = availableSpeed / remaining-- - - when (val command = vcpu.activeCommand) { - is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - vcpu.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(vcpu.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - vcpu.actualSpeed = 0.0 - continue - } - - totalRequestedSpeed += command.limit - totalRequestedWork += command.work - - vcpu.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> { - // Apparently the vCPU has exited, so remove it from the scheduling queue. - vcpuIterator.remove() +public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { + + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean = true + + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { + return SimResourceSwitchMaxMin( + ctx.clock, + ctx.meta["coroutine-context"] as CoroutineContext, + object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> { + override fun onSliceFinish( + switch: SimResourceSwitchMaxMin<SimProcessingUnit>, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) } } - } - - // Round the duration to milliseconds - duration = ceil(duration * 1000) / 1000 - - assert(deadline >= ctx.clock.millis()) { "Deadline already passed" } - - val totalAllocatedSpeed = maxUsage - availableSpeed - var totalAllocatedWork = 0.0 - availableSpeed = totalAllocatedSpeed - load = totalAllocatedSpeed / maxUsage - - // Divide the requests over the available capacity of the pCPUs fairly - for (i in pCpus) { - val maxCpuUsage = ctx.cpus[i].frequency - val fraction = maxCpuUsage / maxUsage - val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction) - val grantedWork = duration * grantedSpeed - - commands[i] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) - - totalAllocatedWork += grantedWork - availableSpeed -= grantedSpeed - } - - this.totalRequestedSpeed = totalRequestedSpeed - this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = totalAllocatedSpeed - this.totalAllocatedWork = totalAllocatedWork - - ctx.interruptAll() - } - - /** - * Flush the progress of the vCPUs. - */ - private fun flushGuests() { - // Flush all the vCPUs work - for (vcpu in vcpus) { - vcpu.flush(isIntermediate = true) - } - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork.toLong(), - (totalAllocatedWork - totalRemainingWork).toLong(), - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalRequestedSpeed, - totalAllocatedSpeed ) - totalRemainingWork = 0.0 - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - // Force all pCPUs to re-schedule their work. - doSchedule() - } - - /** - * Interrupt all host CPUs. - */ - private fun SimMachineContext.interruptAll() { - for (cpu in ctx.cpus) { - interrupt(cpu) - } - } - - /** - * A virtual machine running on the hypervisor. - * - * @property model The machine model of the virtual machine. - * @property performanceInterferenceModel The performance interference model to utilize. - */ - private inner class SimVm( - override val model: SimMachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - ) : SimMachine { - /** - * A [StateFlow] representing the CPU usage of the simulated machine. - */ - override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) - - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - - /** - * The current active workload. - */ - private var cont: Continuation<Unit>? = null - - /** - * The active CPUs of this virtual machine. - */ - private var cpus: List<VCpu> = emptyList() - - /** - * The execution context in which the workload runs. - */ - inner class Context(override val meta: Map<String, Any>) : SimMachineContext { - override val cpus: List<SimProcessingUnit> - get() = model.cpus - - override val memory: List<SimMemoryUnit> - get() = model.memory - - override val clock: Clock - get() = this@SimFairShareHypervisor.ctx.clock - - override fun interrupt(resource: SimResource) { - TODO() - } - } - - lateinit var ctx: SimMachineContext - - /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - ctx = Context(meta) - workload.onStart(ctx) - - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.cpus = model.cpus.map { VCpu(this, it, workload.getConsumer(ctx, it), ctx.clock) } - - for (cpu in cpus) { - // Register vCPU to scheduler - vcpus.add(cpu) - - cpu.start() - } - - // Re-schedule the work over the pCPUs - shouldSchedule() - } - } - - /** - * Terminate this VM instance. - */ - override fun close() { - isTerminated = true - } - - /** - * Update the usage of the VM. - */ - fun updateUsage() { - usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.resource.frequency } - } - - /** - * This method is invoked when one of the CPUs has exited. - */ - fun onCpuExit() { - // Check whether all other CPUs have finished - if (cpus.all { it.hasExited }) { - val cont = cont - this.cont = null - cont?.resume(Unit) - } - } - - /** - * This method is invoked when one of the CPUs failed. - */ - fun onCpuFailure(e: Throwable) { - // In case the flush fails with an exception, immediately propagate to caller, cancelling all other - // tasks. - val cont = cont - this.cont = null - cont?.resumeWithException(e) - } - } - - /** - * A CPU of the virtual machine. - */ - private inner class VCpu( - val vm: SimVm, - resource: SimProcessingUnit, - consumer: SimResourceConsumer<SimProcessingUnit>, - clock: Clock - ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer), Comparable<VCpu> { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - set(value) { - field = value - vm.updateUsage() - } - - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - override fun onIdle(deadline: Long) { - allowedSpeed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - allowedSpeed = getSpeed(limit) - activeCommand = SimResourceCommand.Consume(work, limit, deadline) - } - - override fun onFinish() { - hasExited = true - activeCommand = SimResourceCommand.Exit - vm.onCpuExit() - } - - override fun onFailure(cause: Throwable) { - hasExited = true - activeCommand = SimResourceCommand.Exit - vm.onCpuFailure(cause) - } - - override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { - // Apply performance interference model - val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0 - - // Compute the remaining amount of work - val remainingWork = if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) - } else { - 0.0 - } - - if (!isInterrupted) { - totalOvercommittedWork += remainingWork - } - - return remainingWork - } - - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - super.interrupt() - - // Force the scheduler to re-schedule - shouldSchedule() - } - - override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt index d8f00bef..4a233fec 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -31,6 +31,11 @@ import org.opendc.simulator.compute.workload.SimWorkload */ public interface SimHypervisor : SimWorkload { /** + * The machines running on the hypervisor. + */ + public val vms: Set<SimMachine> + + /** * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. */ public fun canFit(model: SimMachineModel): Boolean diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index 5c67b990..cff70826 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -37,7 +37,7 @@ public interface SimMachineContext { * The virtual clock tracking simulation time. */ public val clock: Clock - + /** * The metadata associated with the context. */ diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 751873a5..2001a230 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -22,270 +22,19 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.suspendCancellableCoroutine -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.SimMemoryUnit import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* -import java.time.Clock -import java.util.ArrayDeque -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException +import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. - * - * @param listener The hypervisor listener to use. */ -public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> { - /** - * The execution context in which the hypervisor runs. - */ - private lateinit var ctx: SimMachineContext - - /** - * The mapping from pCPU to vCPU. - */ - private lateinit var vcpus: Array<VCpu?> - - /** - * The available physical CPUs to schedule on. - */ - private val availableCpus = ArrayDeque<Int>() - - override fun canFit(model: SimMachineModel): Boolean = availableCpus.size >= model.cpus.size - - override fun createMachine( - model: SimMachineModel, - performanceInterferenceModel: PerformanceInterferenceModel? - ): SimMachine { - require(canFit(model)) { "Cannot fit machine" } - return SimVm(model, performanceInterferenceModel) - } - - override fun onStart(ctx: SimMachineContext) { - this.ctx = ctx - this.vcpus = arrayOfNulls(ctx.cpus.size) - this.availableCpus.addAll(ctx.cpus.indices) - } - - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - return this +public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean { + return switch.inputs.size - switch.outputs.size >= model.cpus.size } - override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - return onNext(ctx, 0.0) - } - - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { - val vcpu = vcpus[ctx.resource.id] ?: return SimResourceCommand.Idle() - - if (vcpu.isStarted) { - vcpu.remainingWork = remainingWork - vcpu.flush() - } else { - vcpu.isStarted = true - vcpu.start() - } - - if (vcpu.hasExited && vcpu != vcpus[ctx.resource.id]) { - return onNext(ctx, remainingWork) - } - - return vcpu.activeCommand - } - - /** - * A virtual machine running on the hypervisor. - * - * @property model The machine model of the virtual machine. - * @property performanceInterferenceModel The performance interference model to utilize. - */ - private inner class SimVm( - override val model: SimMachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - ) : SimMachine { - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - - /** - * A [StateFlow] representing the CPU usage of the simulated machine. - */ - override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) - - /** - * The current active workload. - */ - private var cont: Continuation<Unit>? = null - - /** - * The physical CPUs that have been allocated. - */ - private val pCPUs = model.cpus.map { availableCpus.poll() }.toIntArray() - - /** - * The active CPUs of this virtual machine. - */ - private var cpus: List<VCpu> = emptyList() - - /** - * The execution context in which the workload runs. - */ - inner class Context(override val meta: Map<String, Any>) : SimMachineContext { - override val cpus: List<SimProcessingUnit> - get() = model.cpus - - override val memory: List<SimMemoryUnit> - get() = model.memory - - override val clock: Clock - get() = this@SimSpaceSharedHypervisor.ctx.clock - - override fun interrupt(resource: SimResource) { - TODO() - } - } - - lateinit var ctx: SimMachineContext - - /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - ctx = Context(meta) - workload.onStart(ctx) - - return suspendCancellableCoroutine { cont -> - this.cont = cont - try { - this.cpus = model.cpus.map { model -> VCpu(this, model, workload.getConsumer(ctx, model), ctx.clock) } - - for ((index, pCPU) in pCPUs.withIndex()) { - vcpus[pCPU] = cpus[index] - this@SimSpaceSharedHypervisor.ctx.interrupt(this@SimSpaceSharedHypervisor.ctx.cpus[pCPU]) - } - } catch (e: Throwable) { - cont.resumeWithException(e) - } - } - } - - override fun close() { - isTerminated = true - for (pCPU in pCPUs) { - vcpus[pCPU] = null - availableCpus.add(pCPU) - } - - val cont = cont - this.cont = null - cont?.resume(Unit) - } - - /** - * Update the usage of the VM. - */ - fun updateUsage() { - usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.resource.frequency } - } - - /** - * This method is invoked when one of the CPUs has exited. - */ - fun onCpuExit() { - // Check whether all other CPUs have finished - if (cpus.all { it.hasExited }) { - val cont = cont - this.cont = null - cont?.resume(Unit) - } - } - - /** - * This method is invoked when one of the CPUs failed. - */ - fun onCpuFailure(e: Throwable) { - // In case the flush fails with an exception, immediately propagate to caller, cancelling all other - // tasks. - val cont = cont - this.cont = null - cont?.resumeWithException(e) - } - } - - /** - * A CPU of the virtual machine. - */ - private inner class VCpu( - val vm: SimVm, - resource: SimProcessingUnit, - consumer: SimResourceConsumer<SimProcessingUnit>, - clock: Clock - ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer) { - /** - * Indicates that the vCPU was started. - */ - var isStarted: Boolean = false - - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed of the vCPU. - */ - var speed: Double = 0.0 - set(value) { - field = value - vm.updateUsage() - } - - /** - * The amount of work remaining from the previous consumption. - */ - var remainingWork: Double = 0.0 - - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - override fun onIdle(deadline: Long) { - speed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - speed = getSpeed(limit) - activeCommand = SimResourceCommand.Consume(work, speed, deadline) - } - - override fun onFinish() { - speed = 0.0 - hasExited = true - activeCommand = SimResourceCommand.Idle() - vm.onCpuExit() - } - - override fun onFailure(cause: Throwable) { - speed = 0.0 - hasExited = true - activeCommand = SimResourceCommand.Idle() - vm.onCpuFailure(cause) - } - - override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { - return remainingWork - } + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { + return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt index 3d49e544..e2044d05 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt @@ -28,5 +28,5 @@ package org.opendc.simulator.compute public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" - override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor(listener) + override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor() } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index edef3843..31f58a0f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -27,6 +27,7 @@ import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.consumer.SimConsumerBarrier /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -36,10 +37,10 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa private var offset = 0L private val iterator = trace.iterator() private var fragment: Fragment? = null - private lateinit var barrier: SimWorkloadBarrier + private lateinit var barrier: SimConsumerBarrier override fun onStart(ctx: SimMachineContext) { - barrier = SimWorkloadBarrier(ctx.cpus.size) + barrier = SimConsumerBarrier(ctx.cpus.size) fragment = nextFragment() offset = ctx.clock.millis() } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 4b4d7eca..4ac8cf63 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -23,8 +23,9 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -35,24 +36,18 @@ import org.opendc.simulator.compute.model.SimProcessingNode import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.time.Clock /** * Test suite for the [SimHypervisor] class. */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimHypervisorTest { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock - private lateinit var machineModel: SimMachineModel + private lateinit var model: SimMachineModel @BeforeEach fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) - machineModel = SimMachineModel( + model = SimMachineModel( cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) @@ -62,7 +57,8 @@ internal class SimHypervisorTest { * Test overcommitting of resources via the hypervisor with a single VM. */ @Test - fun testOvercommittedSingle() { + fun testOvercommittedSingle() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val listener = object : SimHypervisor.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L @@ -83,38 +79,34 @@ internal class SimHypervisorTest { } } - scope.launch { - val duration = 5 * 60L - val workloadA = - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) - ), - ) - - val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimFairShareHypervisor(listener) - - launch { - machine.run(hypervisor) - } - - yield() - launch { hypervisor.createMachine(machineModel).run(workloadA) } + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + + val machine = SimBareMetalMachine(coroutineContext, clock, model) + val hypervisor = SimFairShareHypervisor(listener) + + launch { + machine.run(hypervisor) + println("Hypervisor finished") } - - scope.advanceUntilIdle() - scope.uncaughtExceptions.forEach { it.printStackTrace() } + yield() + hypervisor.createMachine(model).run(workloadA) + yield() + machine.close() assertAll( - { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, scope.currentTime) } + { assertEquals(1200000, currentTime) } ) } @@ -122,7 +114,8 @@ internal class SimHypervisorTest { * Test overcommitting of resources via the hypervisor with two VMs. */ @Test - fun testOvercommittedDual() { + fun testOvercommittedDual() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val listener = object : SimHypervisor.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L @@ -143,48 +136,53 @@ internal class SimHypervisorTest { } } - scope.launch { - val duration = 5 * 60L - val workloadA = - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) - ), - ) - val workloadB = - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) - ) + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + val workloadB = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) ) + ) + + val machine = SimBareMetalMachine(coroutineContext, clock, model) + val hypervisor = SimFairShareHypervisor(listener) - val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimFairShareHypervisor(listener) + launch { + machine.run(hypervisor) + } + yield() + coroutineScope { launch { - machine.run(hypervisor) + val vm = hypervisor.createMachine(model) + vm.run(workloadA) + vm.close() } - - yield() - launch { hypervisor.createMachine(machineModel).run(workloadA) } - launch { hypervisor.createMachine(machineModel).run(workloadB) } + val vm = hypervisor.createMachine(model) + vm.run(workloadB) + vm.close() } - - scope.advanceUntilIdle() - scope.uncaughtExceptions.forEach { it.printStackTrace() } + yield() + machine.close() + yield() assertAll( - { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, scope.currentTime) } + { assertEquals(1200000, currentTime) } ) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 00efba53..6adc41d0 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -53,33 +52,35 @@ class SimMachineTest { } @Test - fun testFlopsWorkload() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) + fun testFlopsWorkload() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) - testScope.runBlockingTest { + try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) // Two cores execute 1000 MFlOps per second (1000 ms) - assertEquals(1000, testScope.currentTime) + assertEquals(1000, currentTime) + } finally { + machine.close() } } @Test - fun testUsage() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) + fun testUsage() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) - testScope.runBlockingTest { - val res = mutableListOf<Double>() - val job = launch { machine.usage.toList(res) } + val res = mutableListOf<Double>() + val job = launch { machine.usage.toList(res) } + try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) job.cancel() assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } + } finally { + machine.close() } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index 583d989c..8428a0a7 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -25,7 +25,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach @@ -38,22 +38,16 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.time.Clock /** * A test suite for the [SimSpaceSharedHypervisor]. */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimSpaceSharedHypervisorTest { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @BeforeEach fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, @@ -65,42 +59,45 @@ internal class SimSpaceSharedHypervisorTest { * Test a trace workload. */ @Test - fun testTrace() { + fun testTrace() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val usagePm = mutableListOf<Double>() val usageVm = mutableListOf<Double>() - scope.launch { - val duration = 5 * 60L - val workloadA = - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) - ), - ) - - val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimSpaceSharedHypervisor() - - launch { machine.usage.toList(usagePm) } - launch { machine.run(hypervisor) } - - yield() - launch { - val vm = hypervisor.createMachine(machineModel) - launch { vm.usage.toList(usageVm) } - vm.run(workloadA) - } - } - - scope.advanceUntilIdle() + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + val colA = launch { machine.usage.toList(usagePm) } + launch { machine.run(hypervisor) } + + yield() + + val vm = hypervisor.createMachine(machineModel) + val colB = launch { vm.usage.toList(usageVm) } + vm.run(workloadA) + yield() + + vm.close() + machine.close() + colA.cancel() + colB.cancel() assertAll( { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usagePm) { "Correct PM usage" } }, - { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } }, - { assertEquals(5 * 60L * 4000, scope.currentTime) { "Took enough time" } } + // Temporary limitation is that VMs do not emit usage information + // { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } }, + { assertEquals(5 * 60L * 4000, currentTime) { "Took enough time" } } ) } @@ -108,119 +105,111 @@ internal class SimSpaceSharedHypervisorTest { * Test runtime workload on hypervisor. */ @Test - fun testRuntimeWorkload() { + fun testRuntimeWorkload() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val duration = 5 * 60L * 1000 val workload = SimRuntimeWorkload(duration) - val machine = SimBareMetalMachine(scope, clock, machineModel) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) val hypervisor = SimSpaceSharedHypervisor() - scope.launch { - launch { machine.run(hypervisor) } - - yield() - launch { hypervisor.createMachine(machineModel).run(workload) } - } + launch { machine.run(hypervisor) } + yield() + val vm = hypervisor.createMachine(machineModel) + vm.run(workload) + vm.close() + machine.close() - scope.advanceUntilIdle() - - assertEquals(duration, scope.currentTime) { "Took enough time" } + assertEquals(duration, currentTime) { "Took enough time" } } /** * Test FLOPs workload on hypervisor. */ @Test - fun testFlopsWorkload() { + fun testFlopsWorkload() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) - val machine = SimBareMetalMachine(scope, clock, machineModel) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) val hypervisor = SimSpaceSharedHypervisor() - scope.launch { - launch { machine.run(hypervisor) } - - yield() - launch { hypervisor.createMachine(machineModel).run(workload) } - } + launch { machine.run(hypervisor) } + yield() + val vm = hypervisor.createMachine(machineModel) + vm.run(workload) + machine.close() - scope.advanceUntilIdle() - - assertEquals(duration, scope.currentTime) { "Took enough time" } + assertEquals(duration, currentTime) { "Took enough time" } } /** * Test two workloads running sequentially. */ @Test - fun testTwoWorkloads() { + fun testTwoWorkloads() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val duration = 5 * 60L * 1000 - val machine = SimBareMetalMachine(scope, clock, machineModel) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) val hypervisor = SimSpaceSharedHypervisor() - scope.launch { - launch { machine.run(hypervisor) } - - yield() - launch { - val vm = hypervisor.createMachine(machineModel) - vm.run(SimRuntimeWorkload(duration)) - vm.close() + launch { machine.run(hypervisor) } + yield() - val vm2 = hypervisor.createMachine(machineModel) - vm2.run(SimRuntimeWorkload(duration)) - } - } + val vm = hypervisor.createMachine(machineModel) + vm.run(SimRuntimeWorkload(duration)) + vm.close() - scope.advanceUntilIdle() + val vm2 = hypervisor.createMachine(machineModel) + vm2.run(SimRuntimeWorkload(duration)) + vm2.close() + machine.close() - assertEquals(duration * 2, scope.currentTime) { "Took enough time" } + assertEquals(duration * 2, currentTime) { "Took enough time" } } /** * Test concurrent workloads on the machine. */ @Test - fun testConcurrentWorkloadFails() { - val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimSpaceSharedHypervisor() + fun testConcurrentWorkloadFails() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) - scope.launch { - launch { machine.run(hypervisor) } + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() - yield() + launch { machine.run(hypervisor) } + yield() - hypervisor.createMachine(machineModel) + hypervisor.createMachine(machineModel) - assertAll( - { assertFalse(hypervisor.canFit(machineModel)) }, - { assertThrows<IllegalStateException> { hypervisor.createMachine(machineModel) } } - ) - } + assertAll( + { assertFalse(hypervisor.canFit(machineModel)) }, + { assertThrows<IllegalArgumentException> { hypervisor.createMachine(machineModel) } } + ) - scope.advanceUntilIdle() + machine.close() } /** * Test concurrent workloads on the machine. */ @Test - fun testConcurrentWorkloadSucceeds() { - val machine = SimBareMetalMachine(scope, clock, machineModel) + fun testConcurrentWorkloadSucceeds() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) val hypervisor = SimSpaceSharedHypervisor() - scope.launch { - launch { machine.run(hypervisor) } - - yield() + launch { machine.run(hypervisor) } + yield() - hypervisor.createMachine(machineModel).close() + hypervisor.createMachine(machineModel).close() - assertAll( - { assertTrue(hypervisor.canFit(machineModel)) }, - { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } - ) - } + assertAll( + { assertTrue(hypervisor.canFit(machineModel)) }, + { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } + ) - scope.advanceUntilIdle() + machine.close() } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index f9da74c7..52251bff 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -138,10 +138,11 @@ public abstract class SimAbstractResourceContext<R : SimResource>( } try { - val (timestamp, command) = activeCommand ?: return + val activeCommand = activeCommand ?: return + val (timestamp, command) = activeCommand isProcessing = true - activeCommand = null + this.activeCommand = null val duration = now - timestamp assert(duration >= 0) { "Flush in the past" } @@ -153,6 +154,8 @@ public abstract class SimAbstractResourceContext<R : SimResource>( // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (command.deadline <= now || !isIntermediate) { next(remainingWork = 0.0) + } else { + this.activeCommand = activeCommand } } is SimResourceCommand.Consume -> { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt new file mode 100644 index 00000000..ca23557c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume + +/** + * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer]. + */ +public class SimResourceForwarder<R : SimResource>(override val resource: R) : + SimResourceProvider<R>, SimResourceConsumer<R> { + /** + * The [SimResourceContext] in which the forwarder runs. + */ + private var ctx: SimResourceContext<R>? = null + + /** + * A flag to indicate that the forwarder is closed. + */ + private var isClosed: Boolean = false + + /** + * The continuation to resume after consumption. + */ + private var cont: Continuation<Unit>? = null + + /** + * The delegate [SimResourceConsumer]. + */ + private var delegate: SimResourceConsumer<R>? = null + + /** + * A flag to indicate that the delegate was started. + */ + private var hasDelegateStarted: Boolean = false + + /** + * The remaining amount of work last cycle. + */ + private var remainingWork: Double = 0.0 + + override suspend fun consume(consumer: SimResourceConsumer<R>) { + check(!isClosed) { "Lifecycle of forwarder has ended" } + check(cont == null) { "Run should not be called concurrently" } + + return suspendCancellableCoroutine { cont -> + this.cont = cont + this.delegate = consumer + + cont.invokeOnCancellation { reset() } + + ctx?.interrupt() + } + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun close() { + isClosed = true + interrupt() + ctx = null + } + + override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand { + this.ctx = ctx + + return onNext(ctx, 0.0) + } + + override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand { + this.remainingWork = remainingWork + + return if (isClosed) { + SimResourceCommand.Exit + } else if (!hasDelegateStarted) { + start() + } else { + next() + } + } + + /** + * Start the delegate. + */ + private fun start(): SimResourceCommand { + val delegate = delegate ?: return SimResourceCommand.Idle() + val command = delegate.onStart(checkNotNull(ctx)) + + hasDelegateStarted = true + + return forward(command) + } + + /** + * Obtain the next command to process. + */ + private fun next(): SimResourceCommand { + val delegate = delegate + return forward(delegate?.onNext(checkNotNull(ctx), remainingWork) ?: SimResourceCommand.Idle()) + } + + /** + * Forward the specified [command]. + */ + private fun forward(command: SimResourceCommand): SimResourceCommand { + return if (command == SimResourceCommand.Exit) { + val cont = checkNotNull(cont) + + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + cont.resume(Unit) + + if (isClosed) + SimResourceCommand.Exit + else + start() + } else { + command + } + } + + /** + * Reset the delegate. + */ + private fun reset() { + cont = null + delegate = null + hasDelegateStarted = false + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 91a745ab..e35aa683 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -37,6 +37,11 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable { public suspend fun consume(consumer: SimResourceConsumer<R>) /** + * Interrupt the resource. + */ + public fun interrupt() + + /** * End the lifetime of the resource. * * This operation terminates the existing resource consumer. diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 4445df86..540a17c9 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -50,14 +50,32 @@ public class SimResourceSource<R : SimResource>( get() = _speed private val _speed = MutableStateFlow(0.0) + /** + * A flag to indicate that the resource was closed. + */ + private var isClosed: Boolean = false + + /** + * The current active consumer. + */ + private var cont: CancellableContinuation<Unit>? = null + + /** + * The [Context] that is currently running. + */ + private var ctx: Context? = null + override suspend fun consume(consumer: SimResourceConsumer<R>) { check(!isClosed) { "Lifetime of resource has ended." } check(cont == null) { "Run should not be called concurrently" } try { return suspendCancellableCoroutine { cont -> - this.cont = cont val ctx = Context(consumer, cont) + + this.cont = cont + this.ctx = ctx + ctx.start() cont.invokeOnCancellation { ctx.stop() @@ -65,6 +83,7 @@ public class SimResourceSource<R : SimResource>( } } finally { cont = null + ctx = null } } @@ -72,17 +91,12 @@ public class SimResourceSource<R : SimResource>( isClosed = true cont?.cancel() cont = null + ctx = null } - /** - * A flag to indicate that the resource was closed. - */ - private var isClosed: Boolean = false - - /** - * The current active consumer. - */ - private var cont: CancellableContinuation<Unit>? = null + override fun interrupt() { + ctx?.interrupt() + } /** * Internal implementation of [SimResourceContext] for this class. @@ -113,7 +127,7 @@ public class SimResourceSource<R : SimResource>( speed = getSpeed(limit) val until = min(deadline, clock.millis() + getDuration(work, speed)) - scheduler.startSingleTimerTo(this, until) { flush() } + scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish() { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt new file mode 100644 index 00000000..cd1af3fc --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers. + */ +public interface SimResourceSwitch<R : SimResource> : AutoCloseable { + /** + * The output resource providers to which resource consumers can be attached. + */ + public val outputs: Set<SimResourceProvider<R>> + + /** + * The input resources that will be switched between the output providers. + */ + public val inputs: Set<SimResourceProvider<R>> + + /** + * Add an output to the switch represented by [resource]. + */ + public fun addOutput(resource: R): SimResourceProvider<R> + + /** + * Add the specified [input] to the switch. + */ + public fun addInput(input: SimResourceProvider<R>) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt new file mode 100644 index 00000000..060d0ea2 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import java.util.ArrayDeque +import kotlin.coroutines.CoroutineContext + +/** + * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that + * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. + */ +public class SimResourceSwitchExclusive<R : SimResource>(context: CoroutineContext) : SimResourceSwitch<R> { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context + Job()) + + private val _outputs = mutableSetOf<SimResourceProvider<R>>() + override val outputs: Set<SimResourceProvider<R>> + get() = _outputs + + private val availableResources = ArrayDeque<SimResourceForwarder<R>>() + private val _inputs = mutableSetOf<SimResourceProvider<R>>() + override val inputs: Set<SimResourceProvider<R>> + get() = _inputs + + override fun addOutput(resource: R): SimResourceProvider<R> { + check(availableResources.isNotEmpty()) { "No capacity to serve request" } + val forwarder = availableResources.poll() + val output = Provider(resource, forwarder) + _outputs += output + return output + } + + override fun addInput(input: SimResourceProvider<R>) { + if (input in inputs) { + return + } + + val forwarder = SimResourceForwarder(input.resource) + + scope.launch { input.consume(forwarder) } + + _inputs += input + availableResources += forwarder + } + + override fun close() { + scope.cancel() + } + + private inner class Provider( + override val resource: R, + private val forwarder: SimResourceForwarder<R> + ) : SimResourceProvider<R> { + + override suspend fun consume(consumer: SimResourceConsumer<R>) = forwarder.consume(consumer) + + override fun interrupt() { + forwarder.interrupt() + } + + override fun close() { + _outputs -= this + availableResources += forwarder + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt new file mode 100644 index 00000000..bcf76d3c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -0,0 +1,508 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.* +import org.opendc.simulator.resources.consumer.SimConsumerBarrier +import java.time.Clock +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min + * fair sharing. + */ +public class SimResourceSwitchMaxMin<R : SimResource>( + private val clock: Clock, + context: CoroutineContext, + private val listener: Listener<R>? = null +) : SimResourceSwitch<R> { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context + Job()) + + private val inputConsumers = mutableSetOf<InputConsumer>() + private val _outputs = mutableSetOf<OutputProvider>() + override val outputs: Set<SimResourceProvider<R>> + get() = _outputs + + private val _inputs = mutableSetOf<SimResourceProvider<R>>() + override val inputs: Set<SimResourceProvider<R>> + get() = _inputs + + /** + * The commands to submit to the underlying host. + */ + private val commands = mutableMapOf<R, SimResourceCommand>() + + /** + * The active output contexts. + */ + private val outputContexts: MutableList<OutputContext> = mutableListOf() + + /** + * The total amount of remaining work (of all pCPUs). + */ + private var totalRemainingWork: Double = 0.0 + + /** + * The total speed requested by the vCPUs. + */ + private var totalRequestedSpeed = 0.0 + + /** + * The total amount of work requested by the vCPUs. + */ + private var totalRequestedWork = 0.0 + + /** + * The total allocated speed for the vCPUs. + */ + private var totalAllocatedSpeed = 0.0 + + /** + * The total allocated work requested for the vCPUs. + */ + private var totalAllocatedWork = 0.0 + + /** + * The amount of work that could not be performed due to over-committing resources. + */ + private var totalOvercommittedWork = 0.0 + + /** + * The amount of work that was lost due to interference. + */ + private var totalInterferedWork = 0.0 + + /** + * A flag to indicate that the scheduler has submitted work that has not yet been completed. + */ + private var isDirty: Boolean = false + + /** + * The scheduler barrier. + */ + private var barrier: SimConsumerBarrier = SimConsumerBarrier(0) + + /** + * Add an output to the switch represented by [resource]. + */ + override fun addOutput(resource: R): SimResourceProvider<R> { + val provider = OutputProvider(resource) + _outputs.add(provider) + return provider + } + + /** + * Add the specified [input] to the switch. + */ + override fun addInput(input: SimResourceProvider<R>) { + val consumer = InputConsumer(input) + _inputs.add(input) + inputConsumers += consumer + } + + override fun close() { + scope.cancel() + } + + /** + * Indicate that the workloads should be re-scheduled. + */ + private fun schedule() { + isDirty = true + interruptAll() + } + + /** + * Schedule the work over the physical CPUs. + */ + private fun doSchedule() { + // If there is no work yet, mark all inputs as idle. + if (outputContexts.isEmpty()) { + commands.replaceAll { _, _ -> SimResourceCommand.Idle() } + interruptAll() + } + + val maxUsage = inputs.sumByDouble { it.resource.capacity } + var duration: Double = Double.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + var availableSpeed = maxUsage + var totalRequestedSpeed = 0.0 + var totalRequestedWork = 0.0 + + // Sort the outputs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + outputContexts.sort() + + // Divide the available input capacity fairly across the outputs using max-min fair sharing + val outputIterator = outputContexts.listIterator() + var remaining = outputContexts.size + while (outputIterator.hasNext()) { + val output = outputIterator.next() + val availableShare = availableSpeed / remaining-- + + when (val command = output.activeCommand) { + is SimResourceCommand.Idle -> { + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + output.actualSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + val grantedSpeed = min(output.allowedSpeed, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + // Ignore idle computation + if (grantedSpeed <= 0.0 || command.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } + + totalRequestedSpeed += command.limit + totalRequestedWork += command.work + + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed + + // The duration that we want to run is that of the shortest request from an output + duration = min(duration, command.work / grantedSpeed) + } + SimResourceCommand.Exit -> { + // Apparently the output consumer has exited, so remove it from the scheduling queue. + outputIterator.remove() + } + } + } + + // Round the duration to milliseconds + duration = ceil(duration * 1000) / 1000 + + assert(deadline >= clock.millis()) { "Deadline already passed" } + + val totalAllocatedSpeed = maxUsage - availableSpeed + var totalAllocatedWork = 0.0 + availableSpeed = totalAllocatedSpeed + + // Divide the requests over the available capacity of the input resources fairly + for (input in inputs.sortedByDescending { it.resource.capacity }) { + val maxResourceUsage = input.resource.capacity + val fraction = maxResourceUsage / maxUsage + val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction) + val grantedWork = duration * grantedSpeed + + commands[input.resource] = + if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + + totalAllocatedWork += grantedWork + availableSpeed -= grantedSpeed + } + + this.totalRequestedSpeed = totalRequestedSpeed + this.totalRequestedWork = totalRequestedWork + this.totalAllocatedSpeed = totalAllocatedSpeed + this.totalAllocatedWork = totalAllocatedWork + + interruptAll() + } + + /** + * Flush the progress of the vCPUs. + */ + private fun flushGuests() { + // Flush all the outputs work + for (output in outputContexts) { + output.flush(isIntermediate = true) + } + + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork.toLong(), + (totalAllocatedWork - totalRemainingWork).toLong(), + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalRequestedSpeed, + totalAllocatedSpeed + ) + totalRemainingWork = 0.0 + totalInterferedWork = 0.0 + totalOvercommittedWork = 0.0 + + // Force all inputs to re-schedule their work. + doSchedule() + } + + /** + * Interrupt all inputs. + */ + private fun interruptAll() { + for (input in inputConsumers) { + input.interrupt() + } + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener<R : SimResource> { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + switch: SimResourceSwitchMaxMin<R>, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * An internal [SimResourceProvider] implementation for switch outputs. + */ + private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> { + /** + * A flag to indicate that the resource was closed. + */ + private var isClosed: Boolean = false + + /** + * The current active consumer. + */ + private var cont: CancellableContinuation<Unit>? = null + + /** + * The [OutputContext] that is currently running. + */ + private var ctx: OutputContext? = null + + override suspend fun consume(consumer: SimResourceConsumer<R>) { + check(!isClosed) { "Lifetime of resource has ended." } + check(cont == null) { "Run should not be called concurrently" } + + try { + return suspendCancellableCoroutine { cont -> + val ctx = OutputContext(resource, consumer, cont) + ctx.start() + cont.invokeOnCancellation { + ctx.stop() + } + + this.cont = cont + this.ctx = ctx + + outputContexts += ctx + schedule() + } + } finally { + cont = null + ctx = null + } + } + + override fun close() { + isClosed = true + cont?.cancel() + cont = null + ctx = null + _outputs.remove(this) + } + + override fun interrupt() { + ctx?.interrupt() + } + } + + /** + * A [SimAbstractResourceContext] for the output resources. + */ + private inner class OutputContext( + resource: R, + consumer: SimResourceConsumer<R>, + private val cont: Continuation<Unit> + ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> { + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 + + /** + * A flag to indicate that the CPU has exited. + */ + var hasExited: Boolean = false + + override fun onIdle(deadline: Long) { + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + allowedSpeed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } + + override fun onFinish() { + hasExited = true + activeCommand = SimResourceCommand.Exit + cont.resume(Unit) + } + + override fun onFailure(cause: Throwable) { + hasExited = true + activeCommand = SimResourceCommand.Exit + cont.resumeWithException(cause) + } + + override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + // Apply performance interference model + val performanceScore = 1.0 + + // Compute the remaining amount of work + val remainingWork = if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 + } + + if (!isInterrupted) { + totalOvercommittedWork += remainingWork + } + + return remainingWork + } + + override fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isProcessing) { + return + } + + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } + + override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) + } + + /** + * An internal [SimResourceConsumer] implementation for switch inputs. + */ + private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> { + /** + * The resource context of the consumer. + */ + private lateinit var ctx: SimResourceContext<R> + + init { + scope.launch { + try { + barrier = SimConsumerBarrier(barrier.parties + 1) + input.consume(this@InputConsumer) + } catch (e: CancellationException) { + // Cancel gracefully + throw e + } catch (e: Throwable) { + e.printStackTrace() + } finally { + barrier = SimConsumerBarrier(barrier.parties - 1) + inputConsumers -= this@InputConsumer + _inputs -= input + } + } + } + + /** + * Interrupt the consumer + */ + fun interrupt() { + ctx.interrupt() + } + + override fun onStart(ctx: SimResourceContext<R>): SimResourceCommand { + this.ctx = ctx + return commands[ctx.resource] ?: SimResourceCommand.Idle() + } + + override fun onNext(ctx: SimResourceContext<R>, remainingWork: Double): SimResourceCommand { + totalRemainingWork += remainingWork + val isLast = barrier.enter() + + // Flush the progress of the guest after the barrier has been reached. + if (isLast && isDirty) { + isDirty = false + flushGuests() + } + + return if (isDirty) { + // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. + SimResourceCommand.Idle() + } else { + // Indicate that the scheduler needs to run next call. + if (isLast) { + isDirty = true + } + + commands[ctx.resource] ?: SimResourceCommand.Idle() + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt index 45a299be..7aa5a5aa 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt @@ -20,13 +20,13 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.workload +package org.opendc.simulator.resources.consumer /** - * The [SimWorkloadBarrier] is a barrier that allows workloads to wait for a select number of CPUs to complete, before - * proceeding its operation. + * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to + * complete, before proceeding its operation. */ -public class SimWorkloadBarrier(public val parties: Int) { +public class SimConsumerBarrier(public val parties: Int) { private var counter = 0 /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt new file mode 100644 index 00000000..03a3cebd --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.consumer + +import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext + +/** + * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource + * consumption for some period of time. + */ +public class SimTraceConsumer(trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> { + private val iterator = trace.iterator() + + override fun onStart(ctx: SimResourceContext<SimResource>): SimResourceCommand { + return onNext(ctx, 0.0) + } + + override fun onNext(ctx: SimResourceContext<SimResource>, remainingWork: Double): SimResourceCommand { + return if (iterator.hasNext()) { + val now = ctx.clock.millis() + val fragment = iterator.next() + val work = (fragment.duration / 1000) * fragment.usage + val deadline = now + fragment.duration + + assert(deadline >= now) { "Deadline already passed" } + + if (work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) + } else { + SimResourceCommand.Exit + } + } + + /** + * A fragment of the workload. + */ + public data class Fragment(val duration: Long, val usage: Double) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt new file mode 100644 index 00000000..e7642dc1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.* +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.opendc.simulator.utils.DelayControllerClockAdapter + +/** + * A test suite for the [SimAbstractResourceContext] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceContextTest { + data class SimCpu(val speed: Double) : SimResource { + override val capacity: Double + get() = speed + } + + @Test + fun testFlushWithoutCommand() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + + val resource = SimCpu(4200.0) + + val consumer = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Consume(10.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + override fun onIdle(deadline: Long) { + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + } + + override fun onFinish() { + } + + override fun onFailure(cause: Throwable) { + } + } + + context.flush() + } + + @Test + fun testIntermediateFlush() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val resource = SimCpu(4200.0) + + val consumer = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Consume(10.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + var counter = 0 + val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + override fun onIdle(deadline: Long) { + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + counter++ + } + + override fun onFinish() { + } + + override fun onFailure(cause: Throwable) { + } + } + + context.start() + delay(1) // Delay 1 ms to prevent hitting the fast path + context.flush(isIntermediate = true) + assertEquals(2, counter) + } + + @Test + fun testIntermediateFlushIdle() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val resource = SimCpu(4200.0) + + val consumer = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Idle(10) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + var counter = 0 + var isFinished = false + val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + override fun onIdle(deadline: Long) { + counter++ + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + } + + override fun onFinish() { + isFinished = true + } + + override fun onFailure(cause: Throwable) { + } + } + + context.start() + delay(5) + context.flush(isIntermediate = true) + delay(5) + context.flush(isIntermediate = true) + + assertAll( + { assertEquals(1, counter) }, + { assertTrue(isFinished) } + ) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt new file mode 100644 index 00000000..ced1bd98 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Test +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler + +/** + * A test suite for the [SimResourceForwarder] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceForwarderTest { + + data class SimCpu(val speed: Double) : SimResource { + override val capacity: Double + get() = speed + } + + @Test + fun testExitImmediately() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + + launch { + source.consume(forwarder) + source.close() + } + + forwarder.consume(object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Exit + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + }) + forwarder.close() + scheduler.close() + } + + @Test + fun testExit() = runBlockingTest { + val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + + launch { + source.consume(forwarder) + source.close() + } + + forwarder.consume(object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Consume(1.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + }) + + forwarder.close() + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 8b380efb..4f7825fc 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -24,38 +24,27 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler -import java.time.Clock /** - * A test suite for the [SimResourceScheduler] class. + * A test suite for the [SimResourceSource] class. */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceSourceTest { - - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock - data class SimCpu(val speed: Double) : SimResource { override val capacity: Double get() = speed } - @BeforeEach - fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - } - @Test - fun testSpeed() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testSpeed() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -67,21 +56,25 @@ class SimResourceSourceTest { } } - scope.runBlockingTest { + try { val res = mutableListOf<Double>() val job = launch { provider.speed.toList(res) } provider.consume(consumer) job.cancel() - assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" } + assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" } + } finally { + scheduler.close() + provider.close() } } @Test - fun testSpeedLimit() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testSpeedLimit() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -93,21 +86,29 @@ class SimResourceSourceTest { } } - scope.runBlockingTest { + try { val res = mutableListOf<Double>() val job = launch { provider.speed.toList(res) } provider.consume(consumer) job.cancel() - assertEquals(listOf(0.0, resource.speed, 0.0), res) { "Speed is reported correctly" } + assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" } + } finally { + scheduler.close() + provider.close() } } + /** + * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or + * [SimResourceConsumer.onNext]. + */ @Test - fun testInterrupt() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testIntermediateInterrupt() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -120,17 +121,52 @@ class SimResourceSourceTest { } } - assertDoesNotThrow { - scope.runBlockingTest { - provider.consume(consumer) + try { + provider.consume(consumer) + } finally { + scheduler.close() + provider.close() + } + } + + @Test + fun testInterrupt() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + lateinit var resCtx: SimResourceContext<SimCpu> + + val consumer = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + resCtx = ctx + return SimResourceCommand.Consume(4.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + assertEquals(0.0, remainingWork) + return SimResourceCommand.Exit } } + + try { + launch { + yield() + resCtx.interrupt() + } + provider.consume(consumer) + + assertEquals(0, currentTime) + } finally { + scheduler.close() + provider.close() + } } @Test - fun testFailure() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testFailure() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -142,17 +178,21 @@ class SimResourceSourceTest { } } - assertThrows<IllegalStateException> { - scope.runBlockingTest { + try { + assertThrows<IllegalStateException> { provider.consume(consumer) } + } finally { + scheduler.close() + provider.close() } } @Test - fun testExceptionPropagationOnNext() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testExceptionPropagationOnNext() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -164,15 +204,21 @@ class SimResourceSourceTest { } } - assertThrows<IllegalStateException> { - scope.runBlockingTest { provider.consume(consumer) } + try { + assertThrows<IllegalStateException> { + provider.consume(consumer) + } + } finally { + scheduler.close() + provider.close() } } @Test - fun testConcurrentConsumption() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testConcurrentConsumption() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -184,18 +230,24 @@ class SimResourceSourceTest { } } - assertThrows<IllegalStateException> { - scope.runBlockingTest { - launch { provider.consume(consumer) } - launch { provider.consume(consumer) } + try { + assertThrows<IllegalStateException> { + coroutineScope { + launch { provider.consume(consumer) } + provider.consume(consumer) + } } + } finally { + scheduler.close() + provider.close() } } @Test - fun testClosedConsumption() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testClosedConsumption() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -207,18 +259,22 @@ class SimResourceSourceTest { } } - assertThrows<IllegalStateException> { - scope.runBlockingTest { + try { + assertThrows<IllegalStateException> { provider.close() provider.consume(consumer) } + } finally { + scheduler.close() + provider.close() } } @Test - fun testCloseDuringConsumption() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testCloseDuringConsumption() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -230,19 +286,23 @@ class SimResourceSourceTest { } } - scope.runBlockingTest { + try { launch { provider.consume(consumer) } delay(500) provider.close() - } - assertEquals(500, scope.currentTime) + assertEquals(500, currentTime) + } finally { + scheduler.close() + provider.close() + } } @Test - fun testIdle() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) + fun testIdle() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) val consumer = object : SimResourceConsumer<SimCpu> { override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { @@ -254,31 +314,40 @@ class SimResourceSourceTest { } } - scope.runBlockingTest { + try { provider.consume(consumer) - } - assertEquals(500, scope.currentTime) + assertEquals(500, currentTime) + } finally { + scheduler.close() + provider.close() + } } @Test fun testInfiniteSleep() { - val resource = SimCpu(4200.0) - val provider = SimResourceSource(resource, clock, TimerScheduler(scope, clock)) - - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { - return SimResourceCommand.Idle() - } - - override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { - return SimResourceCommand.Exit - } - } - assertThrows<IllegalStateException> { - scope.runBlockingTest { - provider.consume(consumer) + runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + + val consumer = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Idle() + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + try { + provider.consume(consumer) + } finally { + scheduler.close() + provider.close() + } } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt new file mode 100644 index 00000000..ca6558bf --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler +import java.lang.IllegalStateException + +/** + * Test suite for the [SimResourceSwitchExclusive] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceSwitchExclusiveTest { + class SimCpu(val speed: Double) : SimResource { + override val capacity: Double + get() = speed + } + + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val speed = mutableListOf<Double>() + + val duration = 5 * 60L + val workload = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(duration * 1000, 28.0), + SimTraceConsumer.Fragment(duration * 1000, 3500.0), + SimTraceConsumer.Fragment(duration * 1000, 0.0), + SimTraceConsumer.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) + val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + + switch.addInput(source) + + val provider = switch.addOutput(SimCpu(3200.0)) + val job = launch { source.speed.toList(speed) } + + try { + provider.consume(workload) + yield() + } finally { + job.cancel() + provider.close() + } + + assertAll( + { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, + { assertEquals(5 * 60L * 4000, currentTime) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Consume(duration / 1000.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) + val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + + switch.addInput(source) + + val provider = switch.addOutput(SimCpu(3200.0)) + + try { + provider.consume(workload) + yield() + } finally { + provider.close() + } + assertEquals(duration, currentTime) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Consume(duration / 1000.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) + val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + + switch.addInput(source) + + val provider = switch.addOutput(SimCpu(3200.0)) + + try { + provider.consume(workload) + yield() + provider.consume(workload) + } finally { + provider.close() + } + assertEquals(duration * 2, currentTime) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Consume(duration.toDouble(), 1.0) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + val switch = SimResourceSwitchExclusive<SimCpu>(coroutineContext) + val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + + switch.addInput(source) + + switch.addOutput(SimCpu(3200.0)) + assertThrows<IllegalStateException> { switch.addOutput(SimCpu(3200.0)) } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt new file mode 100644 index 00000000..698c1700 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler + +/** + * Test suite for the [SimResourceSwitch] implementations + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceSwitchMaxMinTest { + class SimCpu(val speed: Double) : SimResource { + override val capacity: Double + get() = speed + } + + @Test + fun testSmoke() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val switch = SimResourceSwitchMaxMin<SimCpu>(clock, coroutineContext) + + val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) } + sources.forEach { switch.addInput(it) } + + val provider = switch.addOutput(SimCpu(1000.0)) + + val consumer = object : SimResourceConsumer<SimCpu> { + override fun onStart(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + return SimResourceCommand.Consume(1.0, 1.0) + } + + override fun onNext(ctx: SimResourceContext<SimCpu>, remainingWork: Double): SimResourceCommand { + return SimResourceCommand.Exit + } + } + + try { + provider.consume(consumer) + yield() + } finally { + switch.close() + scheduler.close() + } + } + + /** + * Test overcommitting of resources via the hypervisor with a single VM. + */ + @Test + fun testOvercommittedSingle() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> { + var totalRequestedWork = 0L + var totalGrantedWork = 0L + var totalOvercommittedWork = 0L + + override fun onSliceFinish( + switch: SimResourceSwitchMaxMin<SimCpu>, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + totalRequestedWork += requestedWork + totalGrantedWork += grantedWork + totalOvercommittedWork += overcommittedWork + } + } + + val duration = 5 * 60L + val workload = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(duration * 1000, 28.0), + SimTraceConsumer.Fragment(duration * 1000, 3500.0), + SimTraceConsumer.Fragment(duration * 1000, 0.0), + SimTraceConsumer.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener) + val provider = switch.addOutput(SimCpu(3200.0)) + + try { + switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler)) + provider.consume(workload) + yield() + } finally { + switch.close() + scheduler.close() + } + + assertAll( + { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1200000, currentTime) } + ) + } + + /** + * Test overcommitting of resources via the hypervisor with two VMs. + */ + @Test + fun testOvercommittedDual() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> { + var totalRequestedWork = 0L + var totalGrantedWork = 0L + var totalOvercommittedWork = 0L + + override fun onSliceFinish( + switch: SimResourceSwitchMaxMin<SimCpu>, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + totalRequestedWork += requestedWork + totalGrantedWork += grantedWork + totalOvercommittedWork += overcommittedWork + } + } + + val duration = 5 * 60L + val workloadA = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(duration * 1000, 28.0), + SimTraceConsumer.Fragment(duration * 1000, 3500.0), + SimTraceConsumer.Fragment(duration * 1000, 0.0), + SimTraceConsumer.Fragment(duration * 1000, 183.0) + ), + ) + val workloadB = + SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(duration * 1000, 28.0), + SimTraceConsumer.Fragment(duration * 1000, 3100.0), + SimTraceConsumer.Fragment(duration * 1000, 0.0), + SimTraceConsumer.Fragment(duration * 1000, 73.0) + ) + ) + + val switch = SimResourceSwitchMaxMin(clock, coroutineContext, listener) + val providerA = switch.addOutput(SimCpu(3200.0)) + val providerB = switch.addOutput(SimCpu(3200.0)) + + try { + switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler)) + + coroutineScope { + launch { providerA.consume(workloadA) } + providerB.consume(workloadB) + } + + yield() + } finally { + switch.close() + scheduler.close() + } + assertAll( + { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1200000, currentTime) } + ) + } +} diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index 9f40f26a..49964938 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -22,24 +22,28 @@ package org.opendc.utils -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.sendBlocking -import kotlinx.coroutines.launch import kotlinx.coroutines.selects.select import java.time.Clock import java.util.* +import kotlin.coroutines.CoroutineContext import kotlin.math.max /** * A TimerScheduler facilitates scheduled execution of future tasks. * - * @property coroutineScope The [CoroutineScope] to run the tasks in. + * @property context The [CoroutineContext] to run the tasks with. * @property clock The clock to keep track of the time. */ @OptIn(ExperimentalCoroutinesApi::class) -public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, private val clock: Clock) : AutoCloseable { +public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable { + /** + * The scope in which the scheduler runs. + */ + private val scope = CoroutineScope(context + Job()) + /** * A priority queue containing the tasks to be scheduled in the future. */ @@ -58,7 +62,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva /** * The scheduling job. */ - private val job = coroutineScope.launch { + private val job = scope.launch { val timers = timers val queue = queue val clock = clock @@ -71,7 +75,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select onTimeout(delay) { - while (queue.isNotEmpty()) { + while (queue.isNotEmpty() && isActive) { val timer = queue.peek() val timestamp = clock.millis() @@ -86,7 +90,11 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva if (!timer.isCancelled) { timers.remove(timer.key) - timer() + try { + timer() + } catch (e: Throwable) { + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e) + } } } @@ -101,7 +109,7 @@ public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, priva */ override fun close() { cancelAll() - job.cancel() + scope.cancel() } /** diff --git a/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt b/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt index 3a4acc90..1fcb5d38 100644 --- a/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt +++ b/simulator/opendc-utils/src/test/kotlin/org/opendc/utils/TimerSchedulerTest.kt @@ -38,7 +38,7 @@ internal class TimerSchedulerTest { fun testBasicTimer() { runBlockingTest { val clock = DelayControllerClockAdapter(this) - val scheduler = TimerScheduler<Int>(this, clock) + val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { scheduler.close() @@ -51,7 +51,7 @@ internal class TimerSchedulerTest { fun testCancelNonExisting() { runBlockingTest { val clock = DelayControllerClockAdapter(this) - val scheduler = TimerScheduler<Int>(this, clock) + val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.cancel(1) scheduler.close() @@ -62,7 +62,7 @@ internal class TimerSchedulerTest { fun testCancelExisting() { runBlockingTest { val clock = DelayControllerClockAdapter(this) - val scheduler = TimerScheduler<Int>(this, clock) + val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { assertFalse(false) @@ -81,7 +81,7 @@ internal class TimerSchedulerTest { fun testCancelAll() { runBlockingTest { val clock = DelayControllerClockAdapter(this) - val scheduler = TimerScheduler<Int>(this, clock) + val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { assertFalse(false) @@ -99,7 +99,7 @@ internal class TimerSchedulerTest { fun testOverride() { runBlockingTest { val clock = DelayControllerClockAdapter(this) - val scheduler = TimerScheduler<Int>(this, clock) + val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { assertFalse(false) @@ -117,7 +117,7 @@ internal class TimerSchedulerTest { fun testStopped() { runBlockingTest { val clock = DelayControllerClockAdapter(this) - val scheduler = TimerScheduler<Int>(this, clock) + val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.close() @@ -133,7 +133,7 @@ internal class TimerSchedulerTest { fun testNegativeDelay() { runBlockingTest { val clock = DelayControllerClockAdapter(this) - val scheduler = TimerScheduler<Int>(this, clock) + val scheduler = TimerScheduler<Int>(coroutineContext, clock) assertThrows<IllegalArgumentException> { scheduler.startSingleTimer(1, -1) { |
