summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-resources/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src/test')
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt202
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt74
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt104
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt351
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt173
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt193
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt210
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt66
8 files changed, 1373 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
new file mode 100644
index 00000000..e272abb8
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceAggregatorMaxMin] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceAggregatorMaxMinTest {
+ @Test
+ fun testSingleCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val forwarder = SimResourceForwarder()
+ val sources = listOf(
+ forwarder,
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ val usage = mutableListOf<Double>()
+ val source = SimResourceSource(1.0, clock, scheduler)
+ val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
+ source.startConsumer(adapter)
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+
+ assertAll(
+ { assertEquals(1000, clock.millis()) },
+ { assertEquals(listOf(0.0, 0.5, 0.0), usage) }
+ )
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testDoubleCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(2.0, 1.0)
+ val usage = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
+
+ try {
+ aggregator.output.consume(adapter)
+ yield()
+ assertAll(
+ { assertEquals(1000, clock.millis()) },
+ { assertEquals(listOf(0.0, 2.0, 0.0), usage) }
+ )
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testOvercommit() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ aggregator.output.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
+
+ verify(exactly = 2) { consumer.onNext(any()) }
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testException() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
+ yield()
+ assertEquals(SimResourceState.Pending, sources[0].state)
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(4.0, 1.0)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(1000)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(2334, clock.millis())
+ } finally {
+ aggregator.output.close()
+ }
+ }
+
+ @Test
+ fun testFailOverCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(clock)
+ val sources = listOf(
+ SimResourceSource(1.0, clock, scheduler),
+ SimResourceSource(1.0, clock, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+ try {
+ coroutineScope {
+ launch { aggregator.output.consume(consumer) }
+ delay(500)
+ sources[0].capacity = 0.5
+ }
+ yield()
+ assertEquals(1000, clock.millis())
+ } finally {
+ aggregator.output.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
new file mode 100644
index 00000000..02d456ff
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceCommandTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for [SimResourceCommand].
+ */
+class SimResourceCommandTest {
+ @Test
+ fun testZeroWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(0.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testNegativeWork() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(-1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testZeroLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, 0.0)
+ }
+ }
+
+ @Test
+ fun testNegativeLimit() {
+ assertThrows<IllegalArgumentException> {
+ SimResourceCommand.Consume(1.0, -1.0, 1)
+ }
+ }
+
+ @Test
+ fun testConsumeCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Consume(1.0, 1.0)
+ }
+ }
+
+ @Test
+ fun testIdleCorrect() {
+ assertDoesNotThrow {
+ SimResourceCommand.Idle(1)
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
new file mode 100644
index 00000000..be909556
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.*
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.opendc.simulator.core.runBlockingSimulation
+
+/**
+ * A test suite for the [SimAbstractResourceContext] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceContextTest {
+ @Test
+ fun testFlushWithoutCommand() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ }
+
+ context.flush()
+ }
+
+ @Test
+ fun testIntermediateFlush() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
+
+ context.start()
+ delay(1) // Delay 1 ms to prevent hitting the fast path
+ context.flush(isIntermediate = true)
+
+ verify(exactly = 2) { context.onConsume(any(), any(), any()) }
+ }
+
+ @Test
+ fun testIntermediateFlushIdle() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ })
+
+ context.start()
+ delay(5)
+ context.flush(isIntermediate = true)
+ delay(5)
+ context.flush(isIntermediate = true)
+
+ assertAll(
+ { verify(exactly = 2) { context.onIdle(any()) } },
+ { verify(exactly = 1) { context.onFinish(null) } }
+ )
+ }
+
+ @Test
+ fun testDoubleStart() = runBlockingSimulation {
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onFinish(cause: Throwable?) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ }
+
+ context.start()
+ assertThrows<IllegalStateException> { context.start() }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
new file mode 100644
index 00000000..39f74481
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -0,0 +1,351 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceSource] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class SimResourceSourceTest {
+ @Test
+ fun testSpeed() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1000 * capacity, capacity))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ val res = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+
+ try {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
+ }
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testSpeedLimit() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1000 * capacity, 2 * capacity))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ val res = mutableListOf<Double>()
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ /**
+ * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or
+ * [SimResourceConsumer.onNext].
+ */
+ @Test
+ fun testIntermediateInterrupt() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = object : SimResourceConsumer {
+ override fun onStart(ctx: SimResourceContext) {
+ ctx.interrupt()
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ }
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInterrupt() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+ lateinit var resCtx: SimResourceContext
+
+ val consumer = object : SimResourceConsumer {
+ var isFirst = true
+ override fun onStart(ctx: SimResourceContext) {
+ resCtx = ctx
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ assertEquals(0.0, ctx.remainingWork)
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(4.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ }
+
+ try {
+ launch {
+ yield()
+ resCtx.interrupt()
+ }
+ provider.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testFailure() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onStart(any()) }
+ .throws(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ provider.consume(consumer)
+ }
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testClosedConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ assertThrows<IllegalStateException> {
+ provider.close()
+ provider.consume(consumer)
+ }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testCloseDuringConsumption() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(1.0, 1.0))
+ .andThenThrows(IllegalStateException())
+
+ try {
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.close()
+
+ assertEquals(500, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testIdle() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle(clock.millis() + 500))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ provider.consume(consumer)
+
+ assertEquals(500, clock.millis())
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ assertThrows<IllegalStateException> {
+ runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle())
+ .andThenThrows(IllegalStateException())
+
+ try {
+ provider.consume(consumer)
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+ }
+ }
+
+ @Test
+ fun testIncorrectDeadline() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = SimResourceSource(capacity, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Idle(2))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ delay(10)
+
+ assertThrows<IllegalArgumentException> { provider.consume(consumer) }
+ } finally {
+ scheduler.close()
+ provider.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
new file mode 100644
index 00000000..f7d17867
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceSwitchExclusive] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchExclusiveTest {
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val speed = mutableListOf<Double>()
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+ val forwarder = SimResourceForwarder()
+ val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
+ source.startConsumer(adapter)
+ switch.addInput(forwarder)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ provider.close()
+ }
+
+ assertAll(
+ { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
+ { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : SimResourceConsumer {
+ var isFirst = true
+
+ override fun onStart(ctx: SimResourceContext) {
+ isFirst = true
+ }
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(duration / 1000.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ }
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
+ } finally {
+ provider.close()
+ }
+ assertEquals(duration * 2, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
+
+ val switch = SimResourceSwitchExclusive()
+ val source = SimResourceSource(3200.0, clock, scheduler)
+
+ switch.addInput(source)
+
+ switch.addOutput(3200.0)
+ assertThrows<IllegalStateException> { switch.addOutput(3200.0) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
new file mode 100644
index 00000000..7416f277
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -0,0 +1,193 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * Test suite for the [SimResourceSwitch] implementations
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceSwitchMaxMinTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val switch = SimResourceSwitchMaxMin(clock)
+
+ val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) }
+ sources.forEach { switch.addInput(it) }
+
+ val provider = switch.addOutput(1000.0)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
+
+ try {
+ provider.consume(consumer)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with a single VM.
+ */
+ @Test
+ fun testOvercommittedSingle() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workload =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, listener)
+ val provider = switch.addOutput(3200.0)
+
+ try {
+ switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ provider.consume(workload)
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+
+ assertAll(
+ { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+
+ val listener = object : SimResourceSwitchMaxMin.Listener {
+ var totalRequestedWork = 0L
+ var totalGrantedWork = 0L
+ var totalOvercommittedWork = 0L
+
+ override fun onSliceFinish(
+ switch: SimResourceSwitchMaxMin,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ totalRequestedWork += requestedWork
+ totalGrantedWork += grantedWork
+ totalOvercommittedWork += overcommittedWork
+ }
+ }
+
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3500.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ ),
+ )
+ val workloadB =
+ SimTraceConsumer(
+ sequenceOf(
+ SimTraceConsumer.Fragment(duration * 1000, 28.0),
+ SimTraceConsumer.Fragment(duration * 1000, 3100.0),
+ SimTraceConsumer.Fragment(duration * 1000, 0.0),
+ SimTraceConsumer.Fragment(duration * 1000, 73.0)
+ )
+ )
+
+ val switch = SimResourceSwitchMaxMin(clock, listener)
+ val providerA = switch.addOutput(3200.0)
+ val providerB = switch.addOutput(3200.0)
+
+ try {
+ switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+
+ coroutineScope {
+ launch { providerA.consume(workloadA) }
+ providerB.consume(workloadB)
+ }
+
+ yield()
+ } finally {
+ switch.close()
+ scheduler.close()
+ }
+ assertAll(
+ { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
new file mode 100644
index 00000000..d2ad73bc
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimResourceTransformer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimResourceTransformerTest {
+ @Test
+ fun testExitImmediately() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return SimResourceCommand.Exit
+ }
+ })
+
+ forwarder.close()
+ scheduler.close()
+ }
+
+ @Test
+ fun testExit() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ launch {
+ source.consume(forwarder)
+ source.close()
+ }
+
+ forwarder.consume(object : SimResourceConsumer {
+ var isFirst = true
+
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return if (isFirst) {
+ isFirst = false
+ SimResourceCommand.Consume(10.0, 1.0)
+ } else {
+ SimResourceCommand.Exit
+ }
+ }
+ })
+
+ forwarder.close()
+ }
+
+ @Test
+ fun testState() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val consumer = object : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit
+ }
+
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.startConsumer(consumer)
+ assertEquals(SimResourceState.Active, forwarder.state)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertEquals(SimResourceState.Pending, forwarder.state)
+
+ forwarder.close()
+ assertEquals(SimResourceState.Stopped, forwarder.state)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any()) }
+ verify(exactly = 1) { consumer.onFinish(any(), null) }
+ }
+
+ @Test
+ fun testExitPropagation() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder(isCoupled = true)
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, clock, scheduler)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+ yield()
+
+ assertEquals(SimResourceState.Pending, source.state)
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ source.startConsumer(forwarder)
+
+ coroutineScope {
+ launch { forwarder.consume(consumer) }
+ delay(1000)
+ source.capacity = 0.5
+ }
+
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ }
+
+ @Test
+ fun testTransformExit() = runBlockingSimulation {
+ val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val source = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ verify(exactly = 1) { consumer.onNext(any()) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
new file mode 100644
index 00000000..bf58b1b6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.utils.TimerScheduler
+
+/**
+ * A test suite for the [SimWorkConsumer] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimWorkConsumerTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = SimWorkConsumer(1.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(1000, clock.millis())
+ } finally {
+ provider.close()
+ }
+ }
+
+ @Test
+ fun testUtilization() = runBlockingSimulation {
+ val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, clock, scheduler)
+
+ val consumer = SimWorkConsumer(1.0, 0.5)
+
+ try {
+ provider.consume(consumer)
+ assertEquals(2000, clock.millis())
+ } finally {
+ provider.close()
+ }
+ }
+}