diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 16:45:13 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 18:16:40 +0100 |
| commit | 3718c385f84b463ac799080bb5603e0011adcd7d (patch) | |
| tree | 414e4c9fa82ade602cfdae4384f39b0bdb6cb139 /simulator/opendc-simulator | |
| parent | f616b720406250b1415593ff04c9d910b1fda54c (diff) | |
simulator: Remove generic resource constraint from resource model
This change removes the generic resource constraint (e.g., SimResource)
and replaces it by a simple capacity property. In the future, users
should handle the resource properties on a higher level.
This change simplifies compositions of consumers and providers by not
requiring a translation from resource to capacity.
Diffstat (limited to 'simulator/opendc-simulator')
45 files changed, 1408 insertions, 902 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index 281d43ae..81d09f12 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -27,8 +27,8 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.launch import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* import java.time.Clock @@ -45,7 +45,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * The resource switch to use. */ - private lateinit var switch: SimResourceSwitch<SimProcessingUnit> + private lateinit var switch: SimResourceSwitch /** * The virtual machines running on this hypervisor. @@ -57,12 +57,12 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. */ - public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> + public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch /** * Check whether the specified machine model fits on this hypervisor. */ - public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean + public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean override fun canFit(model: SimMachineModel): Boolean { return canFit(model, switch) @@ -101,7 +101,7 @@ public abstract class SimAbstractHypervisor : SimHypervisor { /** * The vCPUs of the machine. */ - private val cpus: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>> = model.cpus.associateWith { switch.addOutput(it) } + private val cpus: Map<ProcessingUnit, SimResourceProvider> = model.cpus.associateWith { switch.addOutput(it.frequency) } /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. @@ -111,10 +111,10 @@ public abstract class SimAbstractHypervisor : SimHypervisor { require(!isTerminated) { "Machine is terminated" } val ctx = object : SimMachineContext { - override val cpus: List<SimProcessingUnit> + override val cpus: List<ProcessingUnit> get() = model.cpus - override val memory: List<SimMemoryUnit> + override val memory: List<MemoryUnit> get() = model.memory override val clock: Clock @@ -122,8 +122,8 @@ public abstract class SimAbstractHypervisor : SimHypervisor { override val meta: Map<String, Any> = meta - override fun interrupt(resource: SimResource) { - requireNotNull(this@VirtualMachine.cpus[resource]).interrupt() + override fun interrupt(cpu: ProcessingUnit) { + requireNotNull(this@VirtualMachine.cpus[cpu]).interrupt() } } @@ -155,8 +155,8 @@ public abstract class SimAbstractHypervisor : SimHypervisor { switch = createSwitch(ctx) } - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - val forwarder = SimResourceForwarder(cpu) + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { + val forwarder = SimResourceForwarder() switch.addInput(forwarder) return forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 44906c2b..52945354 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -27,10 +27,9 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource import org.opendc.simulator.resources.consume @@ -65,24 +64,24 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine /** * The resources allocated for this machine. */ - protected abstract val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> + protected abstract val resources: Map<ProcessingUnit, SimResourceSource> /** * The execution context in which the workload runs. */ private inner class Context( - val sources: Map<SimProcessingUnit, SimResourceProvider<SimProcessingUnit>>, + val sources: Map<ProcessingUnit, SimResourceProvider>, override val meta: Map<String, Any> ) : SimMachineContext { override val clock: Clock get() = this@SimAbstractMachine.clock - override val cpus: List<SimProcessingUnit> = model.cpus + override val cpus: List<ProcessingUnit> = model.cpus - override val memory: List<SimMemoryUnit> = model.memory + override val memory: List<MemoryUnit> = model.memory - override fun interrupt(resource: SimResource) { - checkNotNull(sources[resource]) { "Invalid resource" }.interrupt() + override fun interrupt(cpu: ProcessingUnit) { + checkNotNull(sources[cpu]) { "Invalid resource" }.interrupt() } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 79982ea8..19479719 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* import org.opendc.utils.TimerScheduler import java.time.Clock @@ -57,8 +57,8 @@ public class SimBareMetalMachine( */ private val scheduler = TimerScheduler<Any>(this.context, clock) - override val resources: Map<SimProcessingUnit, SimResourceSource<SimProcessingUnit>> = - model.cpus.associateWith { SimResourceSource(it, clock, scheduler) } + override val resources: Map<ProcessingUnit, SimResourceSource> = + model.cpus.associateWith { SimResourceSource(it.frequency, clock, scheduler) } override fun close() { super.close() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index c629fbd9..fa677de9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* @@ -34,14 +33,14 @@ import org.opendc.simulator.resources.* */ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { - override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean = true + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchMaxMin( ctx.clock, - object : SimResourceSwitchMaxMin.Listener<SimProcessingUnit> { + object : SimResourceSwitchMaxMin.Listener { override fun onSliceFinish( - switch: SimResourceSwitchMaxMin<SimProcessingUnit>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index cff70826..85404e6e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,9 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit -import org.opendc.simulator.resources.SimResource +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit import java.time.Clock /** @@ -46,17 +45,17 @@ public interface SimMachineContext { /** * The CPUs available on the machine. */ - public val cpus: List<SimProcessingUnit> + public val cpus: List<ProcessingUnit> /** * The memory available on the machine */ - public val memory: List<SimMemoryUnit> + public val memory: List<MemoryUnit> /** - * Interrupt the specified [resource]. + * Interrupt the specified [cpu]. * * @throws IllegalArgumentException if the resource does not belong to this execution context. */ - public fun interrupt(resource: SimResource) + public fun interrupt(cpu: ProcessingUnit) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt index d6bf0e99..2b414540 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineModel.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingUnit /** * A description of the physical or virtual machine on which a bootable image runs. @@ -31,4 +31,4 @@ import org.opendc.simulator.compute.model.SimProcessingUnit * @property cpus The list of processing units available to the image. * @property memory The list of memory units available to the image. */ -public data class SimMachineModel(public val cpus: List<SimProcessingUnit>, public val memory: List<SimMemoryUnit>) +public data class SimMachineModel(public val cpus: List<ProcessingUnit>, public val memory: List<MemoryUnit>) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index 5de69884..fd8e546f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -22,18 +22,17 @@ package org.opendc.simulator.compute -import org.opendc.simulator.compute.model.SimProcessingUnit import org.opendc.simulator.resources.* /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { - override fun canFit(model: SimMachineModel, switch: SimResourceSwitch<SimProcessingUnit>): Boolean { + override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean { return switch.inputs.size - switch.outputs.size >= model.cpus.size } - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch<SimProcessingUnit> { + override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchExclusive() } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt index 49745868..bcbde5b1 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimMemoryUnit.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MemoryUnit.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.compute.model -import org.opendc.simulator.resources.SimResource - /** * A memory unit of a compute resource, either virtual or physical. * @@ -32,12 +30,9 @@ import org.opendc.simulator.resources.SimResource * @property speed The access speed of the memory in MHz. * @property size The size of the memory unit in MBs. */ -public data class SimMemoryUnit( +public data class MemoryUnit( public val vendor: String, public val modelName: String, public val speed: Double, public val size: Long -) : SimResource { - override val capacity: Double - get() = speed -} +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt index 4022ecb3..58ed816c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingNode.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingNode.kt @@ -30,7 +30,7 @@ package org.opendc.simulator.compute.model * @property arch The micro-architecture of the processor node. * @property coreCount The number of logical CPUs in the processor node. */ -public data class SimProcessingNode( +public data class ProcessingNode( public val vendor: String, public val arch: String, public val modelName: String, diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt index 1c989254..415e95e6 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/SimProcessingUnit.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/ProcessingUnit.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.compute.model -import org.opendc.simulator.resources.SimResource - /** * A single logical compute unit of processor node, either virtual or physical. * @@ -31,11 +29,8 @@ import org.opendc.simulator.resources.SimResource * @property id The identifier of the CPU core within the processing node. * @property frequency The clock rate of the CPU in MHz. */ -public data class SimProcessingUnit( - public val node: SimProcessingNode, +public data class ProcessingUnit( + public val node: ProcessingNode, public val id: Int, public val frequency: Double -) : SimResource { - override val capacity: Double - get() = frequency -} +) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index f1079ee6..63c9d28c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.consumer.SimWorkConsumer @@ -45,7 +45,7 @@ public class SimFlopsWorkload( override fun onStart(ctx: SimMachineContext) {} - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index d7aa8f80..a3420e32 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.consumer.SimWorkConsumer @@ -44,7 +44,7 @@ public class SimRuntimeWorkload( override fun onStart(ctx: SimMachineContext) {} - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { val limit = cpu.frequency * utilization return SimWorkConsumer((limit / 1000) * duration, utilization) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index e8050263..2442d748 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -45,31 +45,29 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa offset = ctx.clock.millis() } - override fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> { - return CpuConsumer() - } + override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { + return object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val work = (fragment.duration / 1000) * fragment.usage + val deadline = offset + fragment.duration - private inner class CpuConsumer : SimResourceConsumer<SimProcessingUnit> { - override fun onNext(ctx: SimResourceContext<SimProcessingUnit>): SimResourceCommand { - val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val work = (fragment.duration / 1000) * fragment.usage - val deadline = offset + fragment.duration + assert(deadline >= now) { "Deadline already passed" } - assert(deadline >= now) { "Deadline already passed" } + val cmd = + if (cpu.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, fragment.usage, deadline) + else + SimResourceCommand.Idle(deadline) - val cmd = - if (ctx.resource.id < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) + if (barrier.enter()) { + this@SimTraceWorkload.fragment = nextFragment() + this@SimTraceWorkload.offset += fragment.duration + } - if (barrier.enter()) { - this@SimTraceWorkload.fragment = nextFragment() - this@SimTraceWorkload.offset += fragment.duration + return cmd } - - return cmd } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index 60661e23..bdc12bb5 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.SimResourceConsumer /** @@ -41,5 +41,5 @@ public interface SimWorkload { /** * Obtain the resource consumer for the specified processing unit. */ - public fun getConsumer(ctx: SimMachineContext, cpu: SimProcessingUnit): SimResourceConsumer<SimProcessingUnit> + public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 4ac8cf63..0149024f 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -31,9 +31,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -46,10 +46,10 @@ internal class SimHypervisorTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) model = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 6adc41d0..7e014245 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -28,9 +28,9 @@ import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter @@ -43,11 +43,11 @@ class SimMachineTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 2) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 1000.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index 8428a0a7..fb0523af 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -31,9 +31,9 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.model.SimMemoryUnit -import org.opendc.simulator.compute.model.SimProcessingNode -import org.opendc.simulator.compute.model.SimProcessingUnit +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -48,10 +48,10 @@ internal class SimSpaceSharedHypervisorTest { @BeforeEach fun setUp() { - val cpuNode = SimProcessingNode("Intel", "Xeon", "amd64", 1) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { SimProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { SimMemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt new file mode 100644 index 00000000..8d2587b1 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.consumer.SimTraceConsumer + +/** + * Helper function to create simple consumer workload. + */ +fun createSimpleConsumer(): SimResourceConsumer { + return SimTraceConsumer( + sequenceOf( + SimTraceConsumer.Fragment(1000, 28.0), + SimTraceConsumer.Fragment(1000, 3500.0), + SimTraceConsumer.Fragment(1000, 0.0), + SimTraceConsumer.Fragment(1000, 183.0), + SimTraceConsumer.Fragment(1000, 400.0), + SimTraceConsumer.Fragment(1000, 100.0), + SimTraceConsumer.Fragment(1000, 3000.0), + SimTraceConsumer.Fragment(1000, 4500.0), + ), + ) +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt new file mode 100644 index 00000000..f2eea97c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler +import org.openjdk.jmh.annotations.* +import java.time.Clock +import java.util.concurrent.TimeUnit + +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@OptIn(ExperimentalCoroutinesApi::class) +class SimResourceBenchmarks { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var scheduler: TimerScheduler<Any> + + @Setup + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + scheduler = TimerScheduler(scope.coroutineContext, clock) + } + + @State(Scope.Thread) + class Workload { + lateinit var consumers: Array<SimResourceConsumer> + + @Setup + fun setUp() { + consumers = Array(3) { createSimpleConsumer() } + } + } + + @Benchmark + fun benchmarkSource(state: Workload) { + return scope.runBlockingTest { + val provider = SimResourceSource(4200.0, clock, scheduler) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkForwardOverhead(state: Workload) { + return scope.runBlockingTest { + val provider = SimResourceSource(4200.0, clock, scheduler) + val forwarder = SimResourceForwarder() + provider.startConsumer(forwarder) + return@runBlockingTest forwarder.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + val provider = switch.addOutput(3500.0) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchMaxMin(clock) + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + repeat(3) { i -> + launch { + val provider = switch.addOutput(3500.0) + provider.consume(state.consumers[i]) + } + } + } + } + + @Benchmark + fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchExclusive() + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + val provider = switch.addOutput(3500.0) + return@runBlockingTest provider.consume(state.consumers[0]) + } + } + + @Benchmark + fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) { + return scope.runBlockingTest { + val switch = SimResourceSwitchExclusive() + + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + + repeat(2) { i -> + launch { + val provider = switch.addOutput(3500.0) + provider.consume(state.consumers[i]) + } + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt deleted file mode 100644 index 09246fe4..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSourceBenchmark.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.test.runBlockingTest -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.utils.TimerScheduler -import org.openjdk.jmh.annotations.* -import java.time.Clock -import java.util.concurrent.TimeUnit - -@State(Scope.Benchmark) -@Fork(1) -@Warmup(iterations = 0) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) -class SimResourceSourceBenchmark { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock - private lateinit var scheduler: TimerScheduler<Any> - private lateinit var consumer: SimResourceConsumer<SimGenericResource> - - @Setup - fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - scheduler = TimerScheduler(scope.coroutineContext, clock) - consumer = - SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(1000, 28.0), - SimTraceConsumer.Fragment(1000, 3500.0), - SimTraceConsumer.Fragment(1000, 0.0), - SimTraceConsumer.Fragment(1000, 183.0), - SimTraceConsumer.Fragment(1000, 400.0), - SimTraceConsumer.Fragment(1000, 100.0), - SimTraceConsumer.Fragment(1000, 3000.0), - SimTraceConsumer.Fragment(1000, 4500.0), - ), - ) - } - - @Benchmark - fun benchmarkSource() { - return scope.runBlockingTest { - val provider = SimResourceSource(SimGenericResource(4200.0), clock, scheduler) - return@runBlockingTest provider.consume(consumer) - } - } - - data class SimGenericResource(override val capacity: Double) : SimResource -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt deleted file mode 100644 index be31e86d..00000000 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceSwitchBenchmark.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.test.runBlockingTest -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.utils.TimerScheduler -import org.openjdk.jmh.annotations.* -import java.time.Clock -import java.util.concurrent.TimeUnit - -@State(Scope.Benchmark) -@Fork(1) -@Warmup(iterations = 0) -@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) -class SimResourceSwitchBenchmark { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock - private lateinit var scheduler: TimerScheduler<Any> - private lateinit var consumer: SimResourceConsumer<SimGenericResource> - - @Setup - fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - scheduler = TimerScheduler(scope.coroutineContext, clock) - consumer = - SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(1000, 28.0), - SimTraceConsumer.Fragment(1000, 3500.0), - SimTraceConsumer.Fragment(1000, 0.0), - SimTraceConsumer.Fragment(1000, 183.0) - ), - ) - } - - @Benchmark - fun benchmarkSwitch() { - return scope.runBlockingTest { - val switch = SimResourceSwitchMaxMin<SimGenericResource>(clock) - - switch.addInput(SimResourceSource(SimGenericResource(3000.0), clock, scheduler)) - switch.addInput(SimResourceSource(SimGenericResource(3000.0), clock, scheduler)) - - val provider = switch.addOutput(SimGenericResource(3500.0)) - return@runBlockingTest provider.consume(consumer) - } - } - - data class SimGenericResource(override val capacity: Double) : SimResource -} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt new file mode 100644 index 00000000..18ac0cd8 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * Abstract implementation of [SimResourceAggregator]. + */ +public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { + /** + * The available resource provider contexts. + */ + protected val inputContexts: Set<SimResourceContext> + get() = _inputContexts + private val _inputContexts = mutableSetOf<SimResourceContext>() + + /** + * The output context. + */ + protected val outputContext: SimResourceContext + get() = context + + /** + * The commands to submit to the underlying input resources. + */ + protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf() + + /** + * This method is invoked when the resource consumer consumes resources. + */ + protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) + + /** + * This method is invoked when the resource consumer enters an idle state. + */ + protected open fun doIdle(deadline: Long) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Idle(deadline) + } + } + + /** + * This method is invoked when the resource consumer finishes processing. + */ + protected open fun doFinish(cause: Throwable?) { + for (input in inputContexts) { + commands[input] = SimResourceCommand.Exit + } + } + + /** + * This method is invoked when an input context is started. + */ + protected open fun onContextStarted(ctx: SimResourceContext) { + _inputContexts.add(ctx) + } + + protected open fun onContextFinished(ctx: SimResourceContext) { + assert(_inputContexts.remove(ctx)) { "Lost context" } + } + + override fun addInput(input: SimResourceProvider) { + check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + + val consumer = Consumer() + _inputs.add(input) + input.startConsumer(consumer) + } + + override fun close() { + output.close() + } + + override val output: SimResourceProvider + get() = _output + private val _output = SimResourceForwarder() + + override val inputs: Set<SimResourceProvider> + get() = _inputs + private val _inputs = mutableSetOf<SimResourceProvider>() + + private val context = object : SimAbstractResourceContext(clock, _output) { + override val capacity: Double + get() = inputContexts.sumByDouble { it.capacity } + + override val remainingWork: Double + get() = inputContexts.sumByDouble { it.remainingWork } + + override fun interrupt() { + super.interrupt() + + interruptAll() + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + doConsume(work, limit, deadline) + } + + override fun onIdle(deadline: Long) { + doIdle(deadline) + } + + override fun onFinish(cause: Throwable?) { + doFinish(cause) + + super.onFinish(cause) + + interruptAll() + } + } + + /** + * A flag to indicate that an interrupt is active. + */ + private var isInterrupting: Boolean = false + + /** + * Schedule the work over the input resources. + */ + private fun doSchedule() { + context.flush(isIntermediate = true) + interruptAll() + } + + /** + * Interrupt all inputs. + */ + private fun interruptAll() { + // Prevent users from interrupting the resource while they are constructing their next command, as this will + // only lead to infinite recursion. + if (isInterrupting) { + return + } + + try { + isInterrupting = true + + val iterator = _inputs.iterator() + while (iterator.hasNext()) { + val input = iterator.next() + input.interrupt() + + if (input.state != SimResourceState.Active) { + iterator.remove() + } + } + } finally { + isInterrupting = false + } + } + + /** + * An internal [SimResourceConsumer] implementation for aggregator inputs. + */ + private inner class Consumer : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { + onContextStarted(ctx) + + // Make sure we initialize the output if we have not done so yet + if (context.state == SimResourceState.Pending) { + context.start() + } + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + doSchedule() + + return commands[ctx] ?: SimResourceCommand.Idle() + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + onContextFinished(ctx) + + super.onFinish(ctx, cause) + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index dba334a2..f65cbaf4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -30,17 +30,10 @@ import kotlin.math.min /** * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ -public abstract class SimAbstractResourceContext<R : SimResource>( - override val resource: R, +public abstract class SimAbstractResourceContext( override val clock: Clock, - private val consumer: SimResourceConsumer<R> -) : SimResourceContext<R> { - /** - * The capacity of the resource. - */ - override val capacity: Double - get() = resource.capacity - + private val consumer: SimResourceConsumer +) : SimResourceContext { /** * The amount of work still remaining at this instant. */ @@ -51,6 +44,12 @@ public abstract class SimAbstractResourceContext<R : SimResource>( } /** + * A flag to indicate the state of the context. + */ + public var state: SimResourceState = SimResourceState.Pending + private set + + /** * This method is invoked when the resource will idle until the specified [deadline]. */ public abstract fun onIdle(deadline: Long) @@ -178,7 +177,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( if (command.deadline <= now || !isIntermediate) { next(now) } else { - activeCommand + interpret(SimResourceCommand.Idle(command.deadline), now) } } is SimResourceCommand.Consume -> { @@ -214,7 +213,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( flush() } - override fun toString(): String = "SimAbstractResourceContext[resource=$resource]" + override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" /** * A flag to indicate that the resource is currently processing a command. @@ -222,11 +221,6 @@ public abstract class SimAbstractResourceContext<R : SimResource>( protected var isProcessing: Boolean = false /** - * A flag to indicate the state of the context. - */ - private var state: SimResourceState = SimResourceState.Pending - - /** * The current command that is being processed. */ private var activeCommand: CommandWrapper? = null diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt new file mode 100644 index 00000000..bb4e6a2c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. + */ +public interface SimResourceAggregator : AutoCloseable { + /** + * The output resource provider to which resource consumers can be attached. + */ + public val output: SimResourceProvider + + /** + * The input resources that will be switched between the output providers. + */ + public val inputs: Set<SimResourceProvider> + + /** + * Add the specified [input] to the switch. + */ + public fun addInput(input: SimResourceProvider) + + /** + * End the lifecycle of the aggregator. + */ + public override fun close() +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt new file mode 100644 index 00000000..08bc064e --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * A [SimResourceAggregator] that distributes the load equally across the input resources. + */ +public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { + private val consumers = mutableListOf<SimResourceContext>() + + override fun doConsume(work: Double, limit: Double, deadline: Long) { + // Sort all consumers by their capacity + consumers.sortWith(compareBy { it.capacity }) + + // Divide the requests over the available capacity of the input resources fairly + for (input in consumers) { + val inputCapacity = input.capacity + val fraction = inputCapacity / outputContext.capacity + val grantedSpeed = limit * fraction + val grantedWork = fraction * work + + commands[input] = + if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + } + + override fun onContextStarted(ctx: SimResourceContext) { + super.onContextStarted(ctx) + + consumers.add(ctx) + } + + override fun onContextFinished(ctx: SimResourceContext) { + super.onContextFinished(ctx) + + consumers.remove(ctx) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 01b56488..04c7fcaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -28,13 +28,13 @@ package org.opendc.simulator.resources * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) * for multiple resource providers, unless explicitly said otherwise. */ -public interface SimResourceConsumer<in R : SimResource> { +public interface SimResourceConsumer { /** * This method is invoked when the consumer is started for some resource. * * @param ctx The execution context in which the consumer runs. */ - public fun onStart(ctx: SimResourceContext<R>) {} + public fun onStart(ctx: SimResourceContext) {} /** * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because @@ -43,7 +43,7 @@ public interface SimResourceConsumer<in R : SimResource> { * @param ctx The execution context in which the consumer runs. * @return The next command that the resource should execute. */ - public fun onNext(ctx: SimResourceContext<R>): SimResourceCommand + public fun onNext(ctx: SimResourceContext): SimResourceCommand /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], @@ -54,5 +54,5 @@ public interface SimResourceConsumer<in R : SimResource> { * @param ctx The execution context in which the consumer ran. * @param cause The cause of the finish in case the resource finished exceptionally. */ - public fun onFinish(ctx: SimResourceContext<R>, cause: Throwable? = null) {} + public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index f13764fb..11dbb09f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -28,12 +28,7 @@ import java.time.Clock * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a * resource and a resource consumer. */ -public interface SimResourceContext<out R : SimResource> { - /** - * The resource that is managed by this context. - */ - public val resource: R - +public interface SimResourceContext { /** * The virtual clock tracking simulation time. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index 31b0a175..b2759b7f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -23,11 +23,21 @@ package org.opendc.simulator.resources /** - * A generic representation of resource that may be consumed. + * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. */ -public interface SimResource { +public interface SimResourceDistributor : AutoCloseable { /** - * The capacity of the resource. + * The output resource providers to which resource consumers can be attached. */ - public val capacity: Double + public val outputs: Set<SimResourceProvider> + + /** + * The input resource that will be distributed over the consumers. + */ + public val input: SimResourceProvider + + /** + * Add an output to the switch with the specified [capacity]. + */ + public fun addOutput(capacity: Double): SimResourceProvider } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt new file mode 100644 index 00000000..b0f27b9d --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -0,0 +1,423 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. + */ +public class SimResourceDistributorMaxMin( + override val input: SimResourceProvider, + private val clock: Clock, + private val listener: Listener? = null +) : SimResourceDistributor { + override val outputs: Set<SimResourceProvider> + get() = _outputs + private val _outputs = mutableSetOf<OutputProvider>() + + /** + * The active output contexts. + */ + private val outputContexts: MutableList<OutputContext> = mutableListOf() + + /** + * The total speed requested by the output resources. + */ + private var totalRequestedSpeed = 0.0 + + /** + * The total amount of work requested by the output resources. + */ + private var totalRequestedWork = 0.0 + + /** + * The total allocated speed for the output resources. + */ + private var totalAllocatedSpeed = 0.0 + + /** + * The total allocated work requested for the output resources. + */ + private var totalAllocatedWork = 0.0 + + /** + * The amount of work that could not be performed due to over-committing resources. + */ + private var totalOvercommittedWork = 0.0 + + /** + * The amount of work that was lost due to interference. + */ + private var totalInterferedWork = 0.0 + + /** + * A flag to indicate that the switch is closed. + */ + private var isClosed: Boolean = false + + /** + * An internal [SimResourceConsumer] implementation for switch inputs. + */ + private val consumer = object : SimResourceConsumer { + /** + * The resource context of the consumer. + */ + private lateinit var ctx: SimResourceContext + + val remainingWork: Double + get() = ctx.remainingWork + + override fun onStart(ctx: SimResourceContext) { + this.ctx = ctx + } + + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) + } + + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() + + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call tou output.close() + iterator.remove() + + output.close() + } + } + } + + /** + * The total amount of remaining work. + */ + private val totalRemainingWork: Double + get() = consumer.remainingWork + + override fun addOutput(capacity: Double): SimResourceProvider { + check(!isClosed) { "Distributor has been closed" } + + val provider = OutputProvider(capacity) + _outputs.add(provider) + return provider + } + + override fun close() { + if (!isClosed) { + isClosed = true + input.cancel() + } + } + + init { + input.startConsumer(consumer) + } + + /** + * Indicate that the workloads should be re-scheduled. + */ + private fun schedule() { + input.interrupt() + } + + /** + * Schedule the work over the physical CPUs. + */ + private fun doSchedule(capacity: Double): SimResourceCommand { + // If there is no work yet, mark all inputs as idle. + if (outputContexts.isEmpty()) { + return SimResourceCommand.Idle() + } + + val maxUsage = capacity + var duration: Double = Double.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + var availableSpeed = maxUsage + var totalRequestedSpeed = 0.0 + var totalRequestedWork = 0.0 + + // Flush the work of the outputs + var outputIterator = outputContexts.listIterator() + while (outputIterator.hasNext()) { + val output = outputIterator.next() + + output.flush(isIntermediate = true) + + if (output.activeCommand == SimResourceCommand.Exit) { + // Apparently the output consumer has exited, so remove it from the scheduling queue. + outputIterator.remove() + } + } + + // Sort the outputs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + outputContexts.sort() + + // Divide the available input capacity fairly across the outputs using max-min fair sharing + outputIterator = outputContexts.listIterator() + var remaining = outputContexts.size + while (outputIterator.hasNext()) { + val output = outputIterator.next() + val availableShare = availableSpeed / remaining-- + + when (val command = output.activeCommand) { + is SimResourceCommand.Idle -> { + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + output.actualSpeed = 0.0 + } + is SimResourceCommand.Consume -> { + val grantedSpeed = min(output.allowedSpeed, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, command.deadline) + + // Ignore idle computation + if (grantedSpeed <= 0.0 || command.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } + + totalRequestedSpeed += command.limit + totalRequestedWork += command.work + + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed + + // The duration that we want to run is that of the shortest request from an output + duration = min(duration, command.work / grantedSpeed) + } + SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } + } + } + + assert(deadline >= clock.millis()) { "Deadline already passed" } + + this.totalRequestedSpeed = totalRequestedSpeed + this.totalRequestedWork = totalRequestedWork + this.totalAllocatedSpeed = maxUsage - availableSpeed + this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) + + return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) + SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + } + + /** + * Obtain the next command to perform. + */ + private fun doNext(capacity: Double): SimResourceCommand { + val totalRequestedWork = totalRequestedWork.toLong() + val totalRemainingWork = totalRemainingWork.toLong() + val totalAllocatedWork = totalAllocatedWork.toLong() + val totalRequestedSpeed = totalRequestedSpeed + val totalAllocatedSpeed = totalAllocatedSpeed + + // Force all inputs to re-schedule their work. + val command = doSchedule(capacity) + + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork, + totalAllocatedWork - totalRemainingWork, + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalRequestedSpeed, + totalAllocatedSpeed, + ) + + totalInterferedWork = 0.0 + totalOvercommittedWork = 0.0 + + return command + } + + /** + * Event listener for hypervisor events. + */ + public interface Listener { + /** + * This method is invoked when a slice is finished. + */ + public fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) + } + + /** + * An internal [SimResourceProvider] implementation for switch outputs. + */ + private inner class OutputProvider(val capacity: Double) : SimResourceProvider { + /** + * The [OutputContext] that is currently running. + */ + private var ctx: OutputContext? = null + + override var state: SimResourceState = SimResourceState.Pending + internal set + + override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource cannot be consumed" } + + val ctx = OutputContext(this, consumer) + this.ctx = ctx + this.state = SimResourceState.Active + outputContexts += ctx + + ctx.start() + schedule() + } + + override fun close() { + cancel() + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Stopped + _outputs.remove(this) + } + } + + override fun interrupt() { + ctx?.interrupt() + } + + override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.stop() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + } + + /** + * A [SimAbstractResourceContext] for the output resources. + */ + private inner class OutputContext( + private val provider: OutputProvider, + consumer: SimResourceConsumer + ) : SimAbstractResourceContext(clock, consumer), Comparable<OutputContext> { + override val capacity: Double + get() = provider.capacity + + /** + * The current command that is processed by the vCPU. + */ + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 + + private fun reportOvercommit() { + val remainingWork = remainingWork + totalOvercommittedWork += remainingWork + } + + override fun onIdle(deadline: Long) { + reportOvercommit() + + allowedSpeed = 0.0 + activeCommand = SimResourceCommand.Idle(deadline) + } + + override fun onConsume(work: Double, limit: Double, deadline: Long) { + reportOvercommit() + + allowedSpeed = getSpeed(limit) + activeCommand = SimResourceCommand.Consume(work, limit, deadline) + } + + override fun onFinish(cause: Throwable?) { + reportOvercommit() + + activeCommand = SimResourceCommand.Exit + provider.cancel() + + super.onFinish(cause) + } + + override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { + // Apply performance interference model + val performanceScore = 1.0 + + // Compute the remaining amount of work + return if (work > 0.0) { + // Compute the fraction of compute time allocated to the VM + val fraction = actualSpeed / totalAllocatedSpeed + + // Compute the work that was actually granted to the VM. + val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction + val processed = processingAvailable * performanceScore + + val interferedWork = processingAvailable - processed + + totalInterferedWork += interferedWork + + max(0.0, work - processed) + } else { + 0.0 + } + } + + override fun interrupt() { + // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead + // to infinite recursion. + if (isProcessing) { + return + } + + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } + + override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 732e709a..227f4d62 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -25,17 +25,16 @@ package org.opendc.simulator.resources /** * A helper class to construct a [SimResourceProvider] which forwards the requests to a [SimResourceConsumer]. */ -public class SimResourceForwarder<R : SimResource>(override val resource: R) : - SimResourceProvider<R>, SimResourceConsumer<R> { +public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { /** * The [SimResourceContext] in which the forwarder runs. */ - private var ctx: SimResourceContext<R>? = null + private var ctx: SimResourceContext? = null /** * The delegate [SimResourceConsumer]. */ - private var delegate: SimResourceConsumer<R>? = null + private var delegate: SimResourceConsumer? = null /** * A flag to indicate that the delegate was started. @@ -48,11 +47,13 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : override var state: SimResourceState = SimResourceState.Pending private set - override fun startConsumer(consumer: SimResourceConsumer<R>) { + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } state = SimResourceState.Active delegate = consumer + + // Interrupt the provider to replace the consumer interrupt() } @@ -83,11 +84,11 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : } } - override fun onStart(ctx: SimResourceContext<R>) { + override fun onStart(ctx: SimResourceContext) { this.ctx = ctx } - override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val delegate = delegate if (!hasDelegateStarted) { @@ -117,7 +118,7 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : } } - override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { this.ctx = null val delegate = delegate diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 1593281b..52b13c5c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -27,12 +27,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine /** * A [SimResourceProvider] provides some resource of type [R]. */ -public interface SimResourceProvider<out R : SimResource> : AutoCloseable { - /** - * The resource that is managed by this provider. - */ - public val resource: R - +public interface SimResourceProvider : AutoCloseable { /** * The state of the resource. */ @@ -43,7 +38,7 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable { * * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. */ - public fun startConsumer(consumer: SimResourceConsumer<R>) + public fun startConsumer(consumer: SimResourceConsumer) /** * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. @@ -67,15 +62,15 @@ public interface SimResourceProvider<out R : SimResource> : AutoCloseable { * Consume the resource provided by this provider using the specified [consumer] and suspend execution until * the consumer has finished. */ -public suspend fun <R : SimResource> SimResourceProvider<R>.consume(consumer: SimResourceConsumer<R>) { +public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { return suspendCancellableCoroutine { cont -> - startConsumer(object : SimResourceConsumer<R> by consumer { - override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + startConsumer(object : SimResourceConsumer by consumer { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { assert(!cont.isCompleted) { "Coroutine already completed" } - cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) - consumer.onFinish(ctx, cause) + + cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) } override fun toString(): String = "SimSuspendingResourceConsumer" diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 99545c4c..3b4e7e7a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -31,15 +31,15 @@ import kotlin.math.min /** * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. * - * @param resource The resource to provide. + * @param initialCapacity The initial capacity of the resource. * @param clock The virtual clock to track simulation time. * @param scheduler The scheduler to schedule the interrupts. */ -public class SimResourceSource<R : SimResource>( - override val resource: R, +public class SimResourceSource( + private val initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler<Any> -) : SimResourceProvider<R> { +) : SimResourceProvider { /** * The resource processing speed over time. */ @@ -55,7 +55,7 @@ public class SimResourceSource<R : SimResource>( override var state: SimResourceState = SimResourceState.Pending private set - override fun startConsumer(consumer: SimResourceConsumer<R>) { + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } val ctx = Context(consumer) @@ -89,7 +89,9 @@ public class SimResourceSource<R : SimResource>( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer<R>) : SimAbstractResourceContext<R>(resource, clock, consumer) { + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = initialCapacity + /** * The processing speed of the resource. */ @@ -123,6 +125,6 @@ public class SimResourceSource<R : SimResource>( super.onFinish(cause) } - override fun toString(): String = "SimResourceSource.Context[resource=$resource]" + override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index cd1af3fc..53fec16a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -23,26 +23,26 @@ package org.opendc.simulator.resources /** - * A [SimResourceSwitch] enables switching of capacity of multiple resources of type [R] between multiple consumers. + * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. */ -public interface SimResourceSwitch<R : SimResource> : AutoCloseable { +public interface SimResourceSwitch : AutoCloseable { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set<SimResourceProvider<R>> + public val outputs: Set<SimResourceProvider> /** * The input resources that will be switched between the output providers. */ - public val inputs: Set<SimResourceProvider<R>> + public val inputs: Set<SimResourceProvider> /** - * Add an output to the switch represented by [resource]. + * Add an output to the switch with the specified [capacity]. */ - public fun addOutput(resource: R): SimResourceProvider<R> + public fun addOutput(capacity: Double): SimResourceProvider /** * Add the specified [input] to the switch. */ - public fun addInput(input: SimResourceProvider<R>) + public fun addInput(input: SimResourceProvider) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 5eea78f6..6e431ea1 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -28,45 +28,45 @@ import java.util.ArrayDeque * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that * a single output is directly connected to an input and that the switch can only support as much outputs as inputs. */ -public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R> { +public class SimResourceSwitchExclusive : SimResourceSwitch { /** * A flag to indicate that the switch is closed. */ private var isClosed: Boolean = false private val _outputs = mutableSetOf<Provider>() - override val outputs: Set<SimResourceProvider<R>> + override val outputs: Set<SimResourceProvider> get() = _outputs - private val availableResources = ArrayDeque<SimResourceForwarder<R>>() + private val availableResources = ArrayDeque<SimResourceForwarder>() - private val _inputs = mutableSetOf<SimResourceProvider<R>>() - override val inputs: Set<SimResourceProvider<R>> + private val _inputs = mutableSetOf<SimResourceProvider>() + override val inputs: Set<SimResourceProvider> get() = _inputs - override fun addOutput(resource: R): SimResourceProvider<R> { + override fun addOutput(capacity: Double): SimResourceProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() - val output = Provider(resource, forwarder) + val output = Provider(capacity, forwarder) _outputs += output return output } - override fun addInput(input: SimResourceProvider<R>) { + override fun addInput(input: SimResourceProvider) { check(!isClosed) { "Switch has been closed" } if (input in inputs) { return } - val forwarder = SimResourceForwarder(input.resource) + val forwarder = SimResourceForwarder() _inputs += input availableResources += forwarder - input.startConsumer(object : SimResourceConsumer<R> by forwarder { - override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { + input.startConsumer(object : SimResourceConsumer by forwarder { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { // De-register the input after it has finished _inputs -= input forwarder.onFinish(ctx, cause) @@ -78,13 +78,13 @@ public class SimResourceSwitchExclusive<R : SimResource> : SimResourceSwitch<R> isClosed = true // Cancel all upstream subscriptions - _inputs.forEach(SimResourceProvider<R>::cancel) + _inputs.forEach(SimResourceProvider::cancel) } private inner class Provider( - override val resource: R, - private val forwarder: SimResourceForwarder<R> - ) : SimResourceProvider<R> by forwarder { + private val capacity: Double, + private val forwarder: SimResourceForwarder + ) : SimResourceProvider by forwarder { override fun close() { _outputs -= this availableResources += forwarder diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index ee8edfcd..c796c251 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -23,97 +23,61 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* -import org.opendc.simulator.resources.consumer.SimConsumerBarrier import java.time.Clock -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ -public class SimResourceSwitchMaxMin<R : SimResource>( - private val clock: Clock, - private val listener: Listener<R>? = null -) : SimResourceSwitch<R> { - private val inputConsumers = mutableSetOf<InputConsumer>() - private val _outputs = mutableSetOf<OutputProvider>() - override val outputs: Set<SimResourceProvider<R>> +public class SimResourceSwitchMaxMin( + clock: Clock, + private val listener: Listener? = null +) : SimResourceSwitch { + private val _outputs = mutableSetOf<SimResourceProvider>() + override val outputs: Set<SimResourceProvider> get() = _outputs - private val _inputs = mutableSetOf<SimResourceProvider<R>>() - override val inputs: Set<SimResourceProvider<R>> + private val _inputs = mutableSetOf<SimResourceProvider>() + override val inputs: Set<SimResourceProvider> get() = _inputs /** - * The commands to submit to the underlying host. + * A flag to indicate that the switch was closed. */ - private val commands = mutableMapOf<R, SimResourceCommand>() + private var isClosed = false /** - * The active output contexts. + * The aggregator to aggregate the resources. */ - private val outputContexts: MutableList<OutputContext> = mutableListOf() + private val aggregator = SimResourceAggregatorMaxMin(clock) /** - * The remaining work of all inputs. + * The distributor to distribute the aggregated resources. */ - private val totalRemainingWork: Double - get() = inputConsumers.sumByDouble { it.remainingWork } - - /** - * The total speed requested by the vCPUs. - */ - private var totalRequestedSpeed = 0.0 - - /** - * The total amount of work requested by the vCPUs. - */ - private var totalRequestedWork = 0.0 - - /** - * The total allocated speed for the vCPUs. - */ - private var totalAllocatedSpeed = 0.0 - - /** - * The total allocated work requested for the vCPUs. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * A flag to indicate that the scheduler has submitted work that has not yet been completed. - */ - private var isDirty: Boolean = false - - /** - * The scheduler barrier. - */ - private var barrier: SimConsumerBarrier = SimConsumerBarrier(0) - - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false + private val distributor = SimResourceDistributorMaxMin( + aggregator.output, clock, + object : SimResourceDistributorMaxMin.Listener { + override fun onSliceFinish( + switch: SimResourceDistributor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) + } + } + ) /** * Add an output to the switch represented by [resource]. */ - override fun addOutput(resource: R): SimResourceProvider<R> { + override fun addOutput(capacity: Double): SimResourceProvider { check(!isClosed) { "Switch has been closed" } - val provider = OutputProvider(resource) + val provider = distributor.addOutput(capacity) _outputs.add(provider) return provider } @@ -121,169 +85,29 @@ public class SimResourceSwitchMaxMin<R : SimResource>( /** * Add the specified [input] to the switch. */ - override fun addInput(input: SimResourceProvider<R>) { + override fun addInput(input: SimResourceProvider) { check(!isClosed) { "Switch has been closed" } - val consumer = InputConsumer(input) - _inputs.add(input) - inputConsumers += consumer + aggregator.addInput(input) } override fun close() { - isClosed = true - } - - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun schedule() { - isDirty = true - interruptAll() - } - - /** - * Schedule the work over the physical CPUs. - */ - private fun doSchedule() { - // If there is no work yet, mark all inputs as idle. - if (outputContexts.isEmpty()) { - commands.replaceAll { _, _ -> SimResourceCommand.Idle() } - interruptAll() - } - - val maxUsage = inputs.sumByDouble { it.resource.capacity } - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage - var totalRequestedSpeed = 0.0 - var totalRequestedWork = 0.0 - - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - outputContexts.sort() - - // Divide the available input capacity fairly across the outputs using max-min fair sharing - val outputIterator = outputContexts.listIterator() - var remaining = outputContexts.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() - val availableShare = availableSpeed / remaining-- - - when (val command = output.activeCommand) { - is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - output.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - output.actualSpeed = 0.0 - continue - } - - totalRequestedSpeed += command.limit - totalRequestedWork += command.work - - output.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request from an output - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> { - // Apparently the output consumer has exited, so remove it from the scheduling queue. - outputIterator.remove() - } - } - } - - // Round the duration to milliseconds - duration = ceil(duration * 1000) / 1000 - - assert(deadline >= clock.millis()) { "Deadline already passed" } - - val totalAllocatedSpeed = maxUsage - availableSpeed - var totalAllocatedWork = 0.0 - availableSpeed = totalAllocatedSpeed - - // Divide the requests over the available capacity of the input resources fairly - for (input in inputs.sortedByDescending { it.resource.capacity }) { - val maxResourceUsage = input.resource.capacity - val fraction = maxResourceUsage / maxUsage - val grantedSpeed = min(maxResourceUsage, totalAllocatedSpeed * fraction) - val grantedWork = duration * grantedSpeed - - commands[input.resource] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) - - totalAllocatedWork += grantedWork - availableSpeed -= grantedSpeed - } - - this.totalRequestedSpeed = totalRequestedSpeed - this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = totalAllocatedSpeed - this.totalAllocatedWork = totalAllocatedWork - - interruptAll() - } - - /** - * Flush the progress of the vCPUs. - */ - private fun flushGuests() { - val totalRemainingWork = totalRemainingWork - - // Flush all the outputs work - for (output in outputContexts) { - output.flush(isIntermediate = true) - } - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork.toLong(), - (totalAllocatedWork - totalRemainingWork).toLong(), - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalRequestedSpeed, - totalAllocatedSpeed - ) - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - // Force all inputs to re-schedule their work. - doSchedule() - } - - /** - * Interrupt all inputs. - */ - private fun interruptAll() { - for (input in inputConsumers) { - input.interrupt() + if (!isClosed) { + isClosed = true + distributor.close() + aggregator.close() } } /** * Event listener for hypervisor events. */ - public interface Listener<R : SimResource> { + public interface Listener { /** * This method is invoked when a slice is finished. */ public fun onSliceFinish( - switch: SimResourceSwitchMaxMin<R>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -292,203 +116,4 @@ public class SimResourceSwitchMaxMin<R : SimResource>( cpuDemand: Double ) } - - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class OutputProvider(override val resource: R) : SimResourceProvider<R> { - /** - * The [OutputContext] that is currently running. - */ - private var ctx: OutputContext? = null - - override var state: SimResourceState = SimResourceState.Pending - internal set - - override fun startConsumer(consumer: SimResourceConsumer<R>) { - check(state == SimResourceState.Pending) { "Resource cannot be consumed" } - - val ctx = OutputContext(this, resource, consumer) - this.ctx = ctx - this.state = SimResourceState.Active - outputContexts += ctx - - ctx.start() - schedule() - } - - override fun close() { - cancel() - - state = SimResourceState.Stopped - _outputs.remove(this) - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } - } - - /** - * A [SimAbstractResourceContext] for the output resources. - */ - private inner class OutputContext( - private val provider: OutputProvider, - resource: R, - consumer: SimResourceConsumer<R> - ) : SimAbstractResourceContext<R>(resource, clock, consumer), Comparable<OutputContext> { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - - private fun reportOvercommit() { - totalOvercommittedWork += remainingWork - } - - override fun onIdle(deadline: Long) { - reportOvercommit() - - allowedSpeed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - reportOvercommit() - - allowedSpeed = getSpeed(limit) - activeCommand = SimResourceCommand.Consume(work, limit, deadline) - } - - override fun onFinish(cause: Throwable?) { - reportOvercommit() - - activeCommand = SimResourceCommand.Exit - provider.cancel() - - super.onFinish(cause) - } - - override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - // Apply performance interference model - val performanceScore = 1.0 - - // Compute the remaining amount of work - return if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM - val fraction = actualSpeed / totalAllocatedSpeed - - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) - } else { - 0.0 - } - } - - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - super.interrupt() - - // Force the scheduler to re-schedule - schedule() - } - - override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) - } - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private inner class InputConsumer(val input: SimResourceProvider<R>) : SimResourceConsumer<R> { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext<R> - - /** - * The remaining work of this consumer. - */ - val remainingWork: Double - get() = ctx.remainingWork - - init { - barrier = SimConsumerBarrier(barrier.parties + 1) - input.startConsumer(this@InputConsumer) - } - - /** - * Interrupt the consumer - */ - fun interrupt() { - ctx.interrupt() - } - - override fun onStart(ctx: SimResourceContext<R>) { - this.ctx = ctx - } - - override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { - val isLast = barrier.enter() - - // Flush the progress of the guest after the barrier has been reached. - if (isLast && isDirty) { - isDirty = false - flushGuests() - } - - return if (isDirty) { - // Wait for the scheduler determine the work after the barrier has been reached by all CPUs. - SimResourceCommand.Idle() - } else { - // Indicate that the scheduler needs to run next call. - if (isLast) { - isDirty = true - } - - commands[ctx.resource] ?: SimResourceCommand.Idle() - } - } - - override fun onFinish(ctx: SimResourceContext<R>, cause: Throwable?) { - barrier = SimConsumerBarrier(barrier.parties - 1) - inputConsumers -= this@InputConsumer - _inputs -= input - - super.onFinish(ctx, cause) - } - } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt index 7aa5a5aa..52a42241 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt @@ -42,4 +42,11 @@ public class SimConsumerBarrier(public val parties: Int) { } return false } + + /** + * Reset the barrier. + */ + public fun reset() { + counter = 0 + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index 0189fe4c..a52d1d5d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -31,15 +30,15 @@ import org.opendc.simulator.resources.SimResourceContext * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource * consumption for some period of time. */ -public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer<SimResource> { +public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { private var iterator: Iterator<Fragment>? = null - override fun onStart(ctx: SimResourceContext<SimResource>) { + override fun onStart(ctx: SimResourceContext) { check(iterator == null) { "Consumer already running" } iterator = trace.iterator() } - override fun onNext(ctx: SimResourceContext<SimResource>): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val now = ctx.clock.millis() @@ -58,7 +57,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour } } - override fun onFinish(ctx: SimResourceContext<SimResource>, cause: Throwable?) { + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { iterator = null } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index 62425583..8f24a020 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources.consumer -import org.opendc.simulator.resources.SimResource import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -30,10 +29,10 @@ import org.opendc.simulator.resources.SimResourceContext /** * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. */ -public class SimWorkConsumer<R : SimResource>( +public class SimWorkConsumer( private val work: Double, private val utilization: Double -) : SimResourceConsumer<R> { +) : SimResourceConsumer { init { require(work >= 0.0) { "Work must be positive" } @@ -43,12 +42,12 @@ public class SimWorkConsumer<R : SimResource>( private var limit = 0.0 private var remainingWork: Double = 0.0 - override fun onStart(ctx: SimResourceContext<R>) { - limit = ctx.resource.capacity * utilization + override fun onStart(ctx: SimResourceContext) { + limit = ctx.capacity * utilization remainingWork = work } - override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { val work = this.remainingWork + ctx.remainingWork this.remainingWork -= work return if (work > 0.0) { diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt new file mode 100644 index 00000000..3dffc7bf --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.utils.TimerScheduler + +/** + * Test suite for the [SimResourceAggregatorMaxMin] class. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimResourceAggregatorMaxMinTest { + @Test + fun testSingleCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(1.0, 0.5) + val usage = mutableListOf<Double>() + val job = launch { sources[0].speed.toList(usage) } + + try { + aggregator.output.consume(consumer) + yield() + + assertAll( + { assertEquals(1000, currentTime) }, + { assertEquals(listOf(0.0, 0.5, 0.0), usage) } + ) + } finally { + aggregator.output.close() + job.cancel() + } + } + + @Test + fun testDoubleCapacity() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = SimWorkConsumer(2.0, 1.0) + val usage = mutableListOf<Double>() + val job = launch { sources[0].speed.toList(usage) } + + try { + aggregator.output.consume(consumer) + yield() + assertAll( + { assertEquals(1000, currentTime) }, + { assertEquals(listOf(0.0, 1.0, 0.0), usage) } + ) + } finally { + aggregator.output.close() + job.cancel() + } + } + + @Test + fun testOvercommit() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + .andThen(SimResourceCommand.Exit) + + try { + aggregator.output.consume(consumer) + yield() + assertEquals(1000, currentTime) + + verify(exactly = 2) { consumer.onNext(any()) } + } finally { + aggregator.output.close() + } + } + + @Test + fun testException() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val scheduler = TimerScheduler<Any>(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(clock) + val sources = listOf( + SimResourceSource(1.0, clock, scheduler), + SimResourceSource(1.0, clock, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(1.0, 1.0)) + .andThenThrows(IllegalStateException()) + + try { + assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } + yield() + assertEquals(SimResourceState.Pending, sources[0].state) + } finally { + aggregator.output.close() + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 5d4eb46d..c6988ed9 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -33,25 +33,18 @@ import org.opendc.simulator.utils.DelayControllerClockAdapter */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceContextTest { - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testFlushWithoutCommand() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { - override fun onIdle(deadline: Long) {} + val context = object : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = 4200.0 + override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish(cause: Throwable?) {} } @@ -61,12 +54,13 @@ class SimResourceContextTest { @Test fun testIntermediateFlush() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + val context = spyk(object : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = 4200.0 + override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -82,12 +76,13 @@ class SimResourceContextTest { @Test fun testIntermediateFlushIdle() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + val context = spyk(object : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = 4200.0 + override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -100,7 +95,7 @@ class SimResourceContextTest { context.flush(isIntermediate = true) assertAll( - { verify(exactly = 1) { context.onIdle(any()) } }, + { verify(exactly = 2) { context.onIdle(any()) } }, { verify(exactly = 1) { context.onFinish(null) } } ) } @@ -108,12 +103,13 @@ class SimResourceContextTest { @Test fun testDoubleStart() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val resource = SimCpu(4200.0) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext<SimCpu>(resource, clock, consumer) { + val context = object : SimAbstractResourceContext(clock, consumer) { + override val capacity: Double = 4200.0 + override fun onIdle(deadline: Long) {} override fun onFinish(cause: Throwable?) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt index 47794bdd..f68450ff 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -40,28 +40,20 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceForwarderTest { - - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testExitImmediately() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) launch { source.consume(forwarder) source.close() } - forwarder.consume(object : SimResourceConsumer<SimCpu> { - override fun onNext( - ctx: SimResourceContext<SimCpu> - ): SimResourceCommand { + forwarder.consume(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return SimResourceCommand.Exit } }) @@ -72,22 +64,20 @@ internal class SimResourceForwarderTest { @Test fun testExit() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) launch { source.consume(forwarder) source.close() } - forwarder.consume(object : SimResourceConsumer<SimCpu> { + forwarder.consume(object : SimResourceConsumer { var isFirst = true - override fun onNext( - ctx: SimResourceContext<SimCpu> - ): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return if (isFirst) { isFirst = false SimResourceCommand.Consume(10.0, 1.0) @@ -102,11 +92,9 @@ internal class SimResourceForwarderTest { @Test fun testState() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onNext( - ctx: SimResourceContext<SimCpu> - ): SimResourceCommand = SimResourceCommand.Exit + val forwarder = SimResourceForwarder() + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit } assertEquals(SimResourceState.Pending, forwarder.state) @@ -125,9 +113,9 @@ internal class SimResourceForwarderTest { @Test fun testCancelPendingDelegate() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Exit forwarder.startConsumer(consumer) @@ -138,12 +126,12 @@ internal class SimResourceForwarderTest { @Test fun testCancelStartedDelegate() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) source.startConsumer(forwarder) @@ -158,12 +146,12 @@ internal class SimResourceForwarderTest { @Test fun testCancelPropagation() = runBlockingTest { - val forwarder = SimResourceForwarder(SimCpu(1000.0)) + val forwarder = SimResourceForwarder() val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(SimCpu(2000.0), clock, scheduler) + val source = SimResourceSource(2000.0, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) source.startConsumer(forwarder) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index c0ed8c9e..1279c679 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -37,20 +37,16 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceSourceTest { - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testSpeed() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, provider.resource.speed)) + .returns(SimResourceCommand.Consume(1000 * capacity, capacity)) .andThen(SimResourceCommand.Exit) try { @@ -60,7 +56,7 @@ class SimResourceSourceTest { provider.consume(consumer) job.cancel() - assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() provider.close() @@ -71,11 +67,12 @@ class SimResourceSourceTest { fun testSpeedLimit() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1000 * provider.resource.speed, 2 * provider.resource.speed)) + .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity)) .andThen(SimResourceCommand.Exit) try { @@ -85,7 +82,7 @@ class SimResourceSourceTest { provider.consume(consumer) job.cancel() - assertEquals(listOf(0.0, provider.resource.speed, 0.0), res) { "Speed is reported correctly" } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() provider.close() @@ -100,14 +97,15 @@ class SimResourceSourceTest { fun testIntermediateInterrupt() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = object : SimResourceConsumer<SimCpu> { - override fun onStart(ctx: SimResourceContext<SimCpu>) { + val consumer = object : SimResourceConsumer { + override fun onStart(ctx: SimResourceContext) { ctx.interrupt() } - override fun onNext(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return SimResourceCommand.Exit } } @@ -124,16 +122,17 @@ class SimResourceSourceTest { fun testInterrupt() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) - lateinit var resCtx: SimResourceContext<SimCpu> + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) + lateinit var resCtx: SimResourceContext - val consumer = object : SimResourceConsumer<SimCpu> { + val consumer = object : SimResourceConsumer { var isFirst = true - override fun onStart(ctx: SimResourceContext<SimCpu>) { + override fun onStart(ctx: SimResourceContext) { resCtx = ctx } - override fun onNext(ctx: SimResourceContext<SimCpu>): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { assertEquals(0.0, ctx.remainingWork) return if (isFirst) { isFirst = false @@ -162,9 +161,10 @@ class SimResourceSourceTest { fun testFailure() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onStart(any()) } .throws(IllegalStateException()) @@ -182,9 +182,10 @@ class SimResourceSourceTest { fun testExceptionPropagationOnNext() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -203,9 +204,10 @@ class SimResourceSourceTest { fun testConcurrentConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -227,9 +229,10 @@ class SimResourceSourceTest { fun testClosedConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -249,9 +252,10 @@ class SimResourceSourceTest { fun testCloseDuringConsumption() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) @@ -272,9 +276,10 @@ class SimResourceSourceTest { fun testIdle() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle(clock.millis() + 500)) .andThen(SimResourceCommand.Exit) @@ -295,9 +300,10 @@ class SimResourceSourceTest { runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle()) .andThenThrows(IllegalStateException()) @@ -316,9 +322,10 @@ class SimResourceSourceTest { fun testIncorrectDeadline() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(4200.0), clock, scheduler) + val capacity = 4200.0 + val provider = SimResourceSource(capacity, clock, scheduler) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Idle(2)) .andThen(SimResourceCommand.Exit) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index dc90a43e..edd60502 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -42,11 +42,6 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchExclusiveTest { - class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - /** * Test a trace workload. */ @@ -68,12 +63,12 @@ internal class SimResourceSwitchExclusiveTest { ), ) - val switch = SimResourceSwitchExclusive<SimCpu>() - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) val job = launch { source.speed.toList(speed) } try { @@ -99,15 +94,15 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive<SimCpu>() - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) try { provider.consume(workload) @@ -127,16 +122,14 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer<SimCpu> { + val workload = object : SimResourceConsumer { var isFirst = true - override fun onStart(ctx: SimResourceContext<SimCpu>) { + override fun onStart(ctx: SimResourceContext) { isFirst = true } - override fun onNext( - ctx: SimResourceContext<SimCpu> - ): SimResourceCommand { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { return if (isFirst) { isFirst = false SimResourceCommand.Consume(duration / 1000.0, 1.0) @@ -146,12 +139,12 @@ internal class SimResourceSwitchExclusiveTest { } } - val switch = SimResourceSwitchExclusive<SimCpu>() - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) try { provider.consume(workload) @@ -172,15 +165,15 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive<SimCpu>() - val source = SimResourceSource(SimCpu(3200.0), clock, scheduler) + val switch = SimResourceSwitchExclusive() + val source = SimResourceSource(3200.0, clock, scheduler) switch.addInput(source) - switch.addOutput(SimCpu(3200.0)) - assertThrows<IllegalStateException> { switch.addOutput(SimCpu(3200.0)) } + switch.addOutput(3200.0) + assertThrows<IllegalStateException> { switch.addOutput(3200.0) } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 8b989334..5f4fd187 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -40,23 +40,18 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchMaxMinTest { - class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testSmoke() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin<SimCpu>(clock) + val switch = SimResourceSwitchMaxMin(clock) - val sources = List(2) { SimResourceSource(SimCpu(2000.0), clock, scheduler) } + val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) } sources.forEach { switch.addInput(it) } - val provider = switch.addOutput(SimCpu(1000.0)) + val provider = switch.addOutput(1000.0) - val consumer = mockk<SimResourceConsumer<SimCpu>>(relaxUnitFun = true) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit try { @@ -76,13 +71,13 @@ internal class SimResourceSwitchMaxMinTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> { + val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L override fun onSliceFinish( - switch: SimResourceSwitchMaxMin<SimCpu>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -108,10 +103,10 @@ internal class SimResourceSwitchMaxMinTest { ) val switch = SimResourceSwitchMaxMin(clock, listener) - val provider = switch.addOutput(SimCpu(3200.0)) + val provider = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, clock, scheduler)) provider.consume(workload) yield() } finally { @@ -135,13 +130,13 @@ internal class SimResourceSwitchMaxMinTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener<SimCpu> { + val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L override fun onSliceFinish( - switch: SimResourceSwitchMaxMin<SimCpu>, + switch: SimResourceSwitchMaxMin, requestedWork: Long, grantedWork: Long, overcommittedWork: Long, @@ -176,11 +171,11 @@ internal class SimResourceSwitchMaxMinTest { ) val switch = SimResourceSwitchMaxMin(clock, listener) - val providerA = switch.addOutput(SimCpu(3200.0)) - val providerB = switch.addOutput(SimCpu(3200.0)) + val providerA = switch.addOutput(3200.0) + val providerB = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(SimCpu(3200.0), clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, clock, scheduler)) coroutineScope { launch { providerA.consume(workloadA) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index b05195f7..4d6b19ee 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -35,18 +35,13 @@ import org.opendc.utils.TimerScheduler */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimWorkConsumerTest { - data class SimCpu(val speed: Double) : SimResource { - override val capacity: Double - get() = speed - } - @Test fun testSmoke() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(1.0), clock, scheduler) + val provider = SimResourceSource(1.0, clock, scheduler) - val consumer = SimWorkConsumer<SimCpu>(1.0, 1.0) + val consumer = SimWorkConsumer(1.0, 1.0) try { provider.consume(consumer) @@ -60,9 +55,9 @@ internal class SimWorkConsumerTest { fun testUtilization() = runBlockingTest { val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(SimCpu(1.0), clock, scheduler) + val provider = SimResourceSource(1.0, clock, scheduler) - val consumer = SimWorkConsumer<SimCpu>(1.0, 0.5) + val consumer = SimWorkConsumer(1.0, 0.5) try { provider.consume(consumer) |
