From dd605ab1f70fef1fbbed848e8ebbd6b231622273 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 29 Sep 2021 22:20:48 +0200 Subject: refactor(simulator): Remove transform from SimResourceForwarder This change removes the ability to transform the duration of a pull from the SimResourceForwarder class. This ability is not used anymore, since pushes are now done using a method instead of a command. --- .../kotlin/org/opendc/simulator/power/SimPdu.kt | 2 +- .../kotlin/org/opendc/simulator/power/SimUps.kt | 2 +- .../simulator/resources/SimResourceForwarder.kt | 220 +++++++++++++++++++ .../resources/SimResourceSwitchExclusive.kt | 4 +- .../simulator/resources/SimResourceTransformer.kt | 238 --------------------- .../resources/SimResourceForwarderTest.kt | 220 +++++++++++++++++++ .../resources/SimResourceTransformerTest.kt | 234 -------------------- 7 files changed, 444 insertions(+), 476 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt delete mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt delete mode 100644 opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index 947f6cb2..1a12a52a 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -42,7 +42,7 @@ public class SimPdu( private val switch = SimResourceSwitchMaxMin(interpreter) /** - * The [SimResourceTransformer] that represents the input of the PDU. + * The [SimResourceForwarder] that represents the input of the PDU. */ private val forwarder = SimResourceForwarder() diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt index 79c1b37d..9c7400ed 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt @@ -86,7 +86,7 @@ public class SimUps( /** * A UPS inlet. */ - public inner class Inlet(private val forwarder: SimResourceTransformer) : SimPowerInlet(), AutoCloseable { + public inner class Inlet(private val forwarder: SimResourceForwarder) : SimPowerInlet(), AutoCloseable { override fun createConsumer(): SimResourceConsumer = forwarder /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt new file mode 100644 index 00000000..0cd2bfc7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.impl.SimResourceCountersImpl +import java.time.Clock + +/** + * A class that acts as a [SimResourceConsumer] and [SimResourceProvider] at the same time. + * + * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. + */ +public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimResourceConsumer, SimResourceProvider, AutoCloseable { + /** + * The delegate [SimResourceConsumer]. + */ + private var delegate: SimResourceConsumer? = null + + /** + * A flag to indicate that the delegate was started. + */ + private var hasDelegateStarted: Boolean = false + + /** + * The exposed [SimResourceContext]. + */ + private val _ctx = object : SimResourceContext { + override val clock: Clock + get() = _innerCtx!!.clock + + override val capacity: Double + get() = _innerCtx?.capacity ?: 0.0 + + override val demand: Double + get() = _innerCtx?.demand ?: 0.0 + + override val speed: Double + get() = _innerCtx?.speed ?: 0.0 + + override fun interrupt() { + _innerCtx?.interrupt() + } + + override fun push(rate: Double) { + _innerCtx?.push(rate) + _limit = rate + } + + override fun close() { + val delegate = checkNotNull(delegate) { "Delegate not active" } + + if (isCoupled) + _innerCtx?.close() + else + _innerCtx?.push(0.0) + + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + + delegate.onEvent(this, SimResourceEvent.Exit) + } + } + + /** + * The [SimResourceContext] in which the forwarder runs. + */ + private var _innerCtx: SimResourceContext? = null + + override val isActive: Boolean + get() = delegate != null + + override val capacity: Double + get() = _ctx.capacity + + override val speed: Double + get() = _ctx.speed + + override val demand: Double + get() = _ctx.demand + + override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + + override fun startConsumer(consumer: SimResourceConsumer) { + check(delegate == null) { "Resource transformer already active" } + + delegate = consumer + + // Interrupt the provider to replace the consumer + interrupt() + } + + override fun interrupt() { + _ctx.interrupt() + } + + override fun cancel() { + val delegate = delegate + val ctx = _innerCtx + + if (delegate != null) { + this.delegate = null + + if (ctx != null) { + delegate.onEvent(this._ctx, SimResourceEvent.Exit) + } + } + } + + override fun close() { + val ctx = _innerCtx + + if (ctx != null) { + this._innerCtx = null + ctx.interrupt() + } + } + + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + val delegate = delegate + + if (!hasDelegateStarted) { + start() + } + + updateCounters(ctx, delta) + + return delegate?.onNext(this._ctx, now, delta) ?: Long.MAX_VALUE + } + + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + _innerCtx = ctx + } + SimResourceEvent.Exit -> { + _innerCtx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onEvent(this._ctx, SimResourceEvent.Exit) + } + } + else -> delegate?.onEvent(this._ctx, event) + } + } + + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + _innerCtx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onFailure(this._ctx, cause) + } + } + + /** + * Start the delegate. + */ + private fun start() { + val delegate = delegate ?: return + delegate.onEvent(checkNotNull(_innerCtx), SimResourceEvent.Start) + + hasDelegateStarted = true + } + + /** + * Reset the delegate. + */ + private fun reset() { + delegate = null + hasDelegateStarted = false + } + + /** + * The requested speed. + */ + private var _limit: Double = 0.0 + + /** + * Update the resource counters for the transformer. + */ + private fun updateCounters(ctx: SimResourceContext, delta: Long) { + if (delta <= 0) { + return + } + + val counters = _counters + val deltaS = delta / 1000.0 + val work = _limit * deltaS + val actualWork = ctx.speed * deltaS + counters.demand += work + counters.actual += actualWork + counters.overcommit += (work - actualWork) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 2be8ccb0..f1e004d2 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -37,7 +37,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { private val _inputs = mutableSetOf() override val inputs: Set get() = _inputs - private val _availableInputs = ArrayDeque() + private val _availableInputs = ArrayDeque() override val counters: SimResourceCounters = object : SimResourceCounters { override val demand: Double @@ -114,7 +114,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { /** * An output of the resource switch. */ - private inner class Output(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder { + private inner class Output(private val forwarder: SimResourceForwarder) : SimResourceProvider by forwarder { /** * Close the output. */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt deleted file mode 100644 index 76a3bdd7..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.impl.SimResourceCountersImpl -import java.time.Clock - -/** - * A [SimResourceConsumer] and [SimResourceProvider] that transforms the resource commands emitted by the resource - * commands to the resource provider. - * - * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. - * @param transform The function to transform the received resource command. - */ -public class SimResourceTransformer( - private val isCoupled: Boolean = false, - private val transform: (SimResourceContext, Long) -> Long -) : SimResourceConsumer, SimResourceProvider, AutoCloseable { - /** - * The delegate [SimResourceConsumer]. - */ - private var delegate: SimResourceConsumer? = null - - /** - * A flag to indicate that the delegate was started. - */ - private var hasDelegateStarted: Boolean = false - - /** - * The exposed [SimResourceContext]. - */ - private val ctx = object : SimResourceContext { - override val clock: Clock - get() = _ctx!!.clock - - override val capacity: Double - get() = _ctx?.capacity ?: 0.0 - - override val demand: Double - get() = _ctx?.demand ?: 0.0 - - override val speed: Double - get() = _ctx?.speed ?: 0.0 - - override fun interrupt() { - _ctx?.interrupt() - } - - override fun push(rate: Double) { - _ctx?.push(rate) - _limit = rate - } - - override fun close() { - val delegate = checkNotNull(delegate) { "Delegate not active" } - - if (isCoupled) - _ctx?.close() - else - _ctx?.push(0.0) - - // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we - // reset beforehand the existing state and check whether it has been updated afterwards - reset() - - delegate.onEvent(this, SimResourceEvent.Exit) - } - } - - /** - * The [SimResourceContext] in which the forwarder runs. - */ - private var _ctx: SimResourceContext? = null - - override val isActive: Boolean - get() = delegate != null - - override val capacity: Double - get() = ctx.capacity - - override val speed: Double - get() = ctx.speed - - override val demand: Double - get() = ctx.demand - - override val counters: SimResourceCounters - get() = _counters - private val _counters = SimResourceCountersImpl() - - override fun startConsumer(consumer: SimResourceConsumer) { - check(delegate == null) { "Resource transformer already active" } - - delegate = consumer - - // Interrupt the provider to replace the consumer - interrupt() - } - - override fun interrupt() { - ctx.interrupt() - } - - override fun cancel() { - val delegate = delegate - val ctx = _ctx - - if (delegate != null) { - this.delegate = null - - if (ctx != null) { - delegate.onEvent(this.ctx, SimResourceEvent.Exit) - } - } - } - - override fun close() { - val ctx = _ctx - - if (ctx != null) { - this._ctx = null - ctx.interrupt() - } - } - - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val delegate = delegate - - if (!hasDelegateStarted) { - start() - } - - updateCounters(ctx, delta) - - return if (delegate != null) { - transform(ctx, delegate.onNext(this.ctx, now, delta)) - } else { - Long.MAX_VALUE - } - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - _ctx = ctx - } - SimResourceEvent.Exit -> { - _ctx = null - - val delegate = delegate - if (delegate != null) { - reset() - delegate.onEvent(this.ctx, SimResourceEvent.Exit) - } - } - else -> delegate?.onEvent(this.ctx, event) - } - } - - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { - _ctx = null - - val delegate = delegate - if (delegate != null) { - reset() - delegate.onFailure(this.ctx, cause) - } - } - - /** - * Start the delegate. - */ - private fun start() { - val delegate = delegate ?: return - delegate.onEvent(checkNotNull(_ctx), SimResourceEvent.Start) - - hasDelegateStarted = true - } - - /** - * Reset the delegate. - */ - private fun reset() { - delegate = null - hasDelegateStarted = false - } - - /** - * The requested speed. - */ - private var _limit: Double = 0.0 - - /** - * Update the resource counters for the transformer. - */ - private fun updateCounters(ctx: SimResourceContext, delta: Long) { - if (delta <= 0) { - return - } - - val counters = _counters - val deltaS = delta / 1000.0 - val work = _limit * deltaS - val actualWork = ctx.speed * deltaS - counters.demand += work - counters.actual += actualWork - counters.overcommit += (work - actualWork) - } -} - -/** - * Constructs a [SimResourceTransformer] that forwards the received resource command with an identity transform. - * - * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. - */ -public fun SimResourceForwarder(isCoupled: Boolean = false): SimResourceTransformer { - return SimResourceTransformer(isCoupled) { _, command -> command } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt new file mode 100644 index 00000000..49e60f68 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.* +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl + +/** + * A test suite for the [SimResourceForwarder] class. + */ +internal class SimResourceForwarderTest { + @Test + fun testCancelImmediately() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) + + launch { source.consume(forwarder) } + + forwarder.consume(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + }) + + forwarder.close() + source.cancel() + } + + @Test + fun testCancel() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) + + launch { source.consume(forwarder) } + + forwarder.consume(object : SimResourceConsumer { + var isFirst = true + + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + ctx.push(1.0) + 10 * 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + }) + + forwarder.close() + source.cancel() + } + + @Test + fun testState() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + } + + assertFalse(forwarder.isActive) + + forwarder.startConsumer(consumer) + assertTrue(forwarder.isActive) + + assertThrows { forwarder.startConsumer(consumer) } + + forwarder.cancel() + assertFalse(forwarder.isActive) + + forwarder.close() + assertFalse(forwarder.isActive) + } + + @Test + fun testCancelPendingDelegate() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + }) + + forwarder.startConsumer(consumer) + forwarder.cancel() + + verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) } + } + + @Test + fun testCancelStartedDelegate() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) + + val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + forwarder.cancel() + + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } + } + + @Test + fun testCancelPropagation() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) + + val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + source.cancel() + + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } + } + + @Test + fun testExitPropagation() = runBlockingSimulation { + val forwarder = SimResourceForwarder(isCoupled = true) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) + + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + } + + source.startConsumer(forwarder) + forwarder.consume(consumer) + yield() + + assertFalse(forwarder.isActive) + } + + @Test + fun testAdjustCapacity() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) + + val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + source.startConsumer(forwarder) + + coroutineScope { + launch { forwarder.consume(consumer) } + delay(1000) + source.capacity = 0.5 + } + + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } + } + + @Test + fun testCounters() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) + + val consumer = SimWorkConsumer(2.0, 1.0) + source.startConsumer(forwarder) + + forwarder.consume(consumer) + + yield() + + assertEquals(2.0, source.counters.actual) + assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } + assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } + assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(2000, clock.millis()) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt deleted file mode 100644 index d7d0924f..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.* -import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl - -/** - * A test suite for the [SimResourceTransformer] class. - */ -internal class SimResourceTransformerTest { - @Test - fun testCancelImmediately() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) - - launch { source.consume(forwarder) } - - forwarder.consume(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() - return Long.MAX_VALUE - } - }) - - forwarder.close() - source.cancel() - } - - @Test - fun testCancel() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) - - launch { source.consume(forwarder) } - - forwarder.consume(object : SimResourceConsumer { - var isFirst = true - - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (isFirst) { - isFirst = false - ctx.push(1.0) - 10 * 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - }) - - forwarder.close() - source.cancel() - } - - @Test - fun testState() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() - return Long.MAX_VALUE - } - } - - assertFalse(forwarder.isActive) - - forwarder.startConsumer(consumer) - assertTrue(forwarder.isActive) - - assertThrows { forwarder.startConsumer(consumer) } - - forwarder.cancel() - assertFalse(forwarder.isActive) - - forwarder.close() - assertFalse(forwarder.isActive) - } - - @Test - fun testCancelPendingDelegate() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - - val consumer = spyk(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() - return Long.MAX_VALUE - } - }) - - forwarder.startConsumer(consumer) - forwarder.cancel() - - verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) } - } - - @Test - fun testCancelStartedDelegate() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) - - val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - forwarder.cancel() - - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } - } - - @Test - fun testCancelPropagation() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) - - val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - source.cancel() - - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } - } - - @Test - fun testExitPropagation() = runBlockingSimulation { - val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) - - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() - return Long.MAX_VALUE - } - } - - source.startConsumer(forwarder) - forwarder.consume(consumer) - yield() - - assertFalse(forwarder.isActive) - } - - @Test - fun testAdjustCapacity() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(1.0, scheduler) - - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) - source.startConsumer(forwarder) - - coroutineScope { - launch { forwarder.consume(consumer) } - delay(1000) - source.capacity = 0.5 - } - - assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } - } - - @Test - fun testTransformExit() = runBlockingSimulation { - val forwarder = SimResourceTransformer { ctx, _ -> ctx.close(); Long.MAX_VALUE } - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(1.0, scheduler) - - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) - source.startConsumer(forwarder) - forwarder.consume(consumer) - - assertEquals(0, clock.millis()) - verify(exactly = 1) { consumer.onNext(any(), any(), any()) } - } - - @Test - fun testCounters() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(1.0, scheduler) - - val consumer = SimWorkConsumer(2.0, 1.0) - source.startConsumer(forwarder) - - forwarder.consume(consumer) - - yield() - - assertEquals(2.0, source.counters.actual) - assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } - assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } - assertEquals(2000, clock.millis()) - } -} -- cgit v1.2.3