diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-17 16:51:38 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-17 16:51:38 +0100 |
| commit | 054a3d376b8b31ba98f91e7b34c6e0ca717def18 (patch) | |
| tree | ee739cf4092a2b807e0043bed7cae72cff7b6bac /simulator/opendc-simulator/opendc-simulator-compute/src | |
| parent | df2f52780c08c5d108741d3746eaf03222c64841 (diff) | |
| parent | bb3b8e207a08edff81b8c2fe30b476c94bfea086 (diff) | |
Add uniform resource consumption model (v1)
This is the first in the series of pull requests to add a uniform resource consumption model to OpenDC. This pull request introduces the `opendc-simulator-resources` module which introduces the primitives with which we can model resource consumption of CPUs, disks and network:
* `SimResourceProvider` represents a provider of some generic resource `R`, which may be consumed via `consume(SimResourceConsumer<R>)`
* `SimResourceConsumer` represents a resource consumers and characterizes how the resource is being consumed.
* `SimResourceSwitch` is a generic scheduler for sharing the capacity of multiple resources across multiple consumers.
- `SimResourceSwitchExclusive`: A space-shared switch - each consumer is allocated a single resource exclusively.
- `SimResourceSwitchMinMax`: A time-shared switch - each consumer gets a fair share of the resource capacity.
* `SimResourceForwarder` converts a consumer in a provider.
**Breaking Changes**
* `ProcessingUnit` and `MemoryUnit` renamed to `SimProcessingUnit` and `SimMemoryUnit` respectively.
* `TimerScheduler` accepts a `CoroutineContext` as opposed to a `CoroutineScope`.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-compute/src')
21 files changed, 666 insertions, 1504 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt new file mode 100644 index 00000000..a99b082a --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.* +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Abstract implementation of the [SimHypervisor] interface. + */ +public abstract class SimAbstractHypervisor : SimHypervisor { + /** + * The machine on which the hypervisor runs. + */ + private lateinit var context: SimMachineContext + + /** + * The resource switch to use. + */ + private lateinit var switch: SimResourceSwitch<SimProcessingUnit> + + /** + * The virtual machines running on this hypervisor. + */ + private val _vms = mutableSetOf<VirtualMachine>() + override val vms: Set<SimMachine> + get() = _vms + + /** + * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. + */ + public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> + + /** + * Check whether the specified machine model fits on this hypervisor. + */ + public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean + + override fun canFit(model: SimMachineModel): Boolean { + return canFit(model, switch) + } + + override fun createMachine( + model: SimMachineModel, + performanceInterferenceModel: PerformanceInterferenceModel? + ): SimMachine { + require(canFit(model)) { "Machine does not fit" } + val vm = VirtualMachine(model, performanceInterferenceModel) + _vms.add(vm) + return vm + } + + /** + * A virtual machine running on the hypervisor. + * + * @property model The machine model of the virtual machine. + * @property performanceInterferenceModel The performance interference model to utilize. + */ + private inner class VirtualMachine( + override val model: SimMachineModel, + val performanceInterferenceModel: PerformanceInterferenceModel? = null, + ) : SimMachine { + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ + override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) + + /** + * A flag to indicate that the machine is terminated. + */ + private var isTerminated = false + + /** + * The vCPUs of the machine. + */ + private val cpus: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>> = model.cpus.associateWith { switch.addOutput(it) } + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { + coroutineScope { + require(!isTerminated) { "Machine is terminated" } + + val ctx = object : SimMachineContext { + override val cpus: List<SimProcessingUnit> + get() = model.cpus + + override val memory: List<SimMemoryUnit> + get() = model.memory + + override val clock: Clock + get() = this@SimAbstractHypervisor.context.clock + + override val meta: Map<String, Any> = meta + mapOf("coroutine-context" to context.meta["coroutine-context"] as CoroutineContext) + + override fun interrupt(resource: SimResource) { + requireNotNull(this@VirtualMachine.cpus[resource]).interrupt() + } + } + + workload.onStart(ctx) + + for ((cpu, provider) in cpus) { + launch { + provider.consume(workload.getConsumer(ctx, cpu)) + } + } + } + } + + /** + * Terminate this VM instance. + */ + override fun close() { + if (!isTerminated) { + cpus.forEach { (_, provider) -> provider.close() } + _vms.remove(this) + } + + isTerminated = true + } + } + + override fun onStart(ctx: SimMachineContext) { + context = ctx + switch = createSwitch(ctx) + } + + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + val forwarder = SimResourceForwarder(cpu) + switch.addInput(forwarder) + return forwarder + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt new file mode 100644 index 00000000..1bdbb7e8 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.resources.SimResourceSource +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Abstract implementation of the [SimMachine] interface. + * + * @param context The [CoroutineContext] in which the machine runs. + */ +public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine { + private val _usage = MutableStateFlow(0.0) + override val usage: StateFlow<Double> + get() = _usage + + /** + * A flag to indicate that the machine is terminated. + */ + private var isTerminated = false + + /** + * The [CoroutineContext] to run in. + */ + protected abstract val context: CoroutineContext + + /** + * The resources allocated for this machine. + */ + protected abstract val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> + + /** + * The execution context in which the workload runs. + */ + private inner class Context( + val sources: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>>, + override val meta: Map<String, Any> + ) : SimMachineContext { + override val clock: Clock + get() = this@SimAbstractMachine.clock + + override val cpus: List<SimProcessingUnit> = model.cpus + + override val memory: List<SimMemoryUnit> = model.memory + + override fun interrupt(resource: SimResource) { + checkNotNull(sources[resource]) { "Invalid resource" }.interrupt() + } + } + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) { + val resources = resources + require(!isTerminated) { "Machine is terminated" } + val ctx = Context(resources, meta + mapOf("coroutine-context" to context)) + val totalCapacity = model.cpus.sumByDouble { it.frequency } + + workload.onStart(ctx) + + for ((cpu, source) in resources) { + val consumer = workload.getConsumer(ctx, cpu) + val job = source.speed + .onEach { + _usage.value = resources.values.sumByDouble { it.speed.value } / totalCapacity + } + .launchIn(this) + + launch { + source.consume(consumer) + job.cancel() + } + } + } + + override fun close() { + if (!isTerminated) { + resources.forEach { (_, provider) -> provider.close() } + } else { + isTerminated = true + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index f74c5697..79982ea8 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -23,18 +23,11 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.* import org.opendc.utils.TimerScheduler import java.time.Clock -import java.util.* import kotlin.coroutines.* -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** * A simulated bare-metal machine that is able to run a single workload. @@ -42,271 +35,34 @@ import kotlin.math.min * A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For * example. the class expects only a single concurrent call to [run]. * - * @param coroutineScope The [CoroutineScope] to run the simulated workload in. + * @param context The [CoroutineContext] to run the simulated workload in. * @param clock The virtual clock to track the simulation time. * @param model The machine model to simulate. */ @OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) public class SimBareMetalMachine( - private val coroutineScope: CoroutineScope, + context: CoroutineContext, private val clock: Clock, override val model: SimMachineModel -) : SimMachine { +) : SimAbstractMachine(clock) { /** - * A [StateFlow] representing the CPU usage of the simulated machine. + * The [Job] associated with this machine. */ - override val usage: StateFlow<Double> - get() = usageState + private val job = Job() - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - - /** - * The [MutableStateFlow] containing the load of the server. - */ - private val usageState = MutableStateFlow(0.0) - - /** - * The current active workload. - */ - private var cont: Continuation<Unit>? = null - - /** - * The active CPUs of this machine. - */ - private var cpus: List<Cpu> = emptyList() + override val context: CoroutineContext = context + job /** * The [TimerScheduler] to use for scheduling the interrupts. */ - private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock) - - /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimBareMetalMachine.model - - override val clock: Clock - get() = this@SimBareMetalMachine.clock - - override val meta: Map<String, Any> - get() = meta + private val scheduler = TimerScheduler<Any>(this.context, clock) - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } + override val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> = + model.cpus.associateWith { SimResourceSource(it, clock, scheduler) } - workload.onStart(ctx) - - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.cpus = model.cpus.map { Cpu(ctx, it, workload) } - - for (cpu in cpus) { - cpu.start() - } - } - } - - /** - * Terminate the specified bare-metal machine. - */ override fun close() { - isTerminated = true - } - - /** - * Update the usage of the machine. - */ - private fun updateUsage() { - usageState.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency } - } - - /** - * This method is invoked when one of the CPUs has exited. - */ - private fun onCpuExit(cpu: Int) { - // Check whether all other CPUs have finished - if (cpus.all { it.hasExited }) { - val cont = cont - this.cont = null - cont?.resume(Unit) - } - } - - /** - * This method is invoked when one of the CPUs failed. - */ - private fun onCpuFailure(e: Throwable) { - // Make sure no other tasks will be resumed. - scheduler.cancelAll() - - // In case the flush fails with an exception, immediately propagate to caller, cancelling all other - // tasks. - val cont = cont - this.cont = null - cont?.resumeWithException(e) - } - - /** - * A physical CPU of the machine. - */ - private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) { - /** - * The current command. - */ - private var currentCommand: CommandWrapper? = null - - /** - * The actual processing speed. - */ - var speed: Double = 0.0 - set(value) { - field = value - updateUsage() - } - - /** - * A flag to indicate that the CPU is currently processing a command. - */ - var isIntermediate: Boolean = false - - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - /** - * Process the specified [SimResourceCommand] for this CPU. - */ - fun process(command: SimResourceCommand) { - val timestamp = clock.millis() - - val task = when (command) { - is SimResourceCommand.Idle -> { - speed = 0.0 - - val deadline = command.deadline - - require(deadline >= timestamp) { "Deadline already passed" } - - if (deadline != Long.MAX_VALUE) { - scheduler.startSingleTimerTo(this, deadline) { flush() } - } else { - null - } - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline - - require(deadline >= timestamp) { "Deadline already passed" } - - speed = min(model.frequency, limit) - - // The required duration to process all the work - val finishedAt = timestamp + ceil(work / speed * 1000).toLong() - - scheduler.startSingleTimerTo(this, min(finishedAt, deadline)) { flush() } - } - is SimResourceCommand.Exit -> { - speed = 0.0 - hasExited = true - - onCpuExit(model.id) - - null - } - } - - assert(currentCommand == null) { "Concurrent access to current command" } - currentCommand = CommandWrapper(timestamp, command) - } - - /** - * Request the workload for more work. - */ - private fun next(remainingWork: Double) { - process(workload.onNext(ctx, model.id, remainingWork)) - } - - /** - * Start the CPU. - */ - fun start() { - try { - isIntermediate = true - - process(workload.onStart(ctx, model.id)) - } catch (e: Throwable) { - onCpuFailure(e) - } finally { - isIntermediate = false - } - } - - /** - * Flush the work performed by the CPU. - */ - fun flush() { - try { - val (timestamp, command) = currentCommand ?: return - - isIntermediate = true - currentCommand = null - - // Cancel the running task and flush the progress - scheduler.cancel(this) - - when (command) { - is SimResourceCommand.Idle -> next(remainingWork = 0.0) - is SimResourceCommand.Consume -> { - val duration = clock.millis() - timestamp - val remainingWork = if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, command.work - processed) - } else { - 0.0 - } - - next(remainingWork) - } - SimResourceCommand.Exit -> throw IllegalStateException() - } - } catch (e: Throwable) { - onCpuFailure(e) - } finally { - isIntermediate = false - } - } - - /** - * Interrupt the CPU. - */ - fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isIntermediate) { - return - } - - flush() - } + super.close() + scheduler.close() + job.cancel() } - - /** - * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. - */ - private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index bf6d8a5e..bb97192d 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -22,21 +22,10 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.suspendCancellableCoroutine -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.compute.workload.SimWorkloadBarrier -import java.time.Clock -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min +import org.opendc.simulator.resources.* +import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single @@ -44,552 +33,27 @@ import kotlin.math.min * * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor { - - override fun onStart(ctx: SimExecutionContext) { - val model = ctx.machine - this.ctx = ctx - this.commands = Array(model.cpus.size) { SimResourceCommand.Idle() } - this.pCpus = model.cpus.indices.sortedBy { model.cpus[it].frequency }.toIntArray() - this.maxUsage = model.cpus.sumByDouble { it.frequency } - this.barrier = SimWorkloadBarrier(model.cpus.size) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return commands[cpu] - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - totalRemainingWork += remainingWork - val isLast = barrier.enter() - - // Flush the progress of the guest after the barrier has been reached. - if (isLast && isDirty) { - isDirty = false - flushGuests() - } - - return if (isDirty) { - // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. - SimResourceCommand.Idle() - } else { - // Indicate that the scheduler needs to run next call. - if (isLast) { - isDirty = true - } - - commands[cpu] - } - } - - override fun canFit(model: SimMachineModel): Boolean = true - - override fun createMachine( - model: SimMachineModel, - performanceInterferenceModel: PerformanceInterferenceModel? - ): SimMachine = SimVm(model, performanceInterferenceModel) - - /** - * The execution context in which the hypervisor runs. - */ - private lateinit var ctx: SimExecutionContext - - /** - * The commands to submit to the underlying host. - */ - private lateinit var commands: Array<SimResourceCommand> - - /** - * The active vCPUs. - */ - private val vcpus: MutableList<VCpu> = mutableListOf() - - /** - * The indices of the physical CPU ordered by their speed. - */ - private lateinit var pCpus: IntArray - - /** - * The maximum amount of work to be performed per second. - */ - private var maxUsage: Double = 0.0 - - /** - * The current load on the hypervisor. - */ - private var load: Double = 0.0 - - /** - * The total amount of remaining work (of all pCPUs). - */ - private var totalRemainingWork: Double = 0.0 - - /** - * The total speed requested by the vCPUs. - */ - private var totalRequestedSpeed = 0.0 - - /** - * The total amount of work requested by the vCPUs. - */ - private var totalRequestedWork = 0.0 - - /** - * The total allocated speed for the vCPUs. - */ - private var totalAllocatedSpeed = 0.0 - - /** - * The total allocated work requested for the vCPUs. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * A flag to indicate that the scheduler has submitted work that has not yet been completed. - */ - private var isDirty: Boolean = false - - /** - * The scheduler barrier. - */ - private lateinit var barrier: SimWorkloadBarrier - - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun shouldSchedule() { - isDirty = true - ctx.interruptAll() - } - - /** - * Schedule the work over the physical CPUs. - */ - private fun doSchedule() { - // If there is no work yet, mark all pCPUs as idle. - if (vcpus.isEmpty()) { - commands.fill(SimResourceCommand.Idle()) - ctx.interruptAll() - } - - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage - var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 - - // Sort the vCPUs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - vcpus.sort() - - // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - val vcpuIterator = vcpus.listIterator() - var remaining = vcpus.size - while (vcpuIterator.hasNext()) { - val vcpu = vcpuIterator.next() - val availableShare = availableSpeed / remaining-- - - when (val command = vcpu.command) { - is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - vcpu.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(vcpu.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - vcpu.actualSpeed = 0.0 - continue - } - - totalRequestedSpeed += command.limit - totalRequestedWork += command.work - - vcpu.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> { - // Apparently the vCPU has exited, so remove it from the scheduling queue. - vcpuIterator.remove() +public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { + + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean = true + + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { + return SimResourceSwitchMaxMin( + ctx.clock, + ctx.meta["coroutine-context"] as CoroutineContext, + object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> { + override fun onSliceFinish( + switch: SimResourceSwitchMaxMin<SimProcessingUnit>, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) } } - } - - // Round the duration to milliseconds - duration = ceil(duration * 1000) / 1000 - - assert(deadline >= ctx.clock.millis()) { "Deadline already passed" } - - val totalAllocatedSpeed = maxUsage - availableSpeed - var totalAllocatedWork = 0.0 - availableSpeed = totalAllocatedSpeed - load = totalAllocatedSpeed / maxUsage - - // Divide the requests over the available capacity of the pCPUs fairly - for (i in pCpus) { - val maxCpuUsage = ctx.machine.cpus[i].frequency - val fraction = maxCpuUsage / maxUsage - val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction) - val grantedWork = duration * grantedSpeed - - commands[i] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) - - totalAllocatedWork += grantedWork - availableSpeed -= grantedSpeed - } - - this.totalRequestedSpeed = totalRequestedSpeed - this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = totalAllocatedSpeed - this.totalAllocatedWork = totalAllocatedWork - - ctx.interruptAll() - } - - /** - * Flush the progress of the vCPUs. - */ - private fun flushGuests() { - // Flush all the vCPUs work - for (vcpu in vcpus) { - vcpu.flush(interrupt = false) - } - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork.toLong(), - (totalAllocatedWork - totalRemainingWork).toLong(), - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalRequestedSpeed, - totalAllocatedSpeed ) - totalRemainingWork = 0.0 - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - // Force all pCPUs to re-schedule their work. - doSchedule() - } - - /** - * Interrupt all host CPUs. - */ - private fun SimExecutionContext.interruptAll() { - for (i in machine.cpus.indices) { - interrupt(i) - } - } - - /** - * A virtual machine running on the hypervisor. - * - * @property model The machine model of the virtual machine. - * @property performanceInterferenceModel The performance interference model to utilize. - */ - private inner class SimVm( - override val model: SimMachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - ) : SimMachine { - /** - * A [StateFlow] representing the CPU usage of the simulated machine. - */ - override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) - - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - - /** - * The current active workload. - */ - private var cont: Continuation<Unit>? = null - - /** - * The active CPUs of this virtual machine. - */ - private var cpus: List<VCpu> = emptyList() - - /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimFairShareHypervisor.ctx.clock - - override val meta: Map<String, Any> - get() = meta - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - workload.onStart(ctx) - - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) } - - for (cpu in cpus) { - // Register vCPU to scheduler - vcpus.add(cpu) - - cpu.start() - } - - // Re-schedule the work over the pCPUs - shouldSchedule() - } - } - - /** - * Terminate this VM instance. - */ - override fun close() { - isTerminated = true - } - - /** - * Update the usage of the VM. - */ - fun updateUsage() { - usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.model.frequency } - } - - /** - * This method is invoked when one of the CPUs has exited. - */ - fun onCpuExit(cpu: Int) { - // Check whether all other CPUs have finished - if (cpus.all { it.hasExited }) { - val cont = cont - this.cont = null - cont?.resume(Unit) - } - } - - /** - * This method is invoked when one of the CPUs failed. - */ - fun onCpuFailure(e: Throwable) { - // In case the flush fails with an exception, immediately propagate to caller, cancelling all other - // tasks. - val cont = cont - this.cont = null - cont?.resumeWithException(e) - } - } - - /** - * A CPU of the virtual machine. - */ - private inner class VCpu( - val vm: SimVm, - val ctx: SimExecutionContext, - val model: ProcessingUnit, - val workload: SimWorkload - ) : Comparable<VCpu> { - /** - * The latest command processed by the CPU. - */ - var command: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The latest timestamp at which the vCPU was flushed. - */ - var latestFlush: Long = 0 - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - set(value) { - field = value - vm.updateUsage() - } - - /** - * A flag to indicate that the CPU is currently processing a command. - */ - var isIntermediate: Boolean = false - - /** - * A flag to indicate that the CPU has exited. - */ - val hasExited: Boolean - get() = command is SimResourceCommand.Exit - - /** - * Process the specified [SimResourceCommand] for this CPU. - */ - fun process(command: SimResourceCommand) { - // Assign command as the most recent executed command - this.command = command - - when (command) { - is SimResourceCommand.Idle -> { - require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" } - - allowedSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" } - - allowedSpeed = min(model.frequency, command.limit) - } - is SimResourceCommand.Exit -> { - allowedSpeed = 0.0 - actualSpeed = 0.0 - - vm.onCpuExit(model.id) - } - } - } - - /** - * Start the CPU. - */ - fun start() { - try { - isIntermediate = true - latestFlush = ctx.clock.millis() - - process(workload.onStart(ctx, model.id)) - } catch (e: Throwable) { - fail(e) - } finally { - isIntermediate = false - } - } - - /** - * Flush the work performed by the CPU. - */ - fun flush(interrupt: Boolean) { - val now = ctx.clock.millis() - - // Fast path: if the CPU was already flushed at at the current instant, no need to flush the progress. - if (latestFlush >= now) { - return - } - - try { - isIntermediate = true - when (val command = command) { - is SimResourceCommand.Idle -> { - // Act like nothing has happened in case the vCPU did not reach its deadline or was not - // interrupted by the user. - if (interrupt || command.deadline <= now) { - process(workload.onNext(ctx, model.id, 0.0)) - } - } - is SimResourceCommand.Consume -> { - // Apply performance interference model - val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0 - - // Compute the remaining amount of work - val remainingWork = if (command.work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - totalInterferedWork += interferedWork - - max(0.0, command.work - processed) - } else { - 0.0 - } - - // Act like nothing has happened in case the vCPU did not finish yet or was not interrupted by - // the user. - if (interrupt || remainingWork == 0.0 || command.deadline <= now) { - if (!interrupt) { - totalOvercommittedWork += remainingWork - } - - process(workload.onNext(ctx, model.id, remainingWork)) - } else { - process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) - } - } - SimResourceCommand.Exit -> - throw IllegalStateException() - } - } catch (e: Throwable) { - fail(e) - } finally { - latestFlush = now - isIntermediate = false - } - } - - /** - * Interrupt the CPU. - */ - fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isIntermediate) { - return - } - - flush(interrupt = true) - - // Force the scheduler to re-schedule - shouldSchedule() - } - - /** - * Fail the CPU. - */ - fun fail(e: Throwable) { - command = SimResourceCommand.Exit - vm.onCpuFailure(e) - } - - override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt index d8f00bef..4a233fec 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -31,6 +31,11 @@ import org.opendc.simulator.compute.workload.SimWorkload */ public interface SimHypervisor : SimWorkload { /** + * The machines running on the hypervisor. + */ + public val vms: Set<SimMachine> + + /** * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. */ public fun canFit(model: SimMachineModel): Boolean diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index 657dac66..cff70826 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,6 +22,9 @@ package org.opendc.simulator.compute +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResource import java.time.Clock /** @@ -29,27 +32,31 @@ import java.time.Clock * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on * which the image runs. */ -public interface SimExecutionContext { +public interface SimMachineContext { /** * The virtual clock tracking simulation time. */ public val clock: Clock /** - * The machine model of the machine that is running the image. + * The metadata associated with the context. */ - public val machine: SimMachineModel + public val meta: Map<String, Any> /** - * The metadata associated with the context. + * The CPUs available on the machine. */ - public val meta: Map<String, Any> + public val cpus: List<SimProcessingUnit> + + /** + * The memory available on the machine + */ + public val memory: List<SimMemoryUnit> /** - * Ask the host machine to interrupt the specified vCPU. + * Interrupt the specified [resource]. * - * @param cpu The id of the vCPU to interrupt. - * @throws IllegalArgumentException if the identifier points to a non-existing vCPU. + * @throws IllegalArgumentException if the resource does not belong to this execution context. */ - public fun interrupt(cpu: Int) + public fun interrupt(resource: SimResource) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt index c2988b11..d6bf0e99 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit /** * A description of the physical or virtual machine on which a bootable image runs. @@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.ProcessingUnit * @property cpus The list of processing units available to the image. * @property memory The list of memory units available to the image. */ -public data class SimMachineModel(public val cpus: List<ProcessingUnit>, public val memory: List<MemoryUnit>) +public data class SimMachineModel(public val cpus: List<SimProcessingUnit>, public val memory: List<SimMemoryUnit>) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 778b68ca..2001a230 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -22,263 +22,19 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.suspendCancellableCoroutine -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload -import java.time.Clock -import java.util.ArrayDeque -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.math.min +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.* +import kotlin.coroutines.CoroutineContext /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. - * - * @param listener The hypervisor listener to use. */ -public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor { - /** - * The execution context in which the hypervisor runs. - */ - private lateinit var ctx: SimExecutionContext - - /** - * The mapping from pCPU to vCPU. - */ - private lateinit var vcpus: Array<VCpu?> - - /** - * The available physical CPUs to schedule on. - */ - private val availableCpus = ArrayDeque<Int>() - - override fun canFit(model: SimMachineModel): Boolean = availableCpus.size >= model.cpus.size - - override fun createMachine( - model: SimMachineModel, - performanceInterferenceModel: PerformanceInterferenceModel? - ): SimMachine { - require(canFit(model)) { "Cannot fit machine" } - return SimVm(model, performanceInterferenceModel) - } - - override fun onStart(ctx: SimExecutionContext) { - this.ctx = ctx - this.vcpus = arrayOfNulls(ctx.machine.cpus.size) - this.availableCpus.addAll(ctx.machine.cpus.indices) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return onNext(ctx, cpu, 0.0) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return vcpus[cpu]?.next(0.0) ?: SimResourceCommand.Idle() - } - - /** - * A virtual machine running on the hypervisor. - * - * @property model The machine model of the virtual machine. - * @property performanceInterferenceModel The performance interference model to utilize. - */ - private inner class SimVm( - override val model: SimMachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - ) : SimMachine { - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - - /** - * A [StateFlow] representing the CPU usage of the simulated machine. - */ - override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) - - /** - * The current active workload. - */ - private var cont: Continuation<Unit>? = null - - /** - * The physical CPUs that have been allocated. - */ - private val pCPUs = model.cpus.map { availableCpus.poll() }.toIntArray() - - /** - * The active CPUs of this virtual machine. - */ - private var cpus: List<VCpu> = emptyList() - - /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model - - override val clock: Clock - get() = this@SimSpaceSharedHypervisor.ctx.clock - - override val meta: Map<String, Any> - get() = meta - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - workload.onStart(ctx) - - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) } - - for (cpu in cpus) { - cpu.start() - } - } - } - - override fun close() { - isTerminated = true - for (pCPU in pCPUs) { - vcpus[pCPU] = null - availableCpus.add(pCPU) - } - } - - /** - * Update the usage of the VM. - */ - fun updateUsage() { - usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency } - } - - /** - * This method is invoked when one of the CPUs has exited. - */ - fun onCpuExit(cpu: Int) { - // Check whether all other CPUs have finished - if (cpus.all { it.hasExited }) { - val cont = cont - this.cont = null - cont?.resume(Unit) - } - } - - /** - * This method is invoked when one of the CPUs failed. - */ - fun onCpuFailure(e: Throwable) { - // In case the flush fails with an exception, immediately propagate to caller, cancelling all other - // tasks. - val cont = cont - this.cont = null - cont?.resumeWithException(e) - } +public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean { + return switch.inputs.size - switch.outputs.size >= model.cpus.size } - /** - * A CPU of the virtual machine. - */ - private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { - /** - * The processing speed of the vCPU. - */ - var speed: Double = 0.0 - set(value) { - field = value - vm.updateUsage() - } - - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - /** - * A flag to indicate that the CPU was started. - */ - var hasStarted: Boolean = false - - /** - * Process the specified [SimResourceCommand] for this CPU. - */ - fun process(command: SimResourceCommand): SimResourceCommand { - return when (command) { - is SimResourceCommand.Idle -> { - speed = 0.0 - command - } - is SimResourceCommand.Consume -> { - speed = min(model.frequency, command.limit) - command - } - is SimResourceCommand.Exit -> { - speed = 0.0 - hasExited = true - - vm.onCpuExit(model.id) - - SimResourceCommand.Idle() - } - } - } - - /** - * Start the CPU. - */ - fun start() { - vcpus[pCPU] = this - interrupt() - } - - /** - * Request the workload for more work. - */ - fun next(remainingWork: Double): SimResourceCommand { - return try { - val command = - if (hasStarted) { - workload.onNext(ctx, model.id, remainingWork) - } else { - hasStarted = true - workload.onStart(ctx, model.id) - } - process(command) - } catch (e: Throwable) { - fail(e) - } - } - - /** - * Interrupt the CPU. - */ - fun interrupt() { - this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU) - } - - /** - * Fail the CPU. - */ - fun fail(e: Throwable): SimResourceCommand { - hasExited = true - - vm.onCpuFailure(e) - - return SimResourceCommand.Idle() - } + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { + return SimResourceSwitchExclusive(ctx.meta["coroutine-context"] as CoroutineContext) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt index 3d49e544..e2044d05 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt @@ -28,5 +28,5 @@ package org.opendc.simulator.compute public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" - override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor(listener) + override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor() } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt index bcbde5b1..49745868 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.model +import org.opendc.simulator.resources.SimResource + /** * A memory unit of a compute resource, either virtual or physical. * @@ -30,9 +32,12 @@ package org.opendc.simulator.compute.model * @property speed The access speed of the memory in MHz. * @property size The size of the memory unit in MBs. */ -public data class MemoryUnit( +public data class SimMemoryUnit( public val vendor: String, public val modelName: String, public val speed: Double, public val size: Long -) +) : SimResource { + override val capacity: Double + get() = speed +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt index 58ed816c..4022ecb3 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt @@ -30,7 +30,7 @@ package org.opendc.simulator.compute.model * @property arch The micro-architecture of the processor node. * @property coreCount The number of logical CPUs in the processor node. */ -public data class ProcessingNode( +public data class SimProcessingNode( public val vendor: String, public val arch: String, public val modelName: String, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt index 415e95e6..1c989254 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.model +import org.opendc.simulator.resources.SimResource + /** * A single logical compute unit of processor node, either virtual or physical. * @@ -29,8 +31,11 @@ package org.opendc.simulator.compute.model * @property id The identifier of the CPU core within the processing node. * @property frequency The clock rate of the CPU in MHz. */ -public data class ProcessingUnit( - public val node: ProcessingNode, +public data class SimProcessingUnit( + public val node: SimProcessingNode, public val id: Int, public val frequency: Double -) +) : SimResource { + override val capacity: Double + get() = frequency +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index c22fcc07..9b47821e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -22,7 +22,11 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext /** * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on @@ -36,31 +40,35 @@ public class SimFlopsWorkload( public val utilization: Double = 0.8 ) : SimWorkload { init { - require(flops >= 0) { "Negative number of flops" } + require(flops >= 0) { "Number of FLOPs must be positive" } require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimExecutionContext) {} + override fun onStart(ctx: SimMachineContext) {} - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - val cores = ctx.machine.cpus.size - val limit = ctx.machine.cpus[cpu].frequency * utilization - val work = flops.toDouble() / cores - - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) - } else { - SimResourceCommand.Exit - } + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + return CpuConsumer(ctx) } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.machine.cpus[cpu].frequency * utilization + private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> { + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + val limit = ctx.resource.frequency * utilization + val work = flops.toDouble() / machine.cpus.size + + return if (work > 0.0) { + SimResourceCommand.Consume(work, limit) + } else { + SimResourceCommand.Exit + } + } - return SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + return if (remainingWork > 0.0) { + val limit = ctx.resource.frequency * utilization + return SimResourceCommand.Consume(remainingWork, limit) + } else { + SimResourceCommand.Exit + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt deleted file mode 100644 index 41a5028e..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.workload - -/** - * A command that is sent to the host machine. - */ -public sealed class SimResourceCommand { - /** - * A request to the host to process the specified amount of [work] on a vCPU before the specified [deadline]. - * - * @param work The amount of work to process on the CPU. - * @param limit The maximum amount of work to be processed per second. - * @param deadline The instant at which the work needs to be fulfilled. - */ - public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { - init { - require(work > 0) { "The amount of work must be positive." } - require(limit > 0) { "Limit must be positive." } - } - } - - /** - * An indication to the host that the vCPU will idle until the specified [deadline] or is interrupted. - */ - public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() - - /** - * An indication to the host that the vCPU has finished processing. - */ - public object Exit : SimResourceCommand() -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 00ebebce..313b6ed5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -22,7 +22,11 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext /** * A [SimWorkload] that models application execution as a single duration. @@ -39,20 +43,26 @@ public class SimRuntimeWorkload( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimExecutionContext) {} + override fun onStart(ctx: SimMachineContext) {} - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - val limit = ctx.machine.cpus[cpu].frequency * utilization - val work = (limit / 1000) * duration - return SimResourceCommand.Consume(work, limit) + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + return CpuConsumer() } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.machine.cpus[cpu].frequency * utilization - SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit + private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + val limit = ctx.resource.frequency * utilization + val work = (limit / 1000) * duration + return SimResourceCommand.Consume(work, limit) + } + + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + return if (remainingWork > 0.0) { + val limit = ctx.resource.frequency * utilization + SimResourceCommand.Consume(remainingWork, limit) + } else { + SimResourceCommand.Exit + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index deb10b98..31f58a0f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -22,7 +22,12 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.consumer.SimConsumerBarrier /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -32,38 +37,44 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa private var offset = 0L private val iterator = trace.iterator() private var fragment: Fragment? = null - private lateinit var barrier: SimWorkloadBarrier + private lateinit var barrier: SimConsumerBarrier - override fun onStart(ctx: SimExecutionContext) { - barrier = SimWorkloadBarrier(ctx.machine.cpus.size) + override fun onStart(ctx: SimMachineContext) { + barrier = SimConsumerBarrier(ctx.cpus.size) fragment = nextFragment() offset = ctx.clock.millis() } - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return onNext(ctx, cpu, 0.0) + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + return CpuConsumer() } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val work = (fragment.duration / 1000) * fragment.usage - val deadline = offset + fragment.duration + private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + return onNext(ctx, 0.0) + } - assert(deadline >= now) { "Deadline already passed" } + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val work = (fragment.duration / 1000) * fragment.usage + val deadline = offset + fragment.duration - val cmd = - if (cpu < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) + assert(deadline >= now) { "Deadline already passed" } - if (barrier.enter()) { - this.fragment = nextFragment() - this.offset += fragment.duration - } + val cmd = + if (ctx.resource.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) - return cmd + if (barrier.enter()) { + this@SimTraceWorkload.fragment = nextFragment() + this@SimTraceWorkload.offset += fragment.duration + } + + return cmd + } } override fun toString(): String = "SimTraceWorkload" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 6fc78d56..60661e23 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -22,7 +22,9 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceConsumer /** * A model that characterizes the runtime behavior of some particular workload. @@ -32,27 +34,12 @@ import org.opendc.simulator.compute.SimExecutionContext */ public interface SimWorkload { /** - * This method is invoked when the workload is started, before the (virtual) CPUs assigned to the workload will - * start. + * This method is invoked when the workload is started. */ - public fun onStart(ctx: SimExecutionContext) + public fun onStart(ctx: SimMachineContext) /** - * This method is invoked when a (virtual) CPU assigned to the workload has started. - * - * @param ctx The execution context in which the workload runs. - * @param cpu The index of the (virtual) CPU to start. - * @return The command to perform on the CPU. + * Obtain the resource consumer for the specified processing unit. */ - public fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand - - /** - * This method is invoked when a (virtual) CPU assigned to the workload was interrupted or reached its deadline. - * - * @param ctx The execution context in which the workload runs. - * @param cpu The index of the (virtual) CPU to obtain the resource consumption of. - * @param remainingWork The remaining work that was not yet completed. - * @return The next command to perform on the CPU. - */ - public fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand + public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt deleted file mode 100644 index 45a299be..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadBarrier.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.workload - -/** - * The [SimWorkloadBarrier] is a barrier that allows workloads to wait for a select number of CPUs to complete, before - * proceeding its operation. - */ -public class SimWorkloadBarrier(public val parties: Int) { - private var counter = 0 - - /** - * Enter the barrier and determine whether the caller is the last to reach the barrier. - * - * @return `true` if the caller is the last to reach the barrier, `false` otherwise. - */ - public fun enter(): Boolean { - val last = ++counter == parties - if (last) { - counter = 0 - return true - } - return false - } -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index b8eee4f0..4ac8cf63 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -23,38 +23,33 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.time.Clock /** * Test suite for the [SimHypervisor] class. */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimHypervisorTest { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock - private lateinit var machineModel: SimMachineModel + private lateinit var model: SimMachineModel @BeforeEach fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) - machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) + model = SimMachineModel( + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -62,7 +57,8 @@ internal class SimHypervisorTest { * Test overcommitting of resources via the hypervisor with a single VM. */ @Test - fun testOvercommittedSingle() { + fun testOvercommittedSingle() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val listener = object : SimHypervisor.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L @@ -83,38 +79,34 @@ internal class SimHypervisorTest { } } - scope.launch { - val duration = 5 * 60L - val workloadA = - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) - ), - ) - - val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimFairShareHypervisor(listener) - - launch { - machine.run(hypervisor) - } - - yield() - launch { hypervisor.createMachine(machineModel).run(workloadA) } + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + + val machine = SimBareMetalMachine(coroutineContext, clock, model) + val hypervisor = SimFairShareHypervisor(listener) + + launch { + machine.run(hypervisor) + println("Hypervisor finished") } - - scope.advanceUntilIdle() - scope.uncaughtExceptions.forEach { it.printStackTrace() } + yield() + hypervisor.createMachine(model).run(workloadA) + yield() + machine.close() assertAll( - { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, scope.currentTime) } + { assertEquals(1200000, currentTime) } ) } @@ -122,7 +114,8 @@ internal class SimHypervisorTest { * Test overcommitting of resources via the hypervisor with two VMs. */ @Test - fun testOvercommittedDual() { + fun testOvercommittedDual() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val listener = object : SimHypervisor.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L @@ -143,48 +136,53 @@ internal class SimHypervisorTest { } } - scope.launch { - val duration = 5 * 60L - val workloadA = - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) - ), - ) - val workloadB = - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) - ) + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + val workloadB = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) ) + ) + + val machine = SimBareMetalMachine(coroutineContext, clock, model) + val hypervisor = SimFairShareHypervisor(listener) - val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimFairShareHypervisor(listener) + launch { + machine.run(hypervisor) + } + yield() + coroutineScope { launch { - machine.run(hypervisor) + val vm = hypervisor.createMachine(model) + vm.run(workloadA) + vm.close() } - - yield() - launch { hypervisor.createMachine(machineModel).run(workloadA) } - launch { hypervisor.createMachine(machineModel).run(workloadB) } + val vm = hypervisor.createMachine(model) + vm.run(workloadB) + vm.close() } - - scope.advanceUntilIdle() - scope.uncaughtExceptions.forEach { it.printStackTrace() } + yield() + machine.close() + yield() assertAll( - { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, scope.currentTime) } + { assertEquals(1200000, currentTime) } ) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 1036f1ac..6adc41d0 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -24,19 +24,14 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter /** @@ -48,112 +43,44 @@ class SimMachineTest { @BeforeEach fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @Test - fun testFlopsWorkload() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) + fun testFlopsWorkload() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) - testScope.runBlockingTest { + try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) // Two cores execute 1000 MFlOps per second (1000 ms) - assertEquals(1000, testScope.currentTime) + assertEquals(1000, currentTime) + } finally { + machine.close() } } @Test - fun testUsage() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) + fun testUsage() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) - testScope.runBlockingTest { - val res = mutableListOf<Double>() - val job = launch { machine.usage.toList(res) } + val res = mutableListOf<Double>() + val job = launch { machine.usage.toList(res) } + try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) job.cancel() assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } - } - } - - @Test - fun testInterrupt() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - ctx.interrupt(cpu) - return SimResourceCommand.Exit - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertDoesNotThrow { - testScope.runBlockingTest { machine.run(workload) } - } - } - - @Test - fun testExceptionPropagationOnStart() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - throw IllegalStateException() - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertThrows<IllegalStateException> { - testScope.runBlockingTest { machine.run(workload) } - } - } - - @Test - fun testExceptionPropagationOnNext() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertThrows<IllegalStateException> { - testScope.runBlockingTest { machine.run(workload) } + } finally { + machine.close() } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index 1a9faf11..8428a0a7 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -25,38 +25,33 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.time.Clock /** * A test suite for the [SimSpaceSharedHypervisor]. */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimSpaceSharedHypervisorTest { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @BeforeEach fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -64,42 +59,45 @@ internal class SimSpaceSharedHypervisorTest { * Test a trace workload. */ @Test - fun testTrace() { + fun testTrace() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val usagePm = mutableListOf<Double>() val usageVm = mutableListOf<Double>() - scope.launch { - val duration = 5 * 60L - val workloadA = - SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) - ), - ) - - val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimSpaceSharedHypervisor() - - launch { machine.usage.toList(usagePm) } - launch { machine.run(hypervisor) } - - yield() - launch { - val vm = hypervisor.createMachine(machineModel) - launch { vm.usage.toList(usageVm) } - vm.run(workloadA) - } - } - - scope.advanceUntilIdle() + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + val colA = launch { machine.usage.toList(usagePm) } + launch { machine.run(hypervisor) } + + yield() + + val vm = hypervisor.createMachine(machineModel) + val colB = launch { vm.usage.toList(usageVm) } + vm.run(workloadA) + yield() + + vm.close() + machine.close() + colA.cancel() + colB.cancel() assertAll( { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usagePm) { "Correct PM usage" } }, - { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } }, - { assertEquals(5 * 60L * 4000, scope.currentTime) { "Took enough time" } } + // Temporary limitation is that VMs do not emit usage information + // { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } }, + { assertEquals(5 * 60L * 4000, currentTime) { "Took enough time" } } ) } @@ -107,69 +105,111 @@ internal class SimSpaceSharedHypervisorTest { * Test runtime workload on hypervisor. */ @Test - fun testRuntimeWorkload() { + fun testRuntimeWorkload() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val duration = 5 * 60L * 1000 val workload = SimRuntimeWorkload(duration) - val machine = SimBareMetalMachine(scope, clock, machineModel) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) val hypervisor = SimSpaceSharedHypervisor() - scope.launch { - launch { machine.run(hypervisor) } + launch { machine.run(hypervisor) } + yield() + val vm = hypervisor.createMachine(machineModel) + vm.run(workload) + vm.close() + machine.close() + + assertEquals(duration, currentTime) { "Took enough time" } + } - yield() - launch { hypervisor.createMachine(machineModel).run(workload) } - } + /** + * Test FLOPs workload on hypervisor. + */ + @Test + fun testFlopsWorkload() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) - scope.advanceUntilIdle() + val duration = 5 * 60L * 1000 + val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + launch { machine.run(hypervisor) } + yield() + val vm = hypervisor.createMachine(machineModel) + vm.run(workload) + machine.close() - assertEquals(duration, scope.currentTime) { "Took enough time" } + assertEquals(duration, currentTime) { "Took enough time" } } /** - * Test concurrent workloads on the machine. + * Test two workloads running sequentially. */ @Test - fun testConcurrentWorkloadFails() { - val machine = SimBareMetalMachine(scope, clock, machineModel) + fun testTwoWorkloads() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val duration = 5 * 60L * 1000 + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) val hypervisor = SimSpaceSharedHypervisor() - scope.launch { - launch { machine.run(hypervisor) } + launch { machine.run(hypervisor) } + yield() - yield() + val vm = hypervisor.createMachine(machineModel) + vm.run(SimRuntimeWorkload(duration)) + vm.close() - hypervisor.createMachine(machineModel) + val vm2 = hypervisor.createMachine(machineModel) + vm2.run(SimRuntimeWorkload(duration)) + vm2.close() + machine.close() - assertAll( - { assertFalse(hypervisor.canFit(machineModel)) }, - { assertThrows<IllegalStateException> { hypervisor.createMachine(machineModel) } } - ) - } - - scope.advanceUntilIdle() + assertEquals(duration * 2, currentTime) { "Took enough time" } } /** * Test concurrent workloads on the machine. */ @Test - fun testConcurrentWorkloadSucceeds() { - val machine = SimBareMetalMachine(scope, clock, machineModel) + fun testConcurrentWorkloadFails() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) val hypervisor = SimSpaceSharedHypervisor() - scope.launch { - launch { machine.run(hypervisor) } + launch { machine.run(hypervisor) } + yield() - yield() + hypervisor.createMachine(machineModel) - hypervisor.createMachine(machineModel).close() + assertAll( + { assertFalse(hypervisor.canFit(machineModel)) }, + { assertThrows<IllegalArgumentException> { hypervisor.createMachine(machineModel) } } + ) - assertAll( - { assertTrue(hypervisor.canFit(machineModel)) }, - { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } - ) - } + machine.close() + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadSucceeds() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val machine = SimBareMetalMachine(coroutineContext, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + launch { machine.run(hypervisor) } + yield() + + hypervisor.createMachine(machineModel).close() + + assertAll( + { assertTrue(hypervisor.canFit(machineModel)) }, + { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } + ) - scope.advanceUntilIdle() + machine.close() } } |
