diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-28 11:58:19 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:37 +0200 |
| commit | 02fa44c0b116ff51c4cbe2876d8b2a225ed68553 (patch) | |
| tree | 38562ef2e6f3cde4e46ad5eb32d7573cebaabef6 /opendc-simulator/opendc-simulator-resources/src/test | |
| parent | d575bed5418be222e1d3ad39af862e2390596d61 (diff) | |
refactor(simulator): Add support for pushing flow from context
This change adds a new method to `SimResourceContext` called `push`
which allows users to change the requested flow rate directly without
having to interrupt the consumer.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src/test')
7 files changed, 161 insertions, 178 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index a9390553..f4ea5fe8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -22,14 +22,12 @@ package org.opendc.simulator.resources -import io.mockk.every -import io.mockk.mockk +import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer @@ -100,10 +98,17 @@ internal class SimResourceAggregatorMaxMinTest { ) sources.forEach(aggregator::addInput) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(4.0, 1000)) - .andThen(SimResourceCommand.Exit) + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(4.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + }) aggregator.consume(consumer) yield() @@ -113,27 +118,6 @@ internal class SimResourceAggregatorMaxMinTest { } @Test - fun testException() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - - val aggregator = SimResourceAggregatorMaxMin(scheduler) - val sources = listOf( - SimResourceSource(1.0, scheduler), - SimResourceSource(1.0, scheduler) - ) - sources.forEach(aggregator::addInput) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(1.0, duration = 1000)) - .andThenThrows(IllegalStateException("Test Exception")) - - assertThrows<IllegalStateException> { aggregator.consume(consumer) } - yield() - assertFalse(sources[0].isActive) - } - - @Test fun testAdjustCapacity() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) @@ -186,10 +170,17 @@ internal class SimResourceAggregatorMaxMinTest { ) sources.forEach(aggregator::addInput) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(4.0, 1000)) - .andThen(SimResourceCommand.Exit) + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(4.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } aggregator.consume(consumer) yield() diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt deleted file mode 100644 index 9a52dc63..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows - -/** - * Test suite for [SimResourceCommand]. - */ -class SimResourceCommandTest { - @Test - fun testNegativeLimit() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(-1.0, 1) - } - } - - @Test - fun testNegativeDuration() { - assertThrows<IllegalArgumentException> { - SimResourceCommand.Consume(1.0, -1) - } - } - - @Test - fun testConsumeCorrect() { - assertDoesNotThrow { - SimResourceCommand.Consume(1.0) - } - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 0cb95abb..4e57f598 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -36,8 +36,17 @@ class SimResourceContextTest { @Test fun testFlushWithoutCommand() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(1.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) @@ -48,14 +57,23 @@ class SimResourceContextTest { @Test fun testIntermediateFlush() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(4.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } val logic = spyk(object : SimResourceProviderLogic { override fun onFinish(ctx: SimResourceControllableContext) {} override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration }) - val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() delay(1) // Delay 1 ms to prevent hitting the fast path @@ -67,11 +85,20 @@ class SimResourceContextTest { @Test fun testIntermediateFlushIdle() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(0.0) + 10 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } val logic = spyk(object : SimResourceProviderLogic {}) - val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() delay(5) @@ -88,8 +115,17 @@ class SimResourceContextTest { @Test fun testDoubleStart() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) andThen SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(0.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + } val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) @@ -104,8 +140,17 @@ class SimResourceContextTest { @Test fun testIdempotentCapacityChange() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (now == 0L) { + ctx.push(1.0) + 1000 + } else { + ctx.close() + Long.MAX_VALUE + } + } + }) val logic = object : SimResourceProviderLogic {} @@ -120,12 +165,23 @@ class SimResourceContextTest { @Test fun testFailureNoInfiniteLoop() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit - every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent") - every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure") - val logic = spyk(object : SimResourceProviderLogic {}) + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + if (event == SimResourceEvent.Exit) throw IllegalStateException("onEvent") + } + + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + throw IllegalStateException("onFailure") + } + }) + + val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index c310fad6..e055daf7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -44,10 +44,7 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(capacity, duration = 1000)) - .andThen(SimResourceCommand.Exit) + val consumer = SimWorkConsumer(4200.0, 1.0) val res = mutableListOf<Double>() val adapter = SimSpeedConsumerAdapter(consumer, res::add) @@ -79,10 +76,7 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(2 * capacity, duration = 1000)) - .andThen(SimResourceCommand.Exit) + val consumer = SimWorkConsumer(capacity, 2.0) val res = mutableListOf<Double>() val adapter = SimSpeedConsumerAdapter(consumer, res::add) @@ -103,8 +97,9 @@ internal class SimResourceSourceTest { val provider = SimResourceSource(capacity, scheduler) val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { @@ -132,12 +127,14 @@ internal class SimResourceSourceTest { } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(1.0, duration = 4000) + ctx.push(1.0) + 4000 } else { - SimResourceCommand.Exit + ctx.close() + Long.MAX_VALUE } } } @@ -172,10 +169,19 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(1.0, duration = 1000)) - .andThenThrows(IllegalStateException()) + val consumer = object : SimResourceConsumer { + var isFirst = true + + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + ctx.push(1.0) + 1000 + } else { + throw IllegalStateException() + } + } + } assertThrows<IllegalStateException> { provider.consume(consumer) @@ -188,10 +194,7 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(1.0)) - .andThenThrows(IllegalStateException()) + val consumer = SimWorkConsumer(capacity, 1.0) assertThrows<IllegalStateException> { coroutineScope { @@ -207,30 +210,13 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(1.0)) - .andThenThrows(IllegalStateException()) + val consumer = SimWorkConsumer(capacity, 1.0) launch { provider.consume(consumer) } delay(500) provider.cancel() - assertEquals(500, clock.millis()) - } - - @Test - fun testIdle() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(0.0, 500)) - .andThen(SimResourceCommand.Exit) - - provider.consume(consumer) + yield() assertEquals(500, clock.millis()) } @@ -243,10 +229,9 @@ internal class SimResourceSourceTest { val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } - .returns(SimResourceCommand.Consume(0.0)) - .andThenThrows(IllegalStateException()) + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE + } provider.consume(consumer) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index ad3b0f9f..9f86dc0d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import io.mockk.every -import io.mockk.mockk import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -32,6 +30,7 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** @@ -88,8 +87,7 @@ internal class SimResourceSwitchExclusiveTest { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = duration) andThen SimResourceCommand.Exit + val workload = SimWorkConsumer(duration * 3.2, 1.0) val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, scheduler) @@ -125,12 +123,14 @@ internal class SimResourceSwitchExclusiveTest { } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(1.0, duration = duration) + ctx.push(1.0) + duration } else { - SimResourceCommand.Exit + ctx.close() + Long.MAX_VALUE } } } @@ -159,9 +159,6 @@ internal class SimResourceSwitchExclusiveTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { workload.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0) andThen SimResourceCommand.Exit - val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, scheduler) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index d8f18e65..ba0d66ff 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import io.mockk.every -import io.mockk.mockk import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.yield @@ -31,6 +29,7 @@ import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** @@ -46,9 +45,7 @@ internal class SimResourceSwitchMaxMinTest { sources.forEach { switch.addInput(it) } val provider = switch.newOutput() - - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(1.0, duration = 1000) andThen SimResourceCommand.Exit + val consumer = SimWorkConsumer(2000.0, 1.0) try { provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 3780fd60..fc43c3da 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import io.mockk.every -import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* @@ -47,8 +45,9 @@ internal class SimResourceTransformerTest { launch { source.consume(forwarder) } forwarder.consume(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { - return SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE } }) @@ -67,12 +66,14 @@ internal class SimResourceTransformerTest { forwarder.consume(object : SimResourceConsumer { var isFirst = true - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - SimResourceCommand.Consume(1.0, duration = 10 * 1000L) + ctx.push(1.0) + 10 * 1000 } else { - SimResourceCommand.Exit + ctx.close() + Long.MAX_VALUE } } }) @@ -85,7 +86,10 @@ internal class SimResourceTransformerTest { fun testState() = runBlockingSimulation { val forwarder = SimResourceForwarder() val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand = SimResourceCommand.Exit + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } } assertFalse(forwarder.isActive) @@ -106,8 +110,12 @@ internal class SimResourceTransformerTest { fun testCancelPendingDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit + val consumer = spyk(object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + }) forwarder.startConsumer(consumer) forwarder.cancel() @@ -121,8 +129,7 @@ internal class SimResourceTransformerTest { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) + val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) source.startConsumer(forwarder) yield() @@ -140,8 +147,7 @@ internal class SimResourceTransformerTest { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Consume(0.0, 10) + val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) source.startConsumer(forwarder) yield() @@ -159,8 +165,12 @@ internal class SimResourceTransformerTest { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onNext(any(), any(), any()) } returns SimResourceCommand.Exit + val consumer = object : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + ctx.close() + return Long.MAX_VALUE + } + } source.startConsumer(forwarder) forwarder.consume(consumer) @@ -190,7 +200,7 @@ internal class SimResourceTransformerTest { @Test fun testTransformExit() = runBlockingSimulation { - val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } + val forwarder = SimResourceTransformer { ctx, _ -> ctx.close(); Long.MAX_VALUE } val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(1.0, scheduler) |
