diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-21 14:34:10 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-06-21 14:34:10 +0200 |
| commit | ba32561e4b0e00c00d528df615a58e396e0fddc0 (patch) | |
| tree | 06bbb8161910a81b3dc1160b180650660bc50be4 | |
| parent | b8b0f39028af90fa54b42a00214b2ea9a5e48e2e (diff) | |
| parent | 966715d7df139a431293f5c2fc67916fbcc1ecfb (diff) | |
simulator: Optimize resource interpreter implementation
This pull request implements several optimizations in the resource interpreter implementation.
* Interpreter is now shared across hosts in experiments
* Interpreter allocations are pooled where possible
* Resource lifecycle concept is eliminated
* Optimized flag management in interpreter
25 files changed, 350 insertions, 458 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 540e27fe..c9527bb6 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -42,7 +42,6 @@ import org.opendc.simulator.compute.power.PowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.failures.FailureDomain import org.opendc.simulator.resources.SimResourceInterpreter -import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume @@ -56,7 +55,7 @@ public class SimHost( model: SimMachineModel, override val meta: Map<String, Any>, context: CoroutineContext, - clock: Clock, + interpreter: SimResourceInterpreter, meter: Meter, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor, @@ -70,12 +69,12 @@ public class SimHost( model: SimMachineModel, meta: Map<String, Any>, context: CoroutineContext, - clock: Clock, + interpreter: SimResourceInterpreter, meter: Meter, hypervisor: SimHypervisorProvider, powerModel: PowerModel = ConstantPowerModel(0.0), mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper) + ) : this(uid, name, model, meta, context, interpreter, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper) /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. @@ -98,11 +97,6 @@ public class SimHost( private var availableMemory: Long = model.memory.sumOf { it.size } /** - * The resource interpreter to schedule the resource interactions. - */ - private val interpreter = SimResourceInterpreter(context, clock) - - /** * The machine to run on. */ public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, scalingDriver) diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a6cff3ba..79489fdb 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -47,6 +47,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.util.UUID @@ -83,7 +84,8 @@ internal class SimHostTest { .setClock(clock.toOtelClock()) .build() - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val interpreter = SimResourceInterpreter(coroutineContext, clock) + val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) val duration = 5 * 60L val vmImageA = MockImage( UUID.randomUUID(), diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 0fbb7280..0485415c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -50,6 +50,7 @@ import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector +import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.io.File @@ -144,6 +145,7 @@ public suspend fun withComputeService( scheduler: ComputeScheduler, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { + val interpreter = SimResourceInterpreter(coroutineContext, clock) val hosts = environmentReader .use { it.read() } .map { def -> @@ -153,7 +155,7 @@ public suspend fun withComputeService( def.model, def.meta, coroutineContext, - clock, + interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), def.powerModel diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index 65915cc6..2f14776a 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -49,6 +49,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.* import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter import java.io.File import java.time.Clock import java.util.* @@ -120,6 +121,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val model = createMachineModel() + val interpreter = SimResourceInterpreter(coroutineContext, clock) val hosts = List(64) { id -> SimHost( UUID(0, id.toLong()), @@ -127,7 +129,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { model, emptyMap(), coroutineContext, - clock, + interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), PerformanceScalingGovernor(), diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index d24ed1f3..c560cd28 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -129,6 +129,10 @@ public abstract class SimAbstractHypervisor( override fun close() { super.close() + for (cpu in cpus) { + cpu.close() + } + _vms.remove(this) } } @@ -137,9 +141,9 @@ public abstract class SimAbstractHypervisor( * A [SimProcessingUnit] of a virtual machine. */ private class VCpu( - private val source: SimResourceProvider, + private val source: SimResourceCloseableProvider, override val model: ProcessingUnit - ) : SimProcessingUnit, SimResourceProvider by source { + ) : SimProcessingUnit, SimResourceCloseableProvider by source { override var capacity: Double get() = source.capacity set(_) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 93d306cf..3a70680c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -115,11 +115,6 @@ public abstract class SimAbstractMachine( isTerminated = true cancel() - interpreter.batch { - for (cpu in cpus) { - cpu.close() - } - } } /* SimResourceSystem */ diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index 11034a57..3ce85d02 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -83,7 +83,7 @@ public class SimPdu( /** * A PDU outlet. */ - public class Outlet(private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable { + public class Outlet(private val provider: SimResourceCloseableProvider) : SimPowerOutlet(), AutoCloseable { override fun onConnect(inlet: SimPowerInlet) { provider.startConsumer(inlet.createConsumer()) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 5fe7d7bb..84217278 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -56,8 +56,6 @@ public abstract class SimAbstractResourceAggregator( /* SimResourceAggregator */ override fun addInput(input: SimResourceProvider) { - check(state != SimResourceState.Stopped) { "Aggregator has been stopped" } - val consumer = Consumer() _inputs.add(input) _inputConsumers.add(consumer) @@ -70,8 +68,8 @@ public abstract class SimAbstractResourceAggregator( private val _inputConsumers = mutableListOf<Consumer>() /* SimResourceProvider */ - override val state: SimResourceState - get() = _output.state + override val isActive: Boolean + get() = _output.isActive override val capacity: Double get() = _output.capacity @@ -97,10 +95,6 @@ public abstract class SimAbstractResourceAggregator( _output.interrupt() } - override fun close() { - _output.close() - } - private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index de26f99e..c1b1450e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -33,6 +33,12 @@ public abstract class SimAbstractResourceProvider( initialCapacity: Double ) : SimResourceProvider { /** + * A flag to indicate that the resource provider is active. + */ + public override val isActive: Boolean + get() = ctx != null + + /** * The capacity of the resource. */ public override var capacity: Double = initialCapacity @@ -67,12 +73,6 @@ public abstract class SimAbstractResourceProvider( private set /** - * The state of the resource provider. - */ - final override var state: SimResourceState = SimResourceState.Pending - private set - - /** * Construct the [SimResourceProviderLogic] instance for a new consumer. */ protected abstract fun createLogic(): SimResourceProviderLogic @@ -96,21 +96,15 @@ public abstract class SimAbstractResourceProvider( } final override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } + check(ctx == null) { "Resource is in invalid state" } val ctx = interpreter.newContext(consumer, createLogic(), parent) ctx.capacity = capacity this.ctx = ctx - this.state = SimResourceState.Active start(ctx) } - override fun close() { - cancel() - state = SimResourceState.Stopped - } - final override fun interrupt() { ctx?.interrupt() } @@ -121,10 +115,6 @@ public abstract class SimAbstractResourceProvider( this.ctx = null ctx.close() } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } } override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]" diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index c39c1aca..991cda7a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -45,7 +45,7 @@ public class SimResourceAggregatorMaxMin( val command = if (grantedWork > 0.0 && grantedSpeed > 0.0) SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) else - SimResourceCommand.Idle(deadline) + SimResourceCommand.Idle() input.push(command) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt new file mode 100644 index 00000000..bce8274b --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceProvider] that has a controllable and limited lifetime. + * + * This interface is used to signal that the resource provider may be closed and not reused after that point. + */ +public interface SimResourceCloseableProvider : SimResourceProvider, AutoCloseable { + /** + * End the lifetime of the resource provider. + * + * This operation cancels the existing resource consumer and prevents the resource provider from being reused. + */ + public override fun close() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index e0333ff9..6bfbfc99 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -29,10 +29,10 @@ public interface SimResourceDistributor : SimResourceConsumer { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set<SimResourceProvider> + public val outputs: Set<SimResourceCloseableProvider> /** * Create a new output for the distributor. */ - public fun newOutput(): SimResourceProvider + public fun newOutput(): SimResourceCloseableProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index be9e89fb..d8fc8cb6 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -32,7 +32,7 @@ public class SimResourceDistributorMaxMin( private val interpreter: SimResourceInterpreter, private val parent: SimResourceSystem? = null ) : SimResourceDistributor { - override val outputs: Set<SimResourceProvider> + override val outputs: Set<SimResourceCloseableProvider> get() = _outputs private val _outputs = mutableSetOf<Output>() @@ -57,7 +57,7 @@ public class SimResourceDistributorMaxMin( private var totalAllocatedSpeed = 0.0 /* SimResourceDistributor */ - override fun newOutput(): SimResourceProvider { + override fun newOutput(): SimResourceCloseableProvider { val provider = Output(ctx?.capacity ?: 0.0) _outputs.add(provider) return provider @@ -65,7 +65,7 @@ public class SimResourceDistributorMaxMin( /* SimResourceConsumer */ override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx.capacity) + return doNext(ctx) } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { @@ -94,12 +94,13 @@ public class SimResourceDistributorMaxMin( /** * Schedule the work of the outputs. */ - private fun doNext(capacity: Double): SimResourceCommand { + private fun doNext(ctx: SimResourceContext): SimResourceCommand { // If there is no work yet, mark the input as idle. if (activeOutputs.isEmpty()) { return SimResourceCommand.Idle() } + val capacity = ctx.capacity var duration: Double = Double.MAX_VALUE var deadline: Long = Long.MAX_VALUE var availableSpeed = capacity @@ -112,7 +113,7 @@ public class SimResourceDistributorMaxMin( output.pull() // Remove outputs that have finished - if (output.isFinished) { + if (!output.isActive) { outputIterator.remove() } } @@ -125,33 +126,23 @@ public class SimResourceDistributorMaxMin( var remaining = activeOutputs.size for (output in activeOutputs) { val availableShare = availableSpeed / remaining-- + val grantedSpeed = min(output.allowedSpeed, availableShare) + deadline = min(deadline, output.deadline) - when (val command = output.activeCommand) { - is SimResourceCommand.Idle -> { - deadline = min(deadline, command.deadline) - output.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(output.allowedSpeed, availableShare) - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - output.actualSpeed = 0.0 - continue - } + // Ignore idle computation + if (grantedSpeed <= 0.0 || output.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } - totalRequestedSpeed += command.limit - totalRequestedWork += command.work + totalRequestedSpeed += output.limit + totalRequestedWork += output.work - output.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed - // The duration that we want to run is that of the shortest request of an output - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } - } + // The duration that we want to run is that of the shortest request of an output + duration = min(duration, output.work / grantedSpeed) } assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } @@ -178,11 +169,30 @@ public class SimResourceDistributorMaxMin( /** * An internal [SimResourceProvider] implementation for switch outputs. */ - private inner class Output(capacity: Double) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceProviderLogic, Comparable<Output> { + private inner class Output(capacity: Double) : + SimAbstractResourceProvider(interpreter, parent, capacity), + SimResourceCloseableProvider, + SimResourceProviderLogic, + Comparable<Output> { + /** + * A flag to indicate that the output is closed. + */ + private var isClosed: Boolean = false + + /** + * The current requested work. + */ + var work: Double = 0.0 + /** - * The current command that is processed by the resource. + * The requested limit. */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + var limit: Double = 0.0 + + /** + * The current deadline. + */ + var deadline: Long = Long.MAX_VALUE /** * The processing speed that is allowed by the model constraints. @@ -195,12 +205,6 @@ public class SimResourceDistributorMaxMin( var actualSpeed: Double = 0.0 /** - * A flag to indicate that the output is finished. - */ - val isFinished - get() = activeCommand is SimResourceCommand.Exit - - /** * The timestamp at which we received the last command. */ private var lastCommandTimestamp: Long = Long.MIN_VALUE @@ -209,6 +213,8 @@ public class SimResourceDistributorMaxMin( override fun createLogic(): SimResourceProviderLogic = this override fun start(ctx: SimResourceControllableContext) { + check(!isClosed) { "Cannot re-use closed output" } + activeOutputs += this interpreter.batch { @@ -219,27 +225,27 @@ public class SimResourceDistributorMaxMin( } override fun close() { - val state = state - - super.close() - - if (state != SimResourceState.Stopped) { - _outputs.remove(this) - } + isClosed = true + cancel() + _outputs.remove(this) } /* SimResourceProviderLogic */ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { allowedSpeed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) + this.deadline = deadline + work = 0.0 + limit = 0.0 lastCommandTimestamp = ctx.clock.millis() return Long.MAX_VALUE } override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - allowedSpeed = ctx.speed - activeCommand = SimResourceCommand.Consume(work, limit, deadline) + allowedSpeed = min(ctx.capacity, limit) + this.work = work + this.limit = limit + this.deadline = deadline lastCommandTimestamp = ctx.clock.millis() return Long.MAX_VALUE @@ -250,7 +256,9 @@ public class SimResourceDistributorMaxMin( } override fun onFinish(ctx: SimResourceControllableContext) { - activeCommand = SimResourceCommand.Exit + work = 0.0 + limit = 0.0 + deadline = Long.MAX_VALUE lastCommandTimestamp = ctx.clock.millis() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index f709ca17..b68b7261 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -29,11 +29,11 @@ import kotlin.coroutines.resumeWithException /** * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer]. */ -public interface SimResourceProvider : AutoCloseable { +public interface SimResourceProvider { /** - * The state of the resource. + * A flag to indicate that the resource provider is currently being consumed by a [SimResourceConsumer]. */ - public val state: SimResourceState + public val isActive: Boolean /** * The resource capacity available at this instant. @@ -71,13 +71,6 @@ public interface SimResourceProvider : AutoCloseable { * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. */ public fun cancel() - - /** - * End the lifetime of the resource. - * - * This operation terminates the existing resource consumer. - */ - public override fun close() } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index e224285e..f6e7b22f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -29,7 +29,7 @@ public interface SimResourceSwitch : AutoCloseable { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set<SimResourceProvider> + public val outputs: Set<SimResourceCloseableProvider> /** * The input resources that will be switched between the output providers. @@ -44,7 +44,7 @@ public interface SimResourceSwitch : AutoCloseable { /** * Create a new output on the switch. */ - public fun newOutput(): SimResourceProvider + public fun newOutput(): SimResourceCloseableProvider /** * Add the specified [input] to the switch. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 2950af80..4ff741ed 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -35,7 +35,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { private var isClosed: Boolean = false private val _outputs = mutableSetOf<Provider>() - override val outputs: Set<SimResourceProvider> + override val outputs: Set<SimResourceCloseableProvider> get() = _outputs private val availableResources = ArrayDeque<SimResourceTransformer>() @@ -61,7 +61,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" } - override fun newOutput(): SimResourceProvider { + override fun newOutput(): SimResourceCloseableProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() @@ -101,7 +101,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { _inputs.forEach(SimResourceProvider::cancel) } - private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder { + private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceCloseableProvider, SimResourceProvider by forwarder { override fun close() { // We explicitly do not close the forwarder here in order to re-use it across output resources. _outputs -= this diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 684a1b52..50d58798 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -33,7 +33,7 @@ public class SimResourceSwitchMaxMin( /** * The output resource providers to which resource consumers can be attached. */ - override val outputs: Set<SimResourceProvider> + override val outputs: Set<SimResourceCloseableProvider> get() = distributor.outputs /** @@ -70,7 +70,7 @@ public class SimResourceSwitchMaxMin( /** * Add an output to the switch. */ - override fun newOutput(): SimResourceProvider { + override fun newOutput(): SimResourceCloseableProvider { check(!isClosed) { "Switch has been closed" } return distributor.newOutput() @@ -88,7 +88,7 @@ public class SimResourceSwitchMaxMin( override fun close() { if (!isClosed) { isClosed = true - aggregator.close() + aggregator.cancel() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index fd3d1230..cec27e1c 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -33,7 +33,7 @@ import org.opendc.simulator.resources.impl.SimResourceCountersImpl public class SimResourceTransformer( private val isCoupled: Boolean = false, private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand -) : SimResourceFlow { +) : SimResourceFlow, AutoCloseable { /** * The [SimResourceContext] in which the forwarder runs. */ @@ -49,11 +49,8 @@ public class SimResourceTransformer( */ private var hasDelegateStarted: Boolean = false - /** - * The state of the forwarder. - */ - override var state: SimResourceState = SimResourceState.Pending - private set + override val isActive: Boolean + get() = delegate != null override val capacity: Double get() = ctx?.capacity ?: 0.0 @@ -69,9 +66,8 @@ public class SimResourceTransformer( private val _counters = SimResourceCountersImpl() override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } + check(delegate == null) { "Resource transformer already active" } - state = SimResourceState.Active delegate = consumer // Interrupt the provider to replace the consumer @@ -86,19 +82,18 @@ public class SimResourceTransformer( val delegate = delegate val ctx = ctx - state = SimResourceState.Pending - - if (delegate != null && ctx != null) { + if (delegate != null) { this.delegate = null - delegate.onEvent(ctx, SimResourceEvent.Exit) + + if (ctx != null) { + delegate.onEvent(ctx, SimResourceEvent.Exit) + } } } override fun close() { val ctx = ctx - state = SimResourceState.Stopped - if (ctx != null) { this.ctx = null ctx.interrupt() @@ -114,9 +109,7 @@ public class SimResourceTransformer( updateCounters(ctx) - return if (state == SimResourceState.Stopped) { - SimResourceCommand.Exit - } else if (delegate != null) { + return if (delegate != null) { val command = transform(ctx, delegate.onNext(ctx)) _work = if (command is SimResourceCommand.Consume) command.work else 0.0 @@ -128,7 +121,7 @@ public class SimResourceTransformer( delegate.onEvent(ctx, SimResourceEvent.Exit) - if (isCoupled || state == SimResourceState.Stopped) + if (isCoupled) SimResourceCommand.Exit else onNext(ctx) @@ -184,10 +177,6 @@ public class SimResourceTransformer( private fun reset() { delegate = null hasDelegateStarted = false - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 46c5c63f..90c7bc75 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -39,7 +39,8 @@ internal class SimResourceContextImpl( * The clock of the context. */ override val clock: Clock - get() = interpreter.clock + get() = _clock + private val _clock = interpreter.clock /** * The capacity of the resource. @@ -59,18 +60,7 @@ internal class SimResourceContextImpl( * The amount of work still remaining at this instant. */ override val remainingWork: Double - get() { - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - computeRemainingWork(now).also { _remainingWork = it } - } else { - _remainingWork - } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE + get() = getRemainingWork(_clock.millis()) /** * A flag to indicate the state of the context. @@ -92,20 +82,6 @@ internal class SimResourceContextImpl( override val demand: Double get() = _limit - private val counters = object : SimResourceCounters { - override var demand: Double = 0.0 - override var actual: Double = 0.0 - override var overcommit: Double = 0.0 - - override fun reset() { - demand = 0.0 - actual = 0.0 - overcommit = 0.0 - } - - override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" - } - /** * The current state of the resource context. */ @@ -117,7 +93,7 @@ internal class SimResourceContextImpl( /** * The update flag indicating why the update was triggered. */ - private var _flag: Flag = Flag.None + private var _flag: Int = 0 /** * The current pending update. @@ -147,7 +123,7 @@ internal class SimResourceContextImpl( return } - enableFlag(Flag.Interrupt) + _flag = _flag or FLAG_INTERRUPT scheduleUpdate() } @@ -156,7 +132,7 @@ internal class SimResourceContextImpl( return } - enableFlag(Flag.Invalidate) + _flag = _flag or FLAG_INVALIDATE scheduleUpdate() } @@ -173,7 +149,7 @@ internal class SimResourceContextImpl( */ fun requiresUpdate(timestamp: Long): Boolean { // Either the resource context is flagged or there is a pending update at this timestamp - return _flag != Flag.None || _pendingUpdate?.timestamp == timestamp + return _flag != 0 || _pendingUpdate?.timestamp == timestamp } /** @@ -185,7 +161,7 @@ internal class SimResourceContextImpl( val newState = doUpdate(timestamp, oldState) _state = newState - _flag = Flag.None + _flag = 0 when (newState) { SimResourceState.Pending -> @@ -222,8 +198,8 @@ internal class SimResourceContextImpl( // Resource context is not active, so its state will not update SimResourceState.Pending, SimResourceState.Stopped -> state SimResourceState.Active -> { - val isInterrupted = _flag == Flag.Interrupt - val remainingWork = remainingWork + val isInterrupted = _flag and FLAG_INTERRUPT != 0 + val remainingWork = getRemainingWork(timestamp) val isConsume = _limit > 0.0 // Update the resource counters only if there is some progress @@ -236,11 +212,15 @@ internal class SimResourceContextImpl( // 2. The resource capacity cannot satisfy the demand. // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) { - next(timestamp) + when (val command = consumer.onNext(this)) { + is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline) + is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline) + is SimResourceCommand.Exit -> interpretExit() + } } else if (isConsume) { - interpret(SimResourceCommand.Consume(remainingWork, _limit, _deadline), timestamp) + interpretConsume(timestamp, remainingWork, _limit, _deadline) } else { - interpret(SimResourceCommand.Idle(_deadline), timestamp) + interpretIdle(timestamp, _deadline) } } } @@ -273,57 +253,65 @@ internal class SimResourceContextImpl( } /** - * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + * Interpret the [SimResourceCommand.Consume] command. */ - private fun interpret(command: SimResourceCommand, now: Long): SimResourceState { - return when (command) { - is SimResourceCommand.Idle -> { - val deadline = command.deadline + private fun interpretConsume(now: Long, work: Double, limit: Double, deadline: Long): SimResourceState { + require(deadline >= now) { "Deadline already passed" } - require(deadline >= now) { "Deadline already passed" } + _speed = min(capacity, limit) + _work = work + _limit = limit + _deadline = deadline - _speed = 0.0 - _work = 0.0 - _limit = 0.0 - _deadline = deadline + val timestamp = logic.onConsume(this, work, limit, deadline) + scheduleUpdate(timestamp) - val timestamp = logic.onIdle(this, deadline) - scheduleUpdate(timestamp) + return SimResourceState.Active + } - SimResourceState.Active - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline + /** + * Interpret the [SimResourceCommand.Idle] command. + */ + private fun interpretIdle(now: Long, deadline: Long): SimResourceState { + require(deadline >= now) { "Deadline already passed" } - require(deadline >= now) { "Deadline already passed" } + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = deadline - _speed = min(capacity, limit) - _work = work - _limit = limit - _deadline = deadline + val timestamp = logic.onIdle(this, deadline) + scheduleUpdate(timestamp) - val timestamp = logic.onConsume(this, work, limit, deadline) - scheduleUpdate(timestamp) + return SimResourceState.Active + } - SimResourceState.Active - } - is SimResourceCommand.Exit -> { - _speed = 0.0 - _work = 0.0 - _limit = 0.0 - _deadline = Long.MAX_VALUE + /** + * Interpret the [SimResourceCommand.Exit] command. + */ + private fun interpretExit(): SimResourceState { + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = Long.MAX_VALUE - SimResourceState.Stopped - } - } + return SimResourceState.Stopped } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE + /** - * Request the workload for more work. + * Obtain the remaining work at the given timestamp. */ - private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now) + private fun getRemainingWork(now: Long): Double { + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + computeRemainingWork(now).also { _remainingWork = it } + } else { + _remainingWork + } + } /** * Compute the remaining work based on the current state. @@ -357,25 +345,6 @@ internal class SimResourceContextImpl( } /** - * Enable the specified [flag] taking into account precedence. - */ - private fun enableFlag(flag: Flag) { - _flag = when (_flag) { - Flag.None -> flag - Flag.Invalidate -> - when (flag) { - Flag.None -> flag - else -> flag - } - Flag.Interrupt -> - when (flag) { - Flag.None, Flag.Invalidate -> flag - else -> flag - } - } - } - - /** * Schedule an update for this resource context. */ private fun scheduleUpdate() { @@ -411,12 +380,12 @@ internal class SimResourceContextImpl( } /** - * An enumeration of flags that can be assigned to a resource context to indicate whether they are invalidated or - * interrupted. + * A flag to indicate that the context should be invalidated. */ - enum class Flag { - None, - Interrupt, - Invalidate - } + private val FLAG_INVALIDATE = 0b01 + + /** + * A flag to indicate that the context should be interrupted. + */ + private val FLAG_INTERRUPT = 0b10 } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt index cb0d6160..6dd02ae5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -48,12 +48,12 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * The queue of resource updates that are scheduled for immediate execution. */ - private val queue = ArrayDeque<Update>() + private val queue = ArrayDeque<SimResourceContextImpl>() /** * A priority queue containing the resource updates to be scheduled in the future. */ - private val futureQueue = PriorityQueue<Update>() + private val futureQueue = PriorityQueue<Update>(compareBy { it.timestamp }) /** * The stack of interpreter invocations to occur in the future. @@ -83,7 +83,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, * re-computed. In case no interpreter is currently active, the interpreter will be started. */ fun scheduleImmediate(ctx: SimResourceContextImpl) { - queue.add(Update(ctx, Long.MIN_VALUE)) + queue.add(ctx) // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked // up by the active interpreter. @@ -137,7 +137,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, require(timestamp >= now) { "Timestamp must be in the future" } - val update = Update(ctx, timestamp) + val update = allocUpdate(ctx, timestamp) futureQueue.add(update) // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference. @@ -193,9 +193,16 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, futureQueue.poll() - if (update(now) && visited.add(update.ctx)) { - collectAncestors(update.ctx, visited) + val shouldExecute = !update.isCancelled && update.ctx.requiresUpdate(now) + if (shouldExecute) { + update.ctx.doUpdate(now) + + if (visited.add(update.ctx)) { + collectAncestors(update.ctx, visited) + } } + + updatePool.add(update) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -203,9 +210,15 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, do { // Execute all immediate updates while (true) { - val update = queue.poll() ?: break - if (update(now) && visited.add(update.ctx)) { - collectAncestors(update.ctx, visited) + val ctx = queue.poll() ?: break + val shouldExecute = ctx.requiresUpdate(now) + + if (shouldExecute) { + ctx.doUpdate(now) + + if (visited.add(ctx)) { + collectAncestors(ctx, visited) + } } } @@ -278,6 +291,26 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** + * The pool of existing updates. + */ + private val updatePool = ArrayDeque<Update>() + + /** + * Allocate an [Update] object. + */ + private fun allocUpdate(ctx: SimResourceContextImpl, timestamp: Long): Update { + val update = updatePool.poll() + return if (update != null) { + update.ctx = ctx + update.timestamp = timestamp + update.isCancelled = false + update + } else { + Update(ctx, timestamp) + } + } + + /** * A future interpreter invocation. * * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case @@ -299,7 +332,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be * cancelled if the resource context was invalidated in the meantime. */ - class Update(@JvmField val ctx: SimResourceContextImpl, @JvmField val timestamp: Long) : Comparable<Update> { + class Update(@JvmField var ctx: SimResourceContextImpl, @JvmField var timestamp: Long) { /** * A flag to indicate that the task has been cancelled. */ @@ -313,19 +346,6 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, isCancelled = true } - /** - * Immediately run update. - */ - operator fun invoke(timestamp: Long): Boolean { - val shouldExecute = !isCancelled && ctx.requiresUpdate(timestamp) - if (shouldExecute) { - ctx.doUpdate(timestamp) - } - return shouldExecute - } - - override fun compareTo(other: Update): Int = timestamp.compareTo(other.timestamp) - override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]" } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 51024e80..2f01a8c4 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -26,7 +26,7 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.* -import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows @@ -58,17 +58,13 @@ internal class SimResourceAggregatorMaxMinTest { val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) source.startConsumer(adapter) - try { - aggregator.consume(consumer) - yield() + aggregator.consume(consumer) + yield() - assertAll( - { assertEquals(1000, clock.millis()) }, - { assertEquals(listOf(0.0, 0.5, 0.0), usage) } - ) - } finally { - aggregator.close() - } + assertAll( + { assertEquals(1000, clock.millis()) }, + { assertEquals(listOf(0.0, 0.5, 0.0), usage) } + ) } @Test @@ -86,16 +82,12 @@ internal class SimResourceAggregatorMaxMinTest { val usage = mutableListOf<Double>() val adapter = SimSpeedConsumerAdapter(consumer, usage::add) - try { - aggregator.consume(adapter) - yield() - assertAll( - { assertEquals(1000, clock.millis()) }, - { assertEquals(listOf(0.0, 2.0, 0.0), usage) } - ) - } finally { - aggregator.close() - } + aggregator.consume(adapter) + yield() + assertAll( + { assertEquals(1000, clock.millis()) }, + { assertEquals(listOf(0.0, 2.0, 0.0), usage) } + ) } @Test @@ -114,15 +106,11 @@ internal class SimResourceAggregatorMaxMinTest { .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) .andThen(SimResourceCommand.Exit) - try { - aggregator.consume(consumer) - yield() - assertEquals(1000, clock.millis()) + aggregator.consume(consumer) + yield() + assertEquals(1000, clock.millis()) - verify(exactly = 2) { consumer.onNext(any()) } - } finally { - aggregator.close() - } + verify(exactly = 2) { consumer.onNext(any()) } } @Test @@ -141,13 +129,9 @@ internal class SimResourceAggregatorMaxMinTest { .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException("Test Exception")) - try { - assertThrows<IllegalStateException> { aggregator.consume(consumer) } - yield() - assertEquals(SimResourceState.Pending, sources[0].state) - } finally { - aggregator.close() - } + assertThrows<IllegalStateException> { aggregator.consume(consumer) } + yield() + assertFalse(sources[0].isActive) } @Test @@ -162,17 +146,13 @@ internal class SimResourceAggregatorMaxMinTest { sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(4.0, 1.0) - try { - coroutineScope { - launch { aggregator.consume(consumer) } - delay(1000) - sources[0].capacity = 0.5 - } - yield() - assertEquals(2334, clock.millis()) - } finally { - aggregator.close() + coroutineScope { + launch { aggregator.consume(consumer) } + delay(1000) + sources[0].capacity = 0.5 } + yield() + assertEquals(2334, clock.millis()) } @Test @@ -187,17 +167,13 @@ internal class SimResourceAggregatorMaxMinTest { sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(1.0, 0.5) - try { - coroutineScope { - launch { aggregator.consume(consumer) } - delay(500) - sources[0].capacity = 0.5 - } - yield() - assertEquals(1000, clock.millis()) - } finally { - aggregator.close() + coroutineScope { + launch { aggregator.consume(consumer) } + delay(500) + sources[0].capacity = 0.5 } + yield() + assertEquals(1000, clock.millis()) } @Test @@ -216,13 +192,9 @@ internal class SimResourceAggregatorMaxMinTest { .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) .andThen(SimResourceCommand.Exit) - try { - aggregator.consume(consumer) - yield() - assertEquals(1000, clock.millis()) - assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } - } finally { - aggregator.close() - } + aggregator.consume(consumer) + yield() + assertEquals(1000, clock.millis()) + assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 08d88093..4895544d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -50,16 +50,12 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1000 * capacity, capacity)) .andThen(SimResourceCommand.Exit) - try { - val res = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) + val res = mutableListOf<Double>() + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(adapter) + provider.consume(adapter) - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } finally { - provider.close() - } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } @Test @@ -69,17 +65,13 @@ class SimResourceSourceTest { val consumer = spyk(SimWorkConsumer(2.0, 1.0)) - try { - coroutineScope { - launch { provider.consume(consumer) } - delay(1000) - provider.capacity = 0.5 - } - assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } - } finally { - provider.close() + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 } + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } } @Test @@ -93,16 +85,12 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity)) .andThen(SimResourceCommand.Exit) - try { - val res = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) + val res = mutableListOf<Double>() + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(adapter) + provider.consume(adapter) - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } finally { - provider.close() - } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } /** @@ -125,11 +113,7 @@ class SimResourceSourceTest { } } - try { - provider.consume(consumer) - } finally { - provider.close() - } + provider.consume(consumer) } @Test @@ -160,17 +144,13 @@ class SimResourceSourceTest { } } - try { - launch { - yield() - resCtx.interrupt() - } - provider.consume(consumer) - - assertEquals(0, clock.millis()) - } finally { - provider.close() + launch { + yield() + resCtx.interrupt() } + provider.consume(consumer) + + assertEquals(0, clock.millis()) } @Test @@ -183,12 +163,8 @@ class SimResourceSourceTest { every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) } .throws(IllegalStateException()) - try { - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } finally { - provider.close() + assertThrows<IllegalStateException> { + provider.consume(consumer) } } @@ -203,12 +179,8 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) - try { - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } finally { - provider.close() + assertThrows<IllegalStateException> { + provider.consume(consumer) } } @@ -223,41 +195,16 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) - try { - assertThrows<IllegalStateException> { - coroutineScope { - launch { provider.consume(consumer) } - provider.consume(consumer) - } - } - } finally { - provider.close() - } - } - - @Test - fun testClosedConsumption() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) - - try { - assertThrows<IllegalStateException> { - provider.close() + assertThrows<IllegalStateException> { + coroutineScope { + launch { provider.consume(consumer) } provider.consume(consumer) } - } finally { - provider.close() } } @Test - fun testCloseDuringConsumption() = runBlockingSimulation { + fun testCancelDuringConsumption() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -267,15 +214,11 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) - try { - launch { provider.consume(consumer) } - delay(500) - provider.close() + launch { provider.consume(consumer) } + delay(500) + provider.cancel() - assertEquals(500, clock.millis()) - } finally { - provider.close() - } + assertEquals(500, clock.millis()) } @Test @@ -289,13 +232,9 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Idle(clock.millis() + 500)) .andThen(SimResourceCommand.Exit) - try { - provider.consume(consumer) + provider.consume(consumer) - assertEquals(500, clock.millis()) - } finally { - provider.close() - } + assertEquals(500, clock.millis()) } @Test @@ -311,11 +250,7 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Idle()) .andThenThrows(IllegalStateException()) - try { - provider.consume(consumer) - } finally { - provider.close() - } + provider.consume(consumer) } } } @@ -331,12 +266,8 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Idle(2)) .andThen(SimResourceCommand.Exit) - try { - delay(10) + delay(10) - assertThrows<IllegalArgumentException> { provider.consume(consumer) } - } finally { - provider.close() - } + assertThrows<IllegalArgumentException> { provider.consume(consumer) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 810052b8..cf69b7b5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -40,15 +40,12 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceTransformerTest { @Test - fun testExitImmediately() = runBlockingSimulation { + fun testCancelImmediately() = runBlockingSimulation { val forwarder = SimResourceForwarder() val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - launch { - source.consume(forwarder) - source.close() - } + launch { source.consume(forwarder) } forwarder.consume(object : SimResourceConsumer { override fun onNext(ctx: SimResourceContext): SimResourceCommand { @@ -57,18 +54,16 @@ internal class SimResourceTransformerTest { }) forwarder.close() + source.cancel() } @Test - fun testExit() = runBlockingSimulation { + fun testCancel() = runBlockingSimulation { val forwarder = SimResourceForwarder() val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - launch { - source.consume(forwarder) - source.close() - } + launch { source.consume(forwarder) } forwarder.consume(object : SimResourceConsumer { var isFirst = true @@ -84,6 +79,7 @@ internal class SimResourceTransformerTest { }) forwarder.close() + source.cancel() } @Test @@ -93,18 +89,18 @@ internal class SimResourceTransformerTest { override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit } - assertEquals(SimResourceState.Pending, forwarder.state) + assertFalse(forwarder.isActive) forwarder.startConsumer(consumer) - assertEquals(SimResourceState.Active, forwarder.state) + assertTrue(forwarder.isActive) assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) } forwarder.cancel() - assertEquals(SimResourceState.Pending, forwarder.state) + assertFalse(forwarder.isActive) forwarder.close() - assertEquals(SimResourceState.Stopped, forwarder.state) + assertFalse(forwarder.isActive) } @Test @@ -171,7 +167,7 @@ internal class SimResourceTransformerTest { forwarder.consume(consumer) yield() - assertEquals(SimResourceState.Pending, source.state) + assertFalse(forwarder.isActive) } @Test diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index db4fe856..42648cf1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -41,12 +41,8 @@ internal class SimWorkConsumerTest { val consumer = SimWorkConsumer(1.0, 1.0) - try { - provider.consume(consumer) - assertEquals(1000, clock.millis()) - } finally { - provider.close() - } + provider.consume(consumer) + assertEquals(1000, clock.millis()) } @Test @@ -56,11 +52,7 @@ internal class SimWorkConsumerTest { val consumer = SimWorkConsumer(1.0, 0.5) - try { - provider.consume(consumer) - assertEquals(2000, clock.millis()) - } finally { - provider.close() - } + provider.consume(consumer) + assertEquals(2000, clock.millis()) } } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index a8d3a9e8..6807572b 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -43,6 +43,7 @@ import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode @@ -68,6 +69,7 @@ internal class WorkflowServiceIntegrationTest { .setClock(clock.toOtelClock()) .build() + val interpreter = SimResourceInterpreter(coroutineContext, clock) val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) .use { it.read() } .map { def -> @@ -77,7 +79,7 @@ internal class WorkflowServiceIntegrationTest { def.model, def.meta, coroutineContext, - clock, + interpreter, MeterProvider.noop().get("opendc-compute-simulator"), SimSpaceSharedHypervisorProvider() ) |
