diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-12-30 14:03:12 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-16 12:06:40 +0100 |
| commit | 6a2a5423479696e8dc28885be27cc3e3252f28b0 (patch) | |
| tree | e23dd1d7ab3a15969da5f7e02baf24a9434b9912 /simulator/opendc-simulator/opendc-simulator-compute/src | |
| parent | df2f52780c08c5d108741d3746eaf03222c64841 (diff) | |
simulator: Add generic framework for resource consumption modeling
This change adds a generic framework for modeling resource consumptions and
adapts opendc-simulator-compute to model machines and VMs on top of
this framework.
This framework anticipates the addition of additional resource types
such as memory, disk and network to the OpenDC codebase.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-compute/src')
16 files changed, 436 insertions, 729 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index f74c5697..b1d1c0b7 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -25,16 +25,16 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.* import org.opendc.utils.TimerScheduler import java.time.Clock import java.util.* import kotlin.coroutines.* -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** * A simulated bare-metal machine that is able to run a single workload. @@ -52,11 +52,9 @@ public class SimBareMetalMachine( private val clock: Clock, override val model: SimMachineModel ) : SimMachine { - /** - * A [StateFlow] representing the CPU usage of the simulated machine. - */ + private val _usage = MutableStateFlow(0.0) override val usage: StateFlow<Double> - get() = usageState + get() = _usage /** * A flag to indicate that the machine is terminated. @@ -64,249 +62,63 @@ public class SimBareMetalMachine( private var isTerminated = false /** - * The [MutableStateFlow] containing the load of the server. - */ - private val usageState = MutableStateFlow(0.0) - - /** - * The current active workload. - */ - private var cont: Continuation<Unit>? = null - - /** - * The active CPUs of this machine. - */ - private var cpus: List<Cpu> = emptyList() - - /** * The [TimerScheduler] to use for scheduling the interrupts. */ - private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock) + private val scheduler = TimerScheduler<Any>(coroutineScope, clock) /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * The execution context in which the workload runs. */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = this@SimBareMetalMachine.model - - override val clock: Clock - get() = this@SimBareMetalMachine.clock - - override val meta: Map<String, Any> - get() = meta - - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } - } - - workload.onStart(ctx) + private inner class Context(val map: Map<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>, + override val meta: Map<String, Any>) : SimMachineContext { + override val clock: Clock + get() = this@SimBareMetalMachine.clock - return suspendCancellableCoroutine { cont -> - this.cont = cont - this.cpus = model.cpus.map { Cpu(ctx, it, workload) } + override val cpus: List<SimProcessingUnit> = model.cpus - for (cpu in cpus) { - cpu.start() - } - } - } + override val memory: List<SimMemoryUnit> = model.memory - /** - * Terminate the specified bare-metal machine. - */ - override fun close() { - isTerminated = true - } - - /** - * Update the usage of the machine. - */ - private fun updateUsage() { - usageState.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency } - } - - /** - * This method is invoked when one of the CPUs has exited. - */ - private fun onCpuExit(cpu: Int) { - // Check whether all other CPUs have finished - if (cpus.all { it.hasExited }) { - val cont = cont - this.cont = null - cont?.resume(Unit) + override fun interrupt(resource: SimResource) { + val context = map[resource] + checkNotNull(context) { "Invalid resource" } + context.interrupt() } } /** - * This method is invoked when one of the CPUs failed. - */ - private fun onCpuFailure(e: Throwable) { - // Make sure no other tasks will be resumed. - scheduler.cancelAll() - - // In case the flush fails with an exception, immediately propagate to caller, cancelling all other - // tasks. - val cont = cont - this.cont = null - cont?.resumeWithException(e) - } - - /** - * A physical CPU of the machine. + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) { - /** - * The current command. - */ - private var currentCommand: CommandWrapper? = null - - /** - * The actual processing speed. - */ - var speed: Double = 0.0 - set(value) { - field = value - updateUsage() - } - - /** - * A flag to indicate that the CPU is currently processing a command. - */ - var isIntermediate: Boolean = false - - /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - /** - * Process the specified [SimResourceCommand] for this CPU. - */ - fun process(command: SimResourceCommand) { - val timestamp = clock.millis() - - val task = when (command) { - is SimResourceCommand.Idle -> { - speed = 0.0 - - val deadline = command.deadline - - require(deadline >= timestamp) { "Deadline already passed" } - - if (deadline != Long.MAX_VALUE) { - scheduler.startSingleTimerTo(this, deadline) { flush() } - } else { - null - } - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline - - require(deadline >= timestamp) { "Deadline already passed" } - - speed = min(model.frequency, limit) - - // The required duration to process all the work - val finishedAt = timestamp + ceil(work / speed * 1000).toLong() - - scheduler.startSingleTimerTo(this, min(finishedAt, deadline)) { flush() } - } - is SimResourceCommand.Exit -> { - speed = 0.0 - hasExited = true + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = coroutineScope { + require(!isTerminated) { "Machine is terminated" } + val map = mutableMapOf<SimProcessingUnit, SimResourceContext<SimProcessingUnit>>() + val ctx = Context(map, meta) + val sources = model.cpus.map { SimResourceSource(it, clock, scheduler) } + val totalCapacity = model.cpus.sumByDouble { it.frequency } - onCpuExit(model.id) + workload.onStart(ctx) - null + for (source in sources) { + val consumer = workload.getConsumer(ctx, source.resource) + val job = source.speed + .onEach { + _usage.value = sources.sumByDouble { it.speed.value } / totalCapacity } - } - - assert(currentCommand == null) { "Concurrent access to current command" } - currentCommand = CommandWrapper(timestamp, command) - } + .launchIn(this) - /** - * Request the workload for more work. - */ - private fun next(remainingWork: Double) { - process(workload.onNext(ctx, model.id, remainingWork)) - } - - /** - * Start the CPU. - */ - fun start() { - try { - isIntermediate = true - - process(workload.onStart(ctx, model.id)) - } catch (e: Throwable) { - onCpuFailure(e) - } finally { - isIntermediate = false - } - } - - /** - * Flush the work performed by the CPU. - */ - fun flush() { - try { - val (timestamp, command) = currentCommand ?: return - - isIntermediate = true - currentCommand = null - - // Cancel the running task and flush the progress - scheduler.cancel(this) - - when (command) { - is SimResourceCommand.Idle -> next(remainingWork = 0.0) - is SimResourceCommand.Consume -> { - val duration = clock.millis() - timestamp - val remainingWork = if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, command.work - processed) - } else { - 0.0 - } - - next(remainingWork) + launch { + source.consume(object : SimResourceConsumer<SimProcessingUnit> by consumer { + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + map[ctx.resource] = ctx + return consumer.onStart(ctx) } - SimResourceCommand.Exit -> throw IllegalStateException() - } - } catch (e: Throwable) { - onCpuFailure(e) - } finally { - isIntermediate = false + }) + job.cancel() } } - - /** - * Interrupt the CPU. - */ - fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isIntermediate) { - return - } - - flush() - } } - /** - * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. - */ - private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) + override fun close() { + isTerminated = true + scheduler.close() + } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index bf6d8a5e..12b3b428 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -26,10 +26,11 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.compute.workload.SimWorkloadBarrier +import org.opendc.simulator.resources.* import java.time.Clock import kotlin.coroutines.Continuation import kotlin.coroutines.resume @@ -44,22 +45,22 @@ import kotlin.math.min * * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor { +public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> { - override fun onStart(ctx: SimExecutionContext) { - val model = ctx.machine + override fun onStart(ctx: SimMachineContext) { this.ctx = ctx - this.commands = Array(model.cpus.size) { SimResourceCommand.Idle() } - this.pCpus = model.cpus.indices.sortedBy { model.cpus[it].frequency }.toIntArray() - this.maxUsage = model.cpus.sumByDouble { it.frequency } - this.barrier = SimWorkloadBarrier(model.cpus.size) + this.commands = Array(ctx.cpus.size) { SimResourceCommand.Idle() } + this.pCpus = ctx.cpus.indices.sortedBy { ctx.cpus[it].frequency }.toIntArray() + this.maxUsage = ctx.cpus.sumByDouble { it.frequency } + this.barrier = SimWorkloadBarrier(ctx.cpus.size) } - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return commands[cpu] + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + return this } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + val cpu = ctx.resource.id totalRemainingWork += remainingWork val isLast = barrier.enter() @@ -82,6 +83,10 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener } } + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + return commands[ctx.resource.id] + } + override fun canFit(model: SimMachineModel): Boolean = true override fun createMachine( @@ -92,7 +97,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener /** * The execution context in which the hypervisor runs. */ - private lateinit var ctx: SimExecutionContext + private lateinit var ctx: SimMachineContext /** * The commands to submit to the underlying host. @@ -199,7 +204,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener val vcpu = vcpuIterator.next() val availableShare = availableSpeed / remaining-- - when (val command = vcpu.command) { + when (val command = vcpu.activeCommand) { is SimResourceCommand.Idle -> { // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) @@ -246,7 +251,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener // Divide the requests over the available capacity of the pCPUs fairly for (i in pCpus) { - val maxCpuUsage = ctx.machine.cpus[i].frequency + val maxCpuUsage = ctx.cpus[i].frequency val fraction = maxCpuUsage / maxUsage val grantedSpeed = min(maxCpuUsage, totalAllocatedSpeed * fraction) val grantedWork = duration * grantedSpeed @@ -275,7 +280,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener private fun flushGuests() { // Flush all the vCPUs work for (vcpu in vcpus) { - vcpu.flush(interrupt = false) + vcpu.flush(isIntermediate = true) } // Report metrics @@ -299,9 +304,9 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener /** * Interrupt all host CPUs. */ - private fun SimExecutionContext.interruptAll() { - for (i in machine.cpus.indices) { - interrupt(i) + private fun SimMachineContext.interruptAll() { + for (cpu in ctx.cpus) { + interrupt(cpu) } } @@ -336,33 +341,38 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener private var cpus: List<VCpu> = emptyList() /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * The execution context in which the workload runs. */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model + inner class Context(override val meta: Map<String, Any>) : SimMachineContext { + override val cpus: List<SimProcessingUnit> + get() = model.cpus - override val clock: Clock - get() = this@SimFairShareHypervisor.ctx.clock + override val memory: List<SimMemoryUnit> + get() = model.memory - override val meta: Map<String, Any> - get() = meta + override val clock: Clock + get() = this@SimFairShareHypervisor.ctx.clock - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } + override fun interrupt(resource: SimResource) { + TODO() } + } + + lateinit var ctx: SimMachineContext + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { + require(!isTerminated) { "Machine is terminated" } + require(cont == null) { "Run should not be called concurrently" } + ctx = Context(meta) workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) } + this.cpus = model.cpus.map { VCpu(this, it, workload.getConsumer(ctx, it), ctx.clock) } for (cpu in cpus) { // Register vCPU to scheduler @@ -387,13 +397,13 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener * Update the usage of the VM. */ fun updateUsage() { - usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.model.frequency } + usage.value = cpus.sumByDouble { it.actualSpeed } / cpus.sumByDouble { it.resource.frequency } } /** * This method is invoked when one of the CPUs has exited. */ - fun onCpuExit(cpu: Int) { + fun onCpuExit() { // Check whether all other CPUs have finished if (cpus.all { it.hasExited }) { val cont = cont @@ -419,19 +429,14 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener */ private inner class VCpu( val vm: SimVm, - val ctx: SimExecutionContext, - val model: ProcessingUnit, - val workload: SimWorkload - ) : Comparable<VCpu> { + resource: SimProcessingUnit, + consumer: SimResourceConsumer<SimProcessingUnit>, + clock: Clock + ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer), Comparable<VCpu> { /** - * The latest command processed by the CPU. + * The current command that is processed by the vCPU. */ - var command: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The latest timestamp at which the vCPU was flushed. - */ - var latestFlush: Long = 0 + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() /** * The processing speed that is allowed by the model constraints. @@ -448,148 +453,74 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener } /** - * A flag to indicate that the CPU is currently processing a command. - */ - var isIntermediate: Boolean = false - - /** * A flag to indicate that the CPU has exited. */ - val hasExited: Boolean - get() = command is SimResourceCommand.Exit - - /** - * Process the specified [SimResourceCommand] for this CPU. - */ - fun process(command: SimResourceCommand) { - // Assign command as the most recent executed command - this.command = command - - when (command) { - is SimResourceCommand.Idle -> { - require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" } + var hasExited: Boolean = false - allowedSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - require(command.deadline >= ctx.clock.millis()) { "Deadline already passed" } + override fun onIdle(deadline: Long) { + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } - allowedSpeed = min(model.frequency, command.limit) - } - is SimResourceCommand.Exit -> { - allowedSpeed = 0.0 - actualSpeed = 0.0 + override fun onConsume(work: Double, limit: Double, deadline: Long) { + allowedSpeed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } - vm.onCpuExit(model.id) - } - } + override fun onFinish() { + hasExited = true + activeCommand = SimResourceCommand.Exit + vm.onCpuExit() } - /** - * Start the CPU. - */ - fun start() { - try { - isIntermediate = true - latestFlush = ctx.clock.millis() - - process(workload.onStart(ctx, model.id)) - } catch (e: Throwable) { - fail(e) - } finally { - isIntermediate = false - } + override fun onFailure(cause: Throwable) { + hasExited = true + activeCommand = SimResourceCommand.Exit + vm.onCpuFailure(cause) } - /** - * Flush the work performed by the CPU. - */ - fun flush(interrupt: Boolean) { - val now = ctx.clock.millis() + override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + // Apply performance interference model + val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0 - // Fast path: if the CPU was already flushed at at the current instant, no need to flush the progress. - if (latestFlush >= now) { - return + // Compute the remaining amount of work + val remainingWork = if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 } - try { - isIntermediate = true - when (val command = command) { - is SimResourceCommand.Idle -> { - // Act like nothing has happened in case the vCPU did not reach its deadline or was not - // interrupted by the user. - if (interrupt || command.deadline <= now) { - process(workload.onNext(ctx, model.id, 0.0)) - } - } - is SimResourceCommand.Consume -> { - // Apply performance interference model - val performanceScore = vm.performanceInterferenceModel?.apply(load) ?: 1.0 - - // Compute the remaining amount of work - val remainingWork = if (command.work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - totalInterferedWork += interferedWork - - max(0.0, command.work - processed) - } else { - 0.0 - } - - // Act like nothing has happened in case the vCPU did not finish yet or was not interrupted by - // the user. - if (interrupt || remainingWork == 0.0 || command.deadline <= now) { - if (!interrupt) { - totalOvercommittedWork += remainingWork - } - - process(workload.onNext(ctx, model.id, remainingWork)) - } else { - process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) - } - } - SimResourceCommand.Exit -> - throw IllegalStateException() - } - } catch (e: Throwable) { - fail(e) - } finally { - latestFlush = now - isIntermediate = false + if (!isInterrupted) { + totalOvercommittedWork += remainingWork } + + return remainingWork } - /** - * Interrupt the CPU. - */ - fun interrupt() { + override fun interrupt() { // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead // to infinite recursion. - if (isIntermediate) { + if (isProcessing) { return } - flush(interrupt = true) + super.interrupt() // Force the scheduler to re-schedule shouldSchedule() } - /** - * Fail the CPU. - */ - fun fail(e: Throwable) { - command = SimResourceCommand.Exit - vm.onCpuFailure(e) - } - override fun compareTo(other: VCpu): Int = allowedSpeed.compareTo(other.allowedSpeed) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index 657dac66..5c67b990 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,6 +22,9 @@ package org.opendc.simulator.compute +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResource import java.time.Clock /** @@ -29,27 +32,31 @@ import java.time.Clock * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on * which the image runs. */ -public interface SimExecutionContext { +public interface SimMachineContext { /** * The virtual clock tracking simulation time. */ public val clock: Clock + + /** + * The metadata associated with the context. + */ + public val meta: Map<String, Any> /** - * The machine model of the machine that is running the image. + * The CPUs available on the machine. */ - public val machine: SimMachineModel + public val cpus: List<SimProcessingUnit> /** - * The metadata associated with the context. + * The memory available on the machine */ - public val meta: Map<String, Any> + public val memory: List<SimMemoryUnit> /** - * Ask the host machine to interrupt the specified vCPU. + * Interrupt the specified [resource]. * - * @param cpu The id of the vCPU to interrupt. - * @throws IllegalArgumentException if the identifier points to a non-existing vCPU. + * @throws IllegalArgumentException if the resource does not belong to this execution context. */ - public fun interrupt(cpu: Int) + public fun interrupt(resource: SimResource) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt index c2988b11..d6bf0e99 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit /** * A description of the physical or virtual machine on which a bootable image runs. @@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.ProcessingUnit * @property cpus The list of processing units available to the image. * @property memory The list of memory units available to the image. */ -public data class SimMachineModel(public val cpus: List<ProcessingUnit>, public val memory: List<MemoryUnit>) +public data class SimMachineModel(public val cpus: List<SimProcessingUnit>, public val memory: List<SimMemoryUnit>) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 778b68ca..751873a5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -26,26 +26,26 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.* import java.time.Clock import java.util.ArrayDeque import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException -import kotlin.math.min /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. * * @param listener The hypervisor listener to use. */ -public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor { +public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor, SimResourceConsumer<SimProcessingUnit> { /** * The execution context in which the hypervisor runs. */ - private lateinit var ctx: SimExecutionContext + private lateinit var ctx: SimMachineContext /** * The mapping from pCPU to vCPU. @@ -67,18 +67,36 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen return SimVm(model, performanceInterferenceModel) } - override fun onStart(ctx: SimExecutionContext) { + override fun onStart(ctx: SimMachineContext) { this.ctx = ctx - this.vcpus = arrayOfNulls(ctx.machine.cpus.size) - this.availableCpus.addAll(ctx.machine.cpus.indices) + this.vcpus = arrayOfNulls(ctx.cpus.size) + this.availableCpus.addAll(ctx.cpus.indices) } - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return onNext(ctx, cpu, 0.0) + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + return this } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return vcpus[cpu]?.next(0.0) ?: SimResourceCommand.Idle() + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + return onNext(ctx, 0.0) + } + + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + val vcpu = vcpus[ctx.resource.id] ?: return SimResourceCommand.Idle() + + if (vcpu.isStarted) { + vcpu.remainingWork = remainingWork + vcpu.flush() + } else { + vcpu.isStarted = true + vcpu.start() + } + + if (vcpu.hasExited && vcpu != vcpus[ctx.resource.id]) { + return onNext(ctx, remainingWork) + } + + return vcpu.activeCommand } /** @@ -117,36 +135,46 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen private var cpus: List<VCpu> = emptyList() /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * The execution context in which the workload runs. */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - require(!isTerminated) { "Machine is terminated" } - require(cont == null) { "Run should not be called concurrently" } - - val ctx = object : SimExecutionContext { - override val machine: SimMachineModel - get() = model + inner class Context(override val meta: Map<String, Any>) : SimMachineContext { + override val cpus: List<SimProcessingUnit> + get() = model.cpus - override val clock: Clock - get() = this@SimSpaceSharedHypervisor.ctx.clock + override val memory: List<SimMemoryUnit> + get() = model.memory - override val meta: Map<String, Any> - get() = meta + override val clock: Clock + get() = this@SimSpaceSharedHypervisor.ctx.clock - override fun interrupt(cpu: Int) { - require(cpu < cpus.size) { "Invalid CPU identifier" } - cpus[cpu].interrupt() - } + override fun interrupt(resource: SimResource) { + TODO() } + } + + lateinit var ctx: SimMachineContext + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { + require(!isTerminated) { "Machine is terminated" } + require(cont == null) { "Run should not be called concurrently" } + + ctx = Context(meta) workload.onStart(ctx) return suspendCancellableCoroutine { cont -> this.cont = cont - this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) } + try { + this.cpus = model.cpus.map { model -> VCpu(this, model, workload.getConsumer(ctx, model), ctx.clock) } - for (cpu in cpus) { - cpu.start() + for ((index, pCPU) in pCPUs.withIndex()) { + vcpus[pCPU] = cpus[index] + this@SimSpaceSharedHypervisor.ctx.interrupt(this@SimSpaceSharedHypervisor.ctx.cpus[pCPU]) + } + } catch (e: Throwable) { + cont.resumeWithException(e) } } } @@ -157,19 +185,23 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen vcpus[pCPU] = null availableCpus.add(pCPU) } + + val cont = cont + this.cont = null + cont?.resume(Unit) } /** * Update the usage of the VM. */ fun updateUsage() { - usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency } + usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.resource.frequency } } /** * This method is invoked when one of the CPUs has exited. */ - fun onCpuExit(cpu: Int) { + fun onCpuExit() { // Check whether all other CPUs have finished if (cpus.all { it.hasExited }) { val cont = cont @@ -193,7 +225,22 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen /** * A CPU of the virtual machine. */ - private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { + private inner class VCpu( + val vm: SimVm, + resource: SimProcessingUnit, + consumer: SimResourceConsumer<SimProcessingUnit>, + clock: Clock + ) : SimAbstractResourceContext<SimProcessingUnit>(resource, clock, consumer) { + /** + * Indicates that the vCPU was started. + */ + var isStarted: Boolean = false + + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + /** * The processing speed of the vCPU. */ @@ -204,81 +251,41 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen } /** - * A flag to indicate that the CPU has exited. - */ - var hasExited: Boolean = false - - /** - * A flag to indicate that the CPU was started. + * The amount of work remaining from the previous consumption. */ - var hasStarted: Boolean = false + var remainingWork: Double = 0.0 /** - * Process the specified [SimResourceCommand] for this CPU. + * A flag to indicate that the CPU has exited. */ - fun process(command: SimResourceCommand): SimResourceCommand { - return when (command) { - is SimResourceCommand.Idle -> { - speed = 0.0 - command - } - is SimResourceCommand.Consume -> { - speed = min(model.frequency, command.limit) - command - } - is SimResourceCommand.Exit -> { - speed = 0.0 - hasExited = true - - vm.onCpuExit(model.id) - - SimResourceCommand.Idle() - } - } - } + var hasExited: Boolean = false - /** - * Start the CPU. - */ - fun start() { - vcpus[pCPU] = this - interrupt() + override fun onIdle(deadline: Long) { + speed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) } - /** - * Request the workload for more work. - */ - fun next(remainingWork: Double): SimResourceCommand { - return try { - val command = - if (hasStarted) { - workload.onNext(ctx, model.id, remainingWork) - } else { - hasStarted = true - workload.onStart(ctx, model.id) - } - process(command) - } catch (e: Throwable) { - fail(e) - } + override fun onConsume(work: Double, limit: Double, deadline: Long) { + speed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, speed, deadline) } - /** - * Interrupt the CPU. - */ - fun interrupt() { - this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU) + override fun onFinish() { + speed = 0.0 + hasExited = true + activeCommand = SimResourceCommand.Idle() + vm.onCpuExit() } - /** - * Fail the CPU. - */ - fun fail(e: Throwable): SimResourceCommand { + override fun onFailure(cause: Throwable) { + speed = 0.0 hasExited = true + activeCommand = SimResourceCommand.Idle() + vm.onCpuFailure(cause) + } - vm.onCpuFailure(e) - - return SimResourceCommand.Idle() + override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + return remainingWork } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt index bcbde5b1..49745868 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.model +import org.opendc.simulator.resources.SimResource + /** * A memory unit of a compute resource, either virtual or physical. * @@ -30,9 +32,12 @@ package org.opendc.simulator.compute.model * @property speed The access speed of the memory in MHz. * @property size The size of the memory unit in MBs. */ -public data class MemoryUnit( +public data class SimMemoryUnit( public val vendor: String, public val modelName: String, public val speed: Double, public val size: Long -) +) : SimResource { + override val capacity: Double + get() = speed +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt index 58ed816c..4022ecb3 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt @@ -30,7 +30,7 @@ package org.opendc.simulator.compute.model * @property arch The micro-architecture of the processor node. * @property coreCount The number of logical CPUs in the processor node. */ -public data class ProcessingNode( +public data class SimProcessingNode( public val vendor: String, public val arch: String, public val modelName: String, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt index 415e95e6..1c989254 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.model +import org.opendc.simulator.resources.SimResource + /** * A single logical compute unit of processor node, either virtual or physical. * @@ -29,8 +31,11 @@ package org.opendc.simulator.compute.model * @property id The identifier of the CPU core within the processing node. * @property frequency The clock rate of the CPU in MHz. */ -public data class ProcessingUnit( - public val node: ProcessingNode, +public data class SimProcessingUnit( + public val node: SimProcessingNode, public val id: Int, public val frequency: Double -) +) : SimResource { + override val capacity: Double + get() = frequency +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index c22fcc07..9b47821e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -22,7 +22,11 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext /** * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on @@ -36,31 +40,35 @@ public class SimFlopsWorkload( public val utilization: Double = 0.8 ) : SimWorkload { init { - require(flops >= 0) { "Negative number of flops" } + require(flops >= 0) { "Number of FLOPs must be positive" } require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimExecutionContext) {} + override fun onStart(ctx: SimMachineContext) {} - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - val cores = ctx.machine.cpus.size - val limit = ctx.machine.cpus[cpu].frequency * utilization - val work = flops.toDouble() / cores - - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) - } else { - SimResourceCommand.Exit - } + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + return CpuConsumer(ctx) } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.machine.cpus[cpu].frequency * utilization + private inner class CpuConsumer(private val machine: SimMachineContext) : SimResourceConsumer<SimProcessingUnit> { + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + val limit = ctx.resource.frequency * utilization + val work = flops.toDouble() / machine.cpus.size + + return if (work > 0.0) { + SimResourceCommand.Consume(work, limit) + } else { + SimResourceCommand.Exit + } + } - return SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + return if (remainingWork > 0.0) { + val limit = ctx.resource.frequency * utilization + return SimResourceCommand.Consume(remainingWork, limit) + } else { + SimResourceCommand.Exit + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt deleted file mode 100644 index 41a5028e..00000000 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimResourceCommand.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.workload - -/** - * A command that is sent to the host machine. - */ -public sealed class SimResourceCommand { - /** - * A request to the host to process the specified amount of [work] on a vCPU before the specified [deadline]. - * - * @param work The amount of work to process on the CPU. - * @param limit The maximum amount of work to be processed per second. - * @param deadline The instant at which the work needs to be fulfilled. - */ - public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { - init { - require(work > 0) { "The amount of work must be positive." } - require(limit > 0) { "Limit must be positive." } - } - } - - /** - * An indication to the host that the vCPU will idle until the specified [deadline] or is interrupted. - */ - public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() - - /** - * An indication to the host that the vCPU has finished processing. - */ - public object Exit : SimResourceCommand() -} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 00ebebce..313b6ed5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -22,7 +22,11 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext /** * A [SimWorkload] that models application execution as a single duration. @@ -39,20 +43,26 @@ public class SimRuntimeWorkload( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimExecutionContext) {} + override fun onStart(ctx: SimMachineContext) {} - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - val limit = ctx.machine.cpus[cpu].frequency * utilization - val work = (limit / 1000) * duration - return SimResourceCommand.Consume(work, limit) + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + return CpuConsumer() } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return if (remainingWork > 0.0) { - val limit = ctx.machine.cpus[cpu].frequency * utilization - SimResourceCommand.Consume(remainingWork, limit) - } else { - SimResourceCommand.Exit + private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + val limit = ctx.resource.frequency * utilization + val work = (limit / 1000) * duration + return SimResourceCommand.Consume(work, limit) + } + + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + return if (remainingWork > 0.0) { + val limit = ctx.resource.frequency * utilization + SimResourceCommand.Consume(remainingWork, limit) + } else { + SimResourceCommand.Exit + } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index deb10b98..edef3843 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -22,7 +22,11 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceCommand +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -34,36 +38,42 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa private var fragment: Fragment? = null private lateinit var barrier: SimWorkloadBarrier - override fun onStart(ctx: SimExecutionContext) { - barrier = SimWorkloadBarrier(ctx.machine.cpus.size) + override fun onStart(ctx: SimMachineContext) { + barrier = SimWorkloadBarrier(ctx.cpus.size) fragment = nextFragment() offset = ctx.clock.millis() } - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return onNext(ctx, cpu, 0.0) + override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + return CpuConsumer() } - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val work = (fragment.duration / 1000) * fragment.usage - val deadline = offset + fragment.duration + private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { + override fun onStart(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { + return onNext(ctx, 0.0) + } - assert(deadline >= now) { "Deadline already passed" } + override fun onNext(ctx: SimResourceContext<SimProcessingUnit>, remainingWork: Double): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val work = (fragment.duration / 1000) * fragment.usage + val deadline = offset + fragment.duration - val cmd = - if (cpu < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) + assert(deadline >= now) { "Deadline already passed" } - if (barrier.enter()) { - this.fragment = nextFragment() - this.offset += fragment.duration - } + val cmd = + if (ctx.resource.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) - return cmd + if (barrier.enter()) { + this@SimTraceWorkload.fragment = nextFragment() + this@SimTraceWorkload.offset += fragment.duration + } + + return cmd + } } override fun toString(): String = "SimTraceWorkload" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 6fc78d56..60661e23 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -22,7 +22,9 @@ package org.opendc.simulator.compute.workload -import org.opendc.simulator.compute.SimExecutionContext +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.resources.SimResourceConsumer /** * A model that characterizes the runtime behavior of some particular workload. @@ -32,27 +34,12 @@ import org.opendc.simulator.compute.SimExecutionContext */ public interface SimWorkload { /** - * This method is invoked when the workload is started, before the (virtual) CPUs assigned to the workload will - * start. + * This method is invoked when the workload is started. */ - public fun onStart(ctx: SimExecutionContext) + public fun onStart(ctx: SimMachineContext) /** - * This method is invoked when a (virtual) CPU assigned to the workload has started. - * - * @param ctx The execution context in which the workload runs. - * @param cpu The index of the (virtual) CPU to start. - * @return The command to perform on the CPU. + * Obtain the resource consumer for the specified processing unit. */ - public fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand - - /** - * This method is invoked when a (virtual) CPU assigned to the workload was interrupted or reached its deadline. - * - * @param ctx The execution context in which the workload runs. - * @param cpu The index of the (virtual) CPU to obtain the resource consumption of. - * @param remainingWork The remaining work that was not yet completed. - * @return The next command to perform on the CPU. - */ - public fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand + public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index b8eee4f0..4b4d7eca 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -30,9 +30,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import java.time.Clock @@ -51,10 +51,10 @@ internal class SimHypervisorTest { scope = TestCoroutineScope() clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 1036f1ac..00efba53 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -29,14 +29,10 @@ import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter /** @@ -48,11 +44,11 @@ class SimMachineTest { @BeforeEach fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -86,74 +82,4 @@ class SimMachineTest { assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } } } - - @Test - fun testInterrupt() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - ctx.interrupt(cpu) - return SimResourceCommand.Exit - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertDoesNotThrow { - testScope.runBlockingTest { machine.run(workload) } - } - } - - @Test - fun testExceptionPropagationOnStart() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - throw IllegalStateException() - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertThrows<IllegalStateException> { - testScope.runBlockingTest { machine.run(workload) } - } - } - - @Test - fun testExceptionPropagationOnNext() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val machine = SimBareMetalMachine(testScope, clock, machineModel) - - val workload = object : SimWorkload { - override fun onStart(ctx: SimExecutionContext) {} - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return SimResourceCommand.Consume(1.0, 1.0) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - throw IllegalStateException() - } - } - - assertThrows<IllegalStateException> { - testScope.runBlockingTest { machine.run(workload) } - } - } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index 1a9faf11..583d989c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -31,9 +31,10 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.model.SimMemoryUnit +import org.opendc.simulator.compute.model.SimProcessingNode +import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -53,10 +54,10 @@ internal class SimSpaceSharedHypervisorTest { scope = TestCoroutineScope() clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -126,6 +127,56 @@ internal class SimSpaceSharedHypervisorTest { } /** + * Test FLOPs workload on hypervisor. + */ + @Test + fun testFlopsWorkload() { + val duration = 5 * 60L * 1000 + val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + scope.launch { + launch { machine.run(hypervisor) } + + yield() + launch { hypervisor.createMachine(machineModel).run(workload) } + } + + scope.advanceUntilIdle() + + assertEquals(duration, scope.currentTime) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() { + val duration = 5 * 60L * 1000 + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + scope.launch { + launch { machine.run(hypervisor) } + + yield() + launch { + val vm = hypervisor.createMachine(machineModel) + vm.run(SimRuntimeWorkload(duration)) + vm.close() + + val vm2 = hypervisor.createMachine(machineModel) + vm2.run(SimRuntimeWorkload(duration)) + } + } + + scope.advanceUntilIdle() + + assertEquals(duration * 2, scope.currentTime) { "Took enough time" } + } + + /** * Test concurrent workloads on the machine. */ @Test |
