From df3c9dc3fcd2f89910575bfdc24a3db3af9eba0f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 20 Jun 2021 22:21:39 +0200 Subject: exp: Enable interpreter sharing across hosts This change enables the experiments to share the SimResourceInterpreter across multiple hosts, which allows updates to be scheduled efficiently for all machines at the same time. This is especially beneficial if the machines operate on the same time slices. --- .../src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 11 +++-------- .../test/kotlin/org/opendc/compute/simulator/SimHostTest.kt | 4 +++- .../org/opendc/experiments/capelin/ExperimentHelpers.kt | 4 +++- .../org/opendc/experiments/energy21/EnergyExperiment.kt | 4 +++- .../opendc/workflow/service/WorkflowServiceIntegrationTest.kt | 4 +++- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 540e27fe..557fa97a 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -56,7 +56,7 @@ public class SimHost( model: SimMachineModel, override val meta: Map, context: CoroutineContext, - clock: Clock, + interpreter: SimResourceInterpreter, meter: Meter, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor, @@ -70,12 +70,12 @@ public class SimHost( model: SimMachineModel, meta: Map, context: CoroutineContext, - clock: Clock, + interpreter: SimResourceInterpreter, meter: Meter, hypervisor: SimHypervisorProvider, powerModel: PowerModel = ConstantPowerModel(0.0), mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper) + ) : this(uid, name, model, meta, context, interpreter, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper) /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. @@ -97,11 +97,6 @@ public class SimHost( */ private var availableMemory: Long = model.memory.sumOf { it.size } - /** - * The resource interpreter to schedule the resource interactions. - */ - private val interpreter = SimResourceInterpreter(context, clock) - /** * The machine to run on. */ diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a6cff3ba..79489fdb 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -47,6 +47,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.util.UUID @@ -83,7 +84,8 @@ internal class SimHostTest { .setClock(clock.toOtelClock()) .build() - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val interpreter = SimResourceInterpreter(coroutineContext, clock) + val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) val duration = 5 * 60L val vmImageA = MockImage( UUID.randomUUID(), diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 0fbb7280..0485415c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -50,6 +50,7 @@ import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector +import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.io.File @@ -144,6 +145,7 @@ public suspend fun withComputeService( scheduler: ComputeScheduler, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { + val interpreter = SimResourceInterpreter(coroutineContext, clock) val hosts = environmentReader .use { it.read() } .map { def -> @@ -153,7 +155,7 @@ public suspend fun withComputeService( def.model, def.meta, coroutineContext, - clock, + interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), def.powerModel diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index 65915cc6..2f14776a 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -49,6 +49,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.* import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter import java.io.File import java.time.Clock import java.util.* @@ -120,6 +121,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val model = createMachineModel() + val interpreter = SimResourceInterpreter(coroutineContext, clock) val hosts = List(64) { id -> SimHost( UUID(0, id.toLong()), @@ -127,7 +129,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { model, emptyMap(), coroutineContext, - clock, + interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), PerformanceScalingGovernor(), diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index a8d3a9e8..6807572b 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -43,6 +43,7 @@ import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode @@ -68,6 +69,7 @@ internal class WorkflowServiceIntegrationTest { .setClock(clock.toOtelClock()) .build() + val interpreter = SimResourceInterpreter(coroutineContext, clock) val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) .use { it.read() } .map { def -> @@ -77,7 +79,7 @@ internal class WorkflowServiceIntegrationTest { def.model, def.meta, coroutineContext, - clock, + interpreter, MeterProvider.noop().get("opendc-compute-simulator"), SimSpaceSharedHypervisorProvider() ) -- cgit v1.2.3 From 882ae4a9830737ece2db9563d0f56387036a8e3d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 20 Jun 2021 22:23:31 +0200 Subject: simulator: Optimize access to remainingWork property This change updates the SimResourceContextImpl to optimize the access to the remainingWork property, which is required by many calls in the hot path. --- .../resources/impl/SimResourceContextImpl.kt | 47 +++++++++------------- 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 46c5c63f..237a2a77 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -39,7 +39,8 @@ internal class SimResourceContextImpl( * The clock of the context. */ override val clock: Clock - get() = interpreter.clock + get() = _clock + private val _clock = interpreter.clock /** * The capacity of the resource. @@ -59,18 +60,7 @@ internal class SimResourceContextImpl( * The amount of work still remaining at this instant. */ override val remainingWork: Double - get() { - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - computeRemainingWork(now).also { _remainingWork = it } - } else { - _remainingWork - } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE + get() = getRemainingWork(_clock.millis()) /** * A flag to indicate the state of the context. @@ -92,20 +82,6 @@ internal class SimResourceContextImpl( override val demand: Double get() = _limit - private val counters = object : SimResourceCounters { - override var demand: Double = 0.0 - override var actual: Double = 0.0 - override var overcommit: Double = 0.0 - - override fun reset() { - demand = 0.0 - actual = 0.0 - overcommit = 0.0 - } - - override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" - } - /** * The current state of the resource context. */ @@ -223,7 +199,7 @@ internal class SimResourceContextImpl( SimResourceState.Pending, SimResourceState.Stopped -> state SimResourceState.Active -> { val isInterrupted = _flag == Flag.Interrupt - val remainingWork = remainingWork + val remainingWork = getRemainingWork(timestamp) val isConsume = _limit > 0.0 // Update the resource counters only if there is some progress @@ -325,6 +301,21 @@ internal class SimResourceContextImpl( */ private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now) + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE + + /** + * Obtain the remaining work at the given timestamp. + */ + private fun getRemainingWork(now: Long): Double { + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + computeRemainingWork(now).also { _remainingWork = it } + } else { + _remainingWork + } + } + /** * Compute the remaining work based on the current state. */ -- cgit v1.2.3 From 4b9559ce78e1853600c816f8228205ddf405c5a2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 20 Jun 2021 22:24:26 +0200 Subject: simulator: Pool update allocations in interpreter This change updates the SimResourceInterpreter implementation to pool the allocations of the Update objects. This reduces the amount of allocations necessary in the hot path of the simulator. --- .../resources/impl/SimResourceInterpreterImpl.kt | 66 ++++++++++++++-------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt index cb0d6160..6dd02ae5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -48,12 +48,12 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * The queue of resource updates that are scheduled for immediate execution. */ - private val queue = ArrayDeque() + private val queue = ArrayDeque() /** * A priority queue containing the resource updates to be scheduled in the future. */ - private val futureQueue = PriorityQueue() + private val futureQueue = PriorityQueue(compareBy { it.timestamp }) /** * The stack of interpreter invocations to occur in the future. @@ -83,7 +83,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, * re-computed. In case no interpreter is currently active, the interpreter will be started. */ fun scheduleImmediate(ctx: SimResourceContextImpl) { - queue.add(Update(ctx, Long.MIN_VALUE)) + queue.add(ctx) // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked // up by the active interpreter. @@ -137,7 +137,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, require(timestamp >= now) { "Timestamp must be in the future" } - val update = Update(ctx, timestamp) + val update = allocUpdate(ctx, timestamp) futureQueue.add(update) // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference. @@ -193,9 +193,16 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, futureQueue.poll() - if (update(now) && visited.add(update.ctx)) { - collectAncestors(update.ctx, visited) + val shouldExecute = !update.isCancelled && update.ctx.requiresUpdate(now) + if (shouldExecute) { + update.ctx.doUpdate(now) + + if (visited.add(update.ctx)) { + collectAncestors(update.ctx, visited) + } } + + updatePool.add(update) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -203,9 +210,15 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, do { // Execute all immediate updates while (true) { - val update = queue.poll() ?: break - if (update(now) && visited.add(update.ctx)) { - collectAncestors(update.ctx, visited) + val ctx = queue.poll() ?: break + val shouldExecute = ctx.requiresUpdate(now) + + if (shouldExecute) { + ctx.doUpdate(now) + + if (visited.add(ctx)) { + collectAncestors(ctx, visited) + } } } @@ -277,6 +290,26 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } } + /** + * The pool of existing updates. + */ + private val updatePool = ArrayDeque() + + /** + * Allocate an [Update] object. + */ + private fun allocUpdate(ctx: SimResourceContextImpl, timestamp: Long): Update { + val update = updatePool.poll() + return if (update != null) { + update.ctx = ctx + update.timestamp = timestamp + update.isCancelled = false + update + } else { + Update(ctx, timestamp) + } + } + /** * A future interpreter invocation. * @@ -299,7 +332,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be * cancelled if the resource context was invalidated in the meantime. */ - class Update(@JvmField val ctx: SimResourceContextImpl, @JvmField val timestamp: Long) : Comparable { + class Update(@JvmField var ctx: SimResourceContextImpl, @JvmField var timestamp: Long) { /** * A flag to indicate that the task has been cancelled. */ @@ -313,19 +346,6 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, isCancelled = true } - /** - * Immediately run update. - */ - operator fun invoke(timestamp: Long): Boolean { - val shouldExecute = !isCancelled && ctx.requiresUpdate(timestamp) - if (shouldExecute) { - ctx.doUpdate(timestamp) - } - return shouldExecute - } - - override fun compareTo(other: Update): Int = timestamp.compareTo(other.timestamp) - override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]" } } -- cgit v1.2.3 From d54ac10449083a490e741d6c54e6f3aa07b71af0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 20 Jun 2021 22:56:33 +0200 Subject: simulator: Remove concept of resource lifecycle This change removes the AutoCloseable interface from the SimResourceProvider and removes the concept of a resource lifecycle. Instead, resource providers are now either active (running a resource consumer) or in-active (being idle), which simplifies implementation. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 1 - .../simulator/compute/SimAbstractHypervisor.kt | 8 +- .../opendc/simulator/compute/SimAbstractMachine.kt | 5 - .../kotlin/org/opendc/simulator/power/SimPdu.kt | 2 +- .../resources/SimAbstractResourceAggregator.kt | 10 +- .../resources/SimAbstractResourceProvider.kt | 24 +--- .../resources/SimResourceCloseableProvider.kt | 37 ++++++ .../simulator/resources/SimResourceDistributor.kt | 4 +- .../resources/SimResourceDistributorMaxMin.kt | 27 ++-- .../simulator/resources/SimResourceProvider.kt | 13 +- .../simulator/resources/SimResourceSwitch.kt | 4 +- .../resources/SimResourceSwitchExclusive.kt | 6 +- .../simulator/resources/SimResourceSwitchMaxMin.kt | 6 +- .../simulator/resources/SimResourceTransformer.kt | 33 ++--- .../resources/SimResourceAggregatorMaxMinTest.kt | 100 +++++--------- .../simulator/resources/SimResourceSourceTest.kt | 145 ++++++--------------- .../resources/SimResourceTransformerTest.kt | 26 ++-- .../simulator/resources/SimWorkConsumerTest.kt | 16 +-- 18 files changed, 183 insertions(+), 284 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 557fa97a..c9527bb6 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -42,7 +42,6 @@ import org.opendc.simulator.compute.power.PowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.failures.FailureDomain import org.opendc.simulator.resources.SimResourceInterpreter -import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index d24ed1f3..c560cd28 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -129,6 +129,10 @@ public abstract class SimAbstractHypervisor( override fun close() { super.close() + for (cpu in cpus) { + cpu.close() + } + _vms.remove(this) } } @@ -137,9 +141,9 @@ public abstract class SimAbstractHypervisor( * A [SimProcessingUnit] of a virtual machine. */ private class VCpu( - private val source: SimResourceProvider, + private val source: SimResourceCloseableProvider, override val model: ProcessingUnit - ) : SimProcessingUnit, SimResourceProvider by source { + ) : SimProcessingUnit, SimResourceCloseableProvider by source { override var capacity: Double get() = source.capacity set(_) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 93d306cf..3a70680c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -115,11 +115,6 @@ public abstract class SimAbstractMachine( isTerminated = true cancel() - interpreter.batch { - for (cpu in cpus) { - cpu.close() - } - } } /* SimResourceSystem */ diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index 11034a57..3ce85d02 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -83,7 +83,7 @@ public class SimPdu( /** * A PDU outlet. */ - public class Outlet(private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable { + public class Outlet(private val provider: SimResourceCloseableProvider) : SimPowerOutlet(), AutoCloseable { override fun onConnect(inlet: SimPowerInlet) { provider.startConsumer(inlet.createConsumer()) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 5fe7d7bb..84217278 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -56,8 +56,6 @@ public abstract class SimAbstractResourceAggregator( /* SimResourceAggregator */ override fun addInput(input: SimResourceProvider) { - check(state != SimResourceState.Stopped) { "Aggregator has been stopped" } - val consumer = Consumer() _inputs.add(input) _inputConsumers.add(consumer) @@ -70,8 +68,8 @@ public abstract class SimAbstractResourceAggregator( private val _inputConsumers = mutableListOf() /* SimResourceProvider */ - override val state: SimResourceState - get() = _output.state + override val isActive: Boolean + get() = _output.isActive override val capacity: Double get() = _output.capacity @@ -97,10 +95,6 @@ public abstract class SimAbstractResourceAggregator( _output.interrupt() } - override fun close() { - _output.close() - } - private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index de26f99e..c1b1450e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -32,6 +32,12 @@ public abstract class SimAbstractResourceProvider( private val parent: SimResourceSystem?, initialCapacity: Double ) : SimResourceProvider { + /** + * A flag to indicate that the resource provider is active. + */ + public override val isActive: Boolean + get() = ctx != null + /** * The capacity of the resource. */ @@ -66,12 +72,6 @@ public abstract class SimAbstractResourceProvider( protected var ctx: SimResourceControllableContext? = null private set - /** - * The state of the resource provider. - */ - final override var state: SimResourceState = SimResourceState.Pending - private set - /** * Construct the [SimResourceProviderLogic] instance for a new consumer. */ @@ -96,21 +96,15 @@ public abstract class SimAbstractResourceProvider( } final override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } + check(ctx == null) { "Resource is in invalid state" } val ctx = interpreter.newContext(consumer, createLogic(), parent) ctx.capacity = capacity this.ctx = ctx - this.state = SimResourceState.Active start(ctx) } - override fun close() { - cancel() - state = SimResourceState.Stopped - } - final override fun interrupt() { ctx?.interrupt() } @@ -121,10 +115,6 @@ public abstract class SimAbstractResourceProvider( this.ctx = null ctx.close() } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } } override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]" diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt new file mode 100644 index 00000000..bce8274b --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCloseableProvider.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A [SimResourceProvider] that has a controllable and limited lifetime. + * + * This interface is used to signal that the resource provider may be closed and not reused after that point. + */ +public interface SimResourceCloseableProvider : SimResourceProvider, AutoCloseable { + /** + * End the lifetime of the resource provider. + * + * This operation cancels the existing resource consumer and prevents the resource provider from being reused. + */ + public override fun close() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index e0333ff9..6bfbfc99 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -29,10 +29,10 @@ public interface SimResourceDistributor : SimResourceConsumer { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set + public val outputs: Set /** * Create a new output for the distributor. */ - public fun newOutput(): SimResourceProvider + public fun newOutput(): SimResourceCloseableProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index be9e89fb..f7c5c5d7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -32,7 +32,7 @@ public class SimResourceDistributorMaxMin( private val interpreter: SimResourceInterpreter, private val parent: SimResourceSystem? = null ) : SimResourceDistributor { - override val outputs: Set + override val outputs: Set get() = _outputs private val _outputs = mutableSetOf() @@ -57,7 +57,7 @@ public class SimResourceDistributorMaxMin( private var totalAllocatedSpeed = 0.0 /* SimResourceDistributor */ - override fun newOutput(): SimResourceProvider { + override fun newOutput(): SimResourceCloseableProvider { val provider = Output(ctx?.capacity ?: 0.0) _outputs.add(provider) return provider @@ -178,7 +178,16 @@ public class SimResourceDistributorMaxMin( /** * An internal [SimResourceProvider] implementation for switch outputs. */ - private inner class Output(capacity: Double) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceProviderLogic, Comparable { + private inner class Output(capacity: Double) : + SimAbstractResourceProvider(interpreter, parent, capacity), + SimResourceCloseableProvider, + SimResourceProviderLogic, + Comparable { + /** + * A flag to indicate that the output is closed. + */ + private var isClosed: Boolean = false + /** * The current command that is processed by the resource. */ @@ -209,6 +218,8 @@ public class SimResourceDistributorMaxMin( override fun createLogic(): SimResourceProviderLogic = this override fun start(ctx: SimResourceControllableContext) { + check(!isClosed) { "Cannot re-use closed output" } + activeOutputs += this interpreter.batch { @@ -219,13 +230,9 @@ public class SimResourceDistributorMaxMin( } override fun close() { - val state = state - - super.close() - - if (state != SimResourceState.Stopped) { - _outputs.remove(this) - } + isClosed = true + cancel() + _outputs.remove(this) } /* SimResourceProviderLogic */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index f709ca17..b68b7261 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -29,11 +29,11 @@ import kotlin.coroutines.resumeWithException /** * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer]. */ -public interface SimResourceProvider : AutoCloseable { +public interface SimResourceProvider { /** - * The state of the resource. + * A flag to indicate that the resource provider is currently being consumed by a [SimResourceConsumer]. */ - public val state: SimResourceState + public val isActive: Boolean /** * The resource capacity available at this instant. @@ -71,13 +71,6 @@ public interface SimResourceProvider : AutoCloseable { * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. */ public fun cancel() - - /** - * End the lifetime of the resource. - * - * This operation terminates the existing resource consumer. - */ - public override fun close() } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index e224285e..f6e7b22f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -29,7 +29,7 @@ public interface SimResourceSwitch : AutoCloseable { /** * The output resource providers to which resource consumers can be attached. */ - public val outputs: Set + public val outputs: Set /** * The input resources that will be switched between the output providers. @@ -44,7 +44,7 @@ public interface SimResourceSwitch : AutoCloseable { /** * Create a new output on the switch. */ - public fun newOutput(): SimResourceProvider + public fun newOutput(): SimResourceCloseableProvider /** * Add the specified [input] to the switch. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 2950af80..4ff741ed 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -35,7 +35,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { private var isClosed: Boolean = false private val _outputs = mutableSetOf() - override val outputs: Set + override val outputs: Set get() = _outputs private val availableResources = ArrayDeque() @@ -61,7 +61,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" } - override fun newOutput(): SimResourceProvider { + override fun newOutput(): SimResourceCloseableProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() @@ -101,7 +101,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { _inputs.forEach(SimResourceProvider::cancel) } - private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder { + private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceCloseableProvider, SimResourceProvider by forwarder { override fun close() { // We explicitly do not close the forwarder here in order to re-use it across output resources. _outputs -= this diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 684a1b52..50d58798 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -33,7 +33,7 @@ public class SimResourceSwitchMaxMin( /** * The output resource providers to which resource consumers can be attached. */ - override val outputs: Set + override val outputs: Set get() = distributor.outputs /** @@ -70,7 +70,7 @@ public class SimResourceSwitchMaxMin( /** * Add an output to the switch. */ - override fun newOutput(): SimResourceProvider { + override fun newOutput(): SimResourceCloseableProvider { check(!isClosed) { "Switch has been closed" } return distributor.newOutput() @@ -88,7 +88,7 @@ public class SimResourceSwitchMaxMin( override fun close() { if (!isClosed) { isClosed = true - aggregator.close() + aggregator.cancel() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index fd3d1230..cec27e1c 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -33,7 +33,7 @@ import org.opendc.simulator.resources.impl.SimResourceCountersImpl public class SimResourceTransformer( private val isCoupled: Boolean = false, private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand -) : SimResourceFlow { +) : SimResourceFlow, AutoCloseable { /** * The [SimResourceContext] in which the forwarder runs. */ @@ -49,11 +49,8 @@ public class SimResourceTransformer( */ private var hasDelegateStarted: Boolean = false - /** - * The state of the forwarder. - */ - override var state: SimResourceState = SimResourceState.Pending - private set + override val isActive: Boolean + get() = delegate != null override val capacity: Double get() = ctx?.capacity ?: 0.0 @@ -69,9 +66,8 @@ public class SimResourceTransformer( private val _counters = SimResourceCountersImpl() override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } + check(delegate == null) { "Resource transformer already active" } - state = SimResourceState.Active delegate = consumer // Interrupt the provider to replace the consumer @@ -86,19 +82,18 @@ public class SimResourceTransformer( val delegate = delegate val ctx = ctx - state = SimResourceState.Pending - - if (delegate != null && ctx != null) { + if (delegate != null) { this.delegate = null - delegate.onEvent(ctx, SimResourceEvent.Exit) + + if (ctx != null) { + delegate.onEvent(ctx, SimResourceEvent.Exit) + } } } override fun close() { val ctx = ctx - state = SimResourceState.Stopped - if (ctx != null) { this.ctx = null ctx.interrupt() @@ -114,9 +109,7 @@ public class SimResourceTransformer( updateCounters(ctx) - return if (state == SimResourceState.Stopped) { - SimResourceCommand.Exit - } else if (delegate != null) { + return if (delegate != null) { val command = transform(ctx, delegate.onNext(ctx)) _work = if (command is SimResourceCommand.Consume) command.work else 0.0 @@ -128,7 +121,7 @@ public class SimResourceTransformer( delegate.onEvent(ctx, SimResourceEvent.Exit) - if (isCoupled || state == SimResourceState.Stopped) + if (isCoupled) SimResourceCommand.Exit else onNext(ctx) @@ -184,10 +177,6 @@ public class SimResourceTransformer( private fun reset() { delegate = null hasDelegateStarted = false - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 51024e80..2f01a8c4 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -26,7 +26,7 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.* -import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows @@ -58,17 +58,13 @@ internal class SimResourceAggregatorMaxMinTest { val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) source.startConsumer(adapter) - try { - aggregator.consume(consumer) - yield() + aggregator.consume(consumer) + yield() - assertAll( - { assertEquals(1000, clock.millis()) }, - { assertEquals(listOf(0.0, 0.5, 0.0), usage) } - ) - } finally { - aggregator.close() - } + assertAll( + { assertEquals(1000, clock.millis()) }, + { assertEquals(listOf(0.0, 0.5, 0.0), usage) } + ) } @Test @@ -86,16 +82,12 @@ internal class SimResourceAggregatorMaxMinTest { val usage = mutableListOf() val adapter = SimSpeedConsumerAdapter(consumer, usage::add) - try { - aggregator.consume(adapter) - yield() - assertAll( - { assertEquals(1000, clock.millis()) }, - { assertEquals(listOf(0.0, 2.0, 0.0), usage) } - ) - } finally { - aggregator.close() - } + aggregator.consume(adapter) + yield() + assertAll( + { assertEquals(1000, clock.millis()) }, + { assertEquals(listOf(0.0, 2.0, 0.0), usage) } + ) } @Test @@ -114,15 +106,11 @@ internal class SimResourceAggregatorMaxMinTest { .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) .andThen(SimResourceCommand.Exit) - try { - aggregator.consume(consumer) - yield() - assertEquals(1000, clock.millis()) + aggregator.consume(consumer) + yield() + assertEquals(1000, clock.millis()) - verify(exactly = 2) { consumer.onNext(any()) } - } finally { - aggregator.close() - } + verify(exactly = 2) { consumer.onNext(any()) } } @Test @@ -141,13 +129,9 @@ internal class SimResourceAggregatorMaxMinTest { .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException("Test Exception")) - try { - assertThrows { aggregator.consume(consumer) } - yield() - assertEquals(SimResourceState.Pending, sources[0].state) - } finally { - aggregator.close() - } + assertThrows { aggregator.consume(consumer) } + yield() + assertFalse(sources[0].isActive) } @Test @@ -162,17 +146,13 @@ internal class SimResourceAggregatorMaxMinTest { sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(4.0, 1.0) - try { - coroutineScope { - launch { aggregator.consume(consumer) } - delay(1000) - sources[0].capacity = 0.5 - } - yield() - assertEquals(2334, clock.millis()) - } finally { - aggregator.close() + coroutineScope { + launch { aggregator.consume(consumer) } + delay(1000) + sources[0].capacity = 0.5 } + yield() + assertEquals(2334, clock.millis()) } @Test @@ -187,17 +167,13 @@ internal class SimResourceAggregatorMaxMinTest { sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(1.0, 0.5) - try { - coroutineScope { - launch { aggregator.consume(consumer) } - delay(500) - sources[0].capacity = 0.5 - } - yield() - assertEquals(1000, clock.millis()) - } finally { - aggregator.close() + coroutineScope { + launch { aggregator.consume(consumer) } + delay(500) + sources[0].capacity = 0.5 } + yield() + assertEquals(1000, clock.millis()) } @Test @@ -216,13 +192,9 @@ internal class SimResourceAggregatorMaxMinTest { .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) .andThen(SimResourceCommand.Exit) - try { - aggregator.consume(consumer) - yield() - assertEquals(1000, clock.millis()) - assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } - } finally { - aggregator.close() - } + aggregator.consume(consumer) + yield() + assertEquals(1000, clock.millis()) + assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 08d88093..4895544d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -50,16 +50,12 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1000 * capacity, capacity)) .andThen(SimResourceCommand.Exit) - try { - val res = mutableListOf() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) + val res = mutableListOf() + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(adapter) + provider.consume(adapter) - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } finally { - provider.close() - } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } @Test @@ -69,17 +65,13 @@ class SimResourceSourceTest { val consumer = spyk(SimWorkConsumer(2.0, 1.0)) - try { - coroutineScope { - launch { provider.consume(consumer) } - delay(1000) - provider.capacity = 0.5 - } - assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } - } finally { - provider.close() + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 } + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } } @Test @@ -93,16 +85,12 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity)) .andThen(SimResourceCommand.Exit) - try { - val res = mutableListOf() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) + val res = mutableListOf() + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(adapter) + provider.consume(adapter) - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } finally { - provider.close() - } + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } /** @@ -125,11 +113,7 @@ class SimResourceSourceTest { } } - try { - provider.consume(consumer) - } finally { - provider.close() - } + provider.consume(consumer) } @Test @@ -160,17 +144,13 @@ class SimResourceSourceTest { } } - try { - launch { - yield() - resCtx.interrupt() - } - provider.consume(consumer) - - assertEquals(0, clock.millis()) - } finally { - provider.close() + launch { + yield() + resCtx.interrupt() } + provider.consume(consumer) + + assertEquals(0, clock.millis()) } @Test @@ -183,12 +163,8 @@ class SimResourceSourceTest { every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) } .throws(IllegalStateException()) - try { - assertThrows { - provider.consume(consumer) - } - } finally { - provider.close() + assertThrows { + provider.consume(consumer) } } @@ -203,12 +179,8 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) - try { - assertThrows { - provider.consume(consumer) - } - } finally { - provider.close() + assertThrows { + provider.consume(consumer) } } @@ -223,41 +195,16 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) - try { - assertThrows { - coroutineScope { - launch { provider.consume(consumer) } - provider.consume(consumer) - } - } - } finally { - provider.close() - } - } - - @Test - fun testClosedConsumption() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) - - val consumer = mockk(relaxUnitFun = true) - every { consumer.onNext(any()) } - .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) - - try { - assertThrows { - provider.close() + assertThrows { + coroutineScope { + launch { provider.consume(consumer) } provider.consume(consumer) } - } finally { - provider.close() } } @Test - fun testCloseDuringConsumption() = runBlockingSimulation { + fun testCancelDuringConsumption() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -267,15 +214,11 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Consume(1.0, 1.0)) .andThenThrows(IllegalStateException()) - try { - launch { provider.consume(consumer) } - delay(500) - provider.close() + launch { provider.consume(consumer) } + delay(500) + provider.cancel() - assertEquals(500, clock.millis()) - } finally { - provider.close() - } + assertEquals(500, clock.millis()) } @Test @@ -289,13 +232,9 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Idle(clock.millis() + 500)) .andThen(SimResourceCommand.Exit) - try { - provider.consume(consumer) + provider.consume(consumer) - assertEquals(500, clock.millis()) - } finally { - provider.close() - } + assertEquals(500, clock.millis()) } @Test @@ -311,11 +250,7 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Idle()) .andThenThrows(IllegalStateException()) - try { - provider.consume(consumer) - } finally { - provider.close() - } + provider.consume(consumer) } } } @@ -331,12 +266,8 @@ class SimResourceSourceTest { .returns(SimResourceCommand.Idle(2)) .andThen(SimResourceCommand.Exit) - try { - delay(10) + delay(10) - assertThrows { provider.consume(consumer) } - } finally { - provider.close() - } + assertThrows { provider.consume(consumer) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 810052b8..cf69b7b5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -40,15 +40,12 @@ import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceTransformerTest { @Test - fun testExitImmediately() = runBlockingSimulation { + fun testCancelImmediately() = runBlockingSimulation { val forwarder = SimResourceForwarder() val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - launch { - source.consume(forwarder) - source.close() - } + launch { source.consume(forwarder) } forwarder.consume(object : SimResourceConsumer { override fun onNext(ctx: SimResourceContext): SimResourceCommand { @@ -57,18 +54,16 @@ internal class SimResourceTransformerTest { }) forwarder.close() + source.cancel() } @Test - fun testExit() = runBlockingSimulation { + fun testCancel() = runBlockingSimulation { val forwarder = SimResourceForwarder() val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - launch { - source.consume(forwarder) - source.close() - } + launch { source.consume(forwarder) } forwarder.consume(object : SimResourceConsumer { var isFirst = true @@ -84,6 +79,7 @@ internal class SimResourceTransformerTest { }) forwarder.close() + source.cancel() } @Test @@ -93,18 +89,18 @@ internal class SimResourceTransformerTest { override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit } - assertEquals(SimResourceState.Pending, forwarder.state) + assertFalse(forwarder.isActive) forwarder.startConsumer(consumer) - assertEquals(SimResourceState.Active, forwarder.state) + assertTrue(forwarder.isActive) assertThrows { forwarder.startConsumer(consumer) } forwarder.cancel() - assertEquals(SimResourceState.Pending, forwarder.state) + assertFalse(forwarder.isActive) forwarder.close() - assertEquals(SimResourceState.Stopped, forwarder.state) + assertFalse(forwarder.isActive) } @Test @@ -171,7 +167,7 @@ internal class SimResourceTransformerTest { forwarder.consume(consumer) yield() - assertEquals(SimResourceState.Pending, source.state) + assertFalse(forwarder.isActive) } @Test diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index db4fe856..42648cf1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -41,12 +41,8 @@ internal class SimWorkConsumerTest { val consumer = SimWorkConsumer(1.0, 1.0) - try { - provider.consume(consumer) - assertEquals(1000, clock.millis()) - } finally { - provider.close() - } + provider.consume(consumer) + assertEquals(1000, clock.millis()) } @Test @@ -56,11 +52,7 @@ internal class SimWorkConsumerTest { val consumer = SimWorkConsumer(1.0, 0.5) - try { - provider.consume(consumer) - assertEquals(2000, clock.millis()) - } finally { - provider.close() - } + provider.consume(consumer) + assertEquals(2000, clock.millis()) } } -- cgit v1.2.3 From 629a8520c7611f61a513458961a08ae7494158ab Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 20 Jun 2021 23:20:20 +0200 Subject: simulator: Optimize flag management of resource context This change optimizes the internal flag management used in the SimResourceContextImpl to use bitwise flags instead of enums. This approach simplifies the implementation immensely and reduces the number of branches. --- .../resources/impl/SimResourceContextImpl.kt | 45 +++++++--------------- 1 file changed, 13 insertions(+), 32 deletions(-) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 237a2a77..5c3f95e8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -93,7 +93,7 @@ internal class SimResourceContextImpl( /** * The update flag indicating why the update was triggered. */ - private var _flag: Flag = Flag.None + private var _flag: Int = 0 /** * The current pending update. @@ -123,7 +123,7 @@ internal class SimResourceContextImpl( return } - enableFlag(Flag.Interrupt) + _flag = _flag or FLAG_INTERRUPT scheduleUpdate() } @@ -132,7 +132,7 @@ internal class SimResourceContextImpl( return } - enableFlag(Flag.Invalidate) + _flag = _flag or FLAG_INVALIDATE scheduleUpdate() } @@ -149,7 +149,7 @@ internal class SimResourceContextImpl( */ fun requiresUpdate(timestamp: Long): Boolean { // Either the resource context is flagged or there is a pending update at this timestamp - return _flag != Flag.None || _pendingUpdate?.timestamp == timestamp + return _flag != 0 || _pendingUpdate?.timestamp == timestamp } /** @@ -161,7 +161,7 @@ internal class SimResourceContextImpl( val newState = doUpdate(timestamp, oldState) _state = newState - _flag = Flag.None + _flag = 0 when (newState) { SimResourceState.Pending -> @@ -198,7 +198,7 @@ internal class SimResourceContextImpl( // Resource context is not active, so its state will not update SimResourceState.Pending, SimResourceState.Stopped -> state SimResourceState.Active -> { - val isInterrupted = _flag == Flag.Interrupt + val isInterrupted = _flag and FLAG_INTERRUPT != 0 val remainingWork = getRemainingWork(timestamp) val isConsume = _limit > 0.0 @@ -347,25 +347,6 @@ internal class SimResourceContextImpl( } } - /** - * Enable the specified [flag] taking into account precedence. - */ - private fun enableFlag(flag: Flag) { - _flag = when (_flag) { - Flag.None -> flag - Flag.Invalidate -> - when (flag) { - Flag.None -> flag - else -> flag - } - Flag.Interrupt -> - when (flag) { - Flag.None, Flag.Invalidate -> flag - else -> flag - } - } - } - /** * Schedule an update for this resource context. */ @@ -402,12 +383,12 @@ internal class SimResourceContextImpl( } /** - * An enumeration of flags that can be assigned to a resource context to indicate whether they are invalidated or - * interrupted. + * A flag to indicate that the context should be invalidated. */ - enum class Flag { - None, - Interrupt, - Invalidate - } + private val FLAG_INVALIDATE = 0b01 + + /** + * A flag to indicate that the context should be interrupted. + */ + private val FLAG_INTERRUPT = 0b10 } -- cgit v1.2.3 From 966715d7df139a431293f5c2fc67916fbcc1ecfb Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 21 Jun 2021 11:36:25 +0200 Subject: simulator: Reduce allocations in interpreter hot path This change updates the resources module to reduce the number of object allocations in the interpreter's hot path. This in turn should reduce the GC pressure. --- .../resources/SimResourceAggregatorMaxMin.kt | 2 +- .../resources/SimResourceDistributorMaxMin.kt | 77 ++++++++++---------- .../resources/impl/SimResourceContextImpl.kt | 81 +++++++++++----------- 3 files changed, 79 insertions(+), 81 deletions(-) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index c39c1aca..991cda7a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -45,7 +45,7 @@ public class SimResourceAggregatorMaxMin( val command = if (grantedWork > 0.0 && grantedSpeed > 0.0) SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) else - SimResourceCommand.Idle(deadline) + SimResourceCommand.Idle() input.push(command) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index f7c5c5d7..d8fc8cb6 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -65,7 +65,7 @@ public class SimResourceDistributorMaxMin( /* SimResourceConsumer */ override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx.capacity) + return doNext(ctx) } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { @@ -94,12 +94,13 @@ public class SimResourceDistributorMaxMin( /** * Schedule the work of the outputs. */ - private fun doNext(capacity: Double): SimResourceCommand { + private fun doNext(ctx: SimResourceContext): SimResourceCommand { // If there is no work yet, mark the input as idle. if (activeOutputs.isEmpty()) { return SimResourceCommand.Idle() } + val capacity = ctx.capacity var duration: Double = Double.MAX_VALUE var deadline: Long = Long.MAX_VALUE var availableSpeed = capacity @@ -112,7 +113,7 @@ public class SimResourceDistributorMaxMin( output.pull() // Remove outputs that have finished - if (output.isFinished) { + if (!output.isActive) { outputIterator.remove() } } @@ -125,33 +126,23 @@ public class SimResourceDistributorMaxMin( var remaining = activeOutputs.size for (output in activeOutputs) { val availableShare = availableSpeed / remaining-- + val grantedSpeed = min(output.allowedSpeed, availableShare) + deadline = min(deadline, output.deadline) - when (val command = output.activeCommand) { - is SimResourceCommand.Idle -> { - deadline = min(deadline, command.deadline) - output.actualSpeed = 0.0 - } - is SimResourceCommand.Consume -> { - val grantedSpeed = min(output.allowedSpeed, availableShare) - deadline = min(deadline, command.deadline) - - // Ignore idle computation - if (grantedSpeed <= 0.0 || command.work <= 0.0) { - output.actualSpeed = 0.0 - continue - } + // Ignore idle computation + if (grantedSpeed <= 0.0 || output.work <= 0.0) { + output.actualSpeed = 0.0 + continue + } - totalRequestedSpeed += command.limit - totalRequestedWork += command.work + totalRequestedSpeed += output.limit + totalRequestedWork += output.work - output.actualSpeed = grantedSpeed - availableSpeed -= grantedSpeed + output.actualSpeed = grantedSpeed + availableSpeed -= grantedSpeed - // The duration that we want to run is that of the shortest request of an output - duration = min(duration, command.work / grantedSpeed) - } - SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } - } + // The duration that we want to run is that of the shortest request of an output + duration = min(duration, output.work / grantedSpeed) } assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } @@ -189,9 +180,19 @@ public class SimResourceDistributorMaxMin( private var isClosed: Boolean = false /** - * The current command that is processed by the resource. + * The current requested work. + */ + var work: Double = 0.0 + + /** + * The requested limit. */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() + var limit: Double = 0.0 + + /** + * The current deadline. + */ + var deadline: Long = Long.MAX_VALUE /** * The processing speed that is allowed by the model constraints. @@ -203,12 +204,6 @@ public class SimResourceDistributorMaxMin( */ var actualSpeed: Double = 0.0 - /** - * A flag to indicate that the output is finished. - */ - val isFinished - get() = activeCommand is SimResourceCommand.Exit - /** * The timestamp at which we received the last command. */ @@ -238,15 +233,19 @@ public class SimResourceDistributorMaxMin( /* SimResourceProviderLogic */ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { allowedSpeed = 0.0 - activeCommand = SimResourceCommand.Idle(deadline) + this.deadline = deadline + work = 0.0 + limit = 0.0 lastCommandTimestamp = ctx.clock.millis() return Long.MAX_VALUE } override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - allowedSpeed = ctx.speed - activeCommand = SimResourceCommand.Consume(work, limit, deadline) + allowedSpeed = min(ctx.capacity, limit) + this.work = work + this.limit = limit + this.deadline = deadline lastCommandTimestamp = ctx.clock.millis() return Long.MAX_VALUE @@ -257,7 +256,9 @@ public class SimResourceDistributorMaxMin( } override fun onFinish(ctx: SimResourceControllableContext) { - activeCommand = SimResourceCommand.Exit + work = 0.0 + limit = 0.0 + deadline = Long.MAX_VALUE lastCommandTimestamp = ctx.clock.millis() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 5c3f95e8..90c7bc75 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -212,11 +212,15 @@ internal class SimResourceContextImpl( // 2. The resource capacity cannot satisfy the demand. // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) { - next(timestamp) + when (val command = consumer.onNext(this)) { + is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline) + is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline) + is SimResourceCommand.Exit -> interpretExit() + } } else if (isConsume) { - interpret(SimResourceCommand.Consume(remainingWork, _limit, _deadline), timestamp) + interpretConsume(timestamp, remainingWork, _limit, _deadline) } else { - interpret(SimResourceCommand.Idle(_deadline), timestamp) + interpretIdle(timestamp, _deadline) } } } @@ -249,57 +253,50 @@ internal class SimResourceContextImpl( } /** - * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + * Interpret the [SimResourceCommand.Consume] command. */ - private fun interpret(command: SimResourceCommand, now: Long): SimResourceState { - return when (command) { - is SimResourceCommand.Idle -> { - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - _speed = 0.0 - _work = 0.0 - _limit = 0.0 - _deadline = deadline + private fun interpretConsume(now: Long, work: Double, limit: Double, deadline: Long): SimResourceState { + require(deadline >= now) { "Deadline already passed" } - val timestamp = logic.onIdle(this, deadline) - scheduleUpdate(timestamp) + _speed = min(capacity, limit) + _work = work + _limit = limit + _deadline = deadline - SimResourceState.Active - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline + val timestamp = logic.onConsume(this, work, limit, deadline) + scheduleUpdate(timestamp) - require(deadline >= now) { "Deadline already passed" } + return SimResourceState.Active + } - _speed = min(capacity, limit) - _work = work - _limit = limit - _deadline = deadline + /** + * Interpret the [SimResourceCommand.Idle] command. + */ + private fun interpretIdle(now: Long, deadline: Long): SimResourceState { + require(deadline >= now) { "Deadline already passed" } - val timestamp = logic.onConsume(this, work, limit, deadline) - scheduleUpdate(timestamp) + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = deadline - SimResourceState.Active - } - is SimResourceCommand.Exit -> { - _speed = 0.0 - _work = 0.0 - _limit = 0.0 - _deadline = Long.MAX_VALUE + val timestamp = logic.onIdle(this, deadline) + scheduleUpdate(timestamp) - SimResourceState.Stopped - } - } + return SimResourceState.Active } /** - * Request the workload for more work. + * Interpret the [SimResourceCommand.Exit] command. */ - private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now) + private fun interpretExit(): SimResourceState { + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = Long.MAX_VALUE + + return SimResourceState.Stopped + } private var _remainingWork: Double = 0.0 private var _remainingWorkFlush: Long = Long.MIN_VALUE -- cgit v1.2.3