From 4e9f72b50473d73f9ca9e30a7fbeb97a8a1c0555 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 7 Apr 2021 16:26:00 +0200 Subject: simulator: Move away from StateFlow for low-level monitoring This change removes the StateFlow speed property on the SimResourceSource, as the overhead of emitting changes to the StateFlow is too high in a single-thread context. Our new approach is to use direct callbacks and counters. --- .../resources/SimResourceAggregatorMaxMinTest.kt | 17 +++++++++-------- .../opendc/simulator/resources/SimResourceSourceTest.kt | 12 +++++------- .../resources/SimResourceSwitchExclusiveTest.kt | 11 +++++------ 3 files changed, 19 insertions(+), 21 deletions(-) (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/test') diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index de864c1c..bf8c6d1f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -26,12 +26,12 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -47,15 +47,18 @@ internal class SimResourceAggregatorMaxMinTest { val scheduler = TimerScheduler(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) + val forwarder = SimResourceForwarder() val sources = listOf( - SimResourceSource(1.0, clock, scheduler), + forwarder, SimResourceSource(1.0, clock, scheduler) ) sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(1.0, 0.5) val usage = mutableListOf() - val job = launch { sources[0].speed.toList(usage) } + val source = SimResourceSource(1.0, clock, scheduler) + val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) + source.startConsumer(adapter) try { aggregator.output.consume(consumer) @@ -67,7 +70,6 @@ internal class SimResourceAggregatorMaxMinTest { ) } finally { aggregator.output.close() - job.cancel() } } @@ -85,18 +87,17 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(2.0, 1.0) val usage = mutableListOf() - val job = launch { sources[0].speed.toList(usage) } + val adapter = SimSpeedConsumerAdapter(consumer, usage::add) try { - aggregator.output.consume(consumer) + aggregator.output.consume(adapter) yield() assertAll( { assertEquals(1000, currentTime) }, - { assertEquals(listOf(0.0, 1.0, 0.0), usage) } + { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { aggregator.output.close() - job.cancel() } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 58e19421..dbba6160 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -27,10 +27,10 @@ import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -54,11 +54,10 @@ class SimResourceSourceTest { try { val res = mutableListOf() - val job = launch { provider.speed.toList(res) } + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(consumer) + provider.consume(adapter) - job.cancel() assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() @@ -102,11 +101,10 @@ class SimResourceSourceTest { try { val res = mutableListOf() - val job = launch { provider.speed.toList(res) } + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(consumer) + provider.consume(adapter) - job.cancel() assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index edd60502..9a40edc4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -25,14 +25,13 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -65,17 +64,17 @@ internal class SimResourceSwitchExclusiveTest { val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, clock, scheduler) - - switch.addInput(source) + val forwarder = SimResourceForwarder() + val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addInput(forwarder) val provider = switch.addOutput(3200.0) - val job = launch { source.speed.toList(speed) } try { provider.consume(workload) yield() } finally { - job.cancel() provider.close() } -- cgit v1.2.3