diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-07 16:26:00 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-07 20:38:46 +0200 |
| commit | 4e9f72b50473d73f9ca9e30a7fbeb97a8a1c0555 (patch) | |
| tree | 6d4855c6a93cfc41064e73b169e2f39d5530a5ae /simulator/opendc-simulator | |
| parent | 95a0ed6911f136fb25bb76d6b6e010bf66b8ba5b (diff) | |
simulator: Move away from StateFlow for low-level monitoring
This change removes the StateFlow speed property on the
SimResourceSource, as the overhead of emitting changes to the StateFlow
is too high in a single-thread context. Our new approach is to use
direct callbacks and counters.
Diffstat (limited to 'simulator/opendc-simulator')
16 files changed, 117 insertions, 84 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 2127b066..1f26c9c9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -25,14 +25,13 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource import org.opendc.simulator.resources.consume +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -47,9 +46,9 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine /** * The speed of the CPU cores. */ - public val speed: List<Double> + public val speed: DoubleArray get() = _speed - private var _speed = mutableListOf<Double>() + private var _speed = doubleArrayOf() /** * A flag to indicate that the machine is terminated. @@ -94,29 +93,32 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine val ctx = Context(resources, meta) val totalCapacity = model.cpus.sumByDouble { it.frequency } - _speed = MutableList(model.cpus.size) { 0.0 } + _speed = DoubleArray(model.cpus.size) { 0.0 } + var totalSpeed = 0.0 workload.onStart(ctx) for ((cpu, source) in resources) { val consumer = workload.getConsumer(ctx, cpu) - val job = source.speed - .onEach { - _speed[cpu.id] = it - _usage.value = _speed.sum() / totalCapacity - } - .launchIn(this) - - launch { - try { - source.consume(consumer) - } finally { - job.cancel() - } + val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed -> + val oldSpeed = _speed[cpu.id] + _speed[cpu.id] = newSpeed + totalSpeed = totalSpeed - oldSpeed + newSpeed + + updateUsage(totalSpeed / totalCapacity) } + + launch { source.consume(adapter) } } } + /** + * This method is invoked when the usage of the machine is updated. + */ + protected open fun updateUsage(usage: Double) { + _usage.value = usage + } + override fun close() { if (!isTerminated) { isTerminated = true diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 51b807d2..d5577279 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -84,12 +84,15 @@ public class SimBareMetalMachine( /** * The power draw of the machine. */ - public val powerDraw: StateFlow<Double> = usage - .map { - this.scalingGovernors.forEach { it.onLimit() } - this.scalingDriver.computePower() - } - .stateIn(scope, SharingStarted.Eagerly, 0.0) + public var powerDraw: Double = 0.0 + private set + + override fun updateUsage(usage: Double) { + super.updateUsage(usage) + + scalingGovernors.forEach { it.onLimit() } + powerDraw = scalingDriver.computePower() + } override fun close() { super.close() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt index b4bbf9fb..4d62c383 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt @@ -28,7 +28,7 @@ package org.opendc.simulator.compute.cpufreq public class DemandScalingGovernor : ScalingGovernor { override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic { override fun onLimit() { - ctx.setTarget(ctx.resource.speed.value) + ctx.setTarget(ctx.resource.speed) } override fun toString(): String = "DemandScalingGovernor.Logic" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt index d109e4d8..1c82253c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt @@ -59,7 +59,7 @@ public class PStateScalingDriver(states: Map<Double, PowerModel>) : ScalingDrive for (ctx in contexts) { targetFreq = max(ctx.target, targetFreq) - totalSpeed += ctx.resource.speed.value + totalSpeed += ctx.resource.speed } val maxFreq = states.lastKey() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt index 19c06126..c02b6285 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt @@ -35,7 +35,7 @@ internal class DemandScalingGovernorTest { fun testSetDemandLimit() { val ctx = mockk<ScalingContext>(relaxUnitFun = true) - every { ctx.resource.speed.value } returns 2100.0 + every { ctx.resource.speed } returns 2100.0 val logic = DemandScalingGovernor().createLogic(ctx) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt index 5c30bc1f..c6f233a6 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt @@ -59,7 +59,7 @@ internal class PStateScalingDriverTest { val resource = mockk<SimResourceSource>() every { cpu.frequency } returns 4100.0 - every { resource.speed.value } returns 1200.0 + every { resource.speed } returns 1200.0 val driver = PStateScalingDriver( sortedMapOf( @@ -84,7 +84,7 @@ internal class PStateScalingDriverTest { val resource = mockk<SimResourceSource>() every { cpu.frequency } returns 4100.0 - every { resource.speed.value } returns 1200.0 + every { resource.speed } returns 1200.0 val driver = PStateScalingDriver( sortedMapOf( @@ -125,11 +125,11 @@ internal class PStateScalingDriverTest { val scalingContext = logic.createContext(cpu, resource) - every { resource.speed.value } returns 1400.0 + every { resource.speed } returns 1400.0 scalingContext.setTarget(1400.0) assertEquals(150.0, logic.computePower()) - every { resource.speed.value } returns 1400.0 + every { resource.speed } returns 1400.0 scalingContext.setTarget(4000.0) assertEquals(235.0, logic.computePower()) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index e5991264..c7fa6a17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -102,7 +102,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { override val remainingWork: Double - get() = inputContexts.sumByDouble { it.remainingWork } + get() { + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it } + } else { + _remainingWork + } + } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE override fun interrupt() { super.interrupt() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 5c5ee038..05ed0714 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -272,6 +272,7 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = 0.0 + consumer.onConfirm(this, 0.0) onIdle(deadline) } @@ -283,6 +284,7 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = min(capacity, limit) + consumer.onConfirm(this, speed) onConsume(work, limit, deadline) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 672a3e9d..38672b13 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * A [SimResourceConsumer] characterizes how a [SimResource] is consumed. + * A [SimResourceConsumer] characterizes how a resource is consumed. * * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) * for multiple resource providers, unless explicitly said otherwise. @@ -46,6 +46,14 @@ public interface SimResourceConsumer { public fun onNext(ctx: SimResourceContext): SimResourceCommand /** + * This method is invoked when the resource provider confirms that the consumer is running at the given speed. + * + * @param ctx The execution context in which the consumer runs. + * @param speed The speed at which the consumer runs. + */ + public fun onConfirm(ctx: SimResourceContext, speed: Double) {} + + /** * This is method is invoked when the capacity of the resource changes. * * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 9df333e3..dfdd2c2e 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -106,7 +106,7 @@ public class SimResourceDistributorMaxMin( val output = iterator.next() // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call tou output.close() + // during the call to output.close() iterator.remove() output.close() @@ -251,8 +251,8 @@ public class SimResourceDistributorMaxMin( totalAllocatedWork - totalRemainingWork, totalOvercommittedWork.toLong(), totalInterferedWork.toLong(), - totalRequestedSpeed, totalAllocatedSpeed, + totalRequestedSpeed ) totalInterferedWork = 0.0 diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 9b10edaf..025b0406 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock import kotlin.math.ceil @@ -42,11 +40,10 @@ public class SimResourceSource( private val scheduler: TimerScheduler<Any> ) : SimResourceProvider { /** - * The resource processing speed over time. + * The current processing speed of the resource. */ - public val speed: StateFlow<Double> - get() = _speed - private val _speed = MutableStateFlow(0.0) + public val speed: Double + get() = ctx?.speed ?: 0.0 /** * The capacity of the resource. @@ -101,8 +98,6 @@ public class SimResourceSource( */ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - _speed.value = speed - // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { scheduler.startSingleTimerTo(this, deadline) { flush() } @@ -110,15 +105,12 @@ public class SimResourceSource( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - _speed.value = speed - val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish(cause: Throwable?) { - _speed.value = speed scheduler.cancel(this) cancel() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index 73f18c7c..de455021 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -124,6 +124,10 @@ public class SimResourceTransformer( } } + override fun onConfirm(ctx: SimResourceContext, speed: Double) { + delegate?.onConfirm(ctx, speed) + } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { delegate?.onCapacityChanged(ctx, isThrottled) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt index fd4a9ed5..114c7312 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources.consumer -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -32,37 +30,52 @@ import kotlin.math.min /** * Helper class to expose an observable [speed] field describing the speed of the consumer. */ -public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate { +public class SimSpeedConsumerAdapter( + private val delegate: SimResourceConsumer, + private val callback: (Double) -> Unit = {} +) : SimResourceConsumer by delegate { /** - * The resource processing speed over time. + * The resource processing speed at this instant. */ - public val speed: StateFlow<Double> - get() = _speed - private val _speed = MutableStateFlow(0.0) + public var speed: Double = 0.0 + private set(value) { + if (field != value) { + callback(value) + field = value + } + } + + init { + callback(0.0) + } override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val command = delegate.onNext(ctx) + return delegate.onNext(ctx) + } - when (command) { - is SimResourceCommand.Idle -> _speed.value = 0.0 - is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit) - is SimResourceCommand.Exit -> _speed.value = 0.0 - } + override fun onConfirm(ctx: SimResourceContext, speed: Double) { + delegate.onConfirm(ctx, speed) - return command + this.speed = speed } override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - val oldSpeed = _speed.value + val oldSpeed = speed delegate.onCapacityChanged(ctx, isThrottled) // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might // need to update the current speed. - if (oldSpeed == _speed.value) { - _speed.value = min(ctx.capacity, _speed.value) + if (oldSpeed == speed) { + speed = min(ctx.capacity, speed) } } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + speed = 0.0 + } + override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index de864c1c..bf8c6d1f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -26,12 +26,12 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -47,15 +47,18 @@ internal class SimResourceAggregatorMaxMinTest { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) + val forwarder = SimResourceForwarder() val sources = listOf( - SimResourceSource(1.0, clock, scheduler), + forwarder, SimResourceSource(1.0, clock, scheduler) ) sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(1.0, 0.5) val usage = mutableListOf<Double>() - val job = launch { sources[0].speed.toList(usage) } + val source = SimResourceSource(1.0, clock, scheduler) + val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) + source.startConsumer(adapter) try { aggregator.output.consume(consumer) @@ -67,7 +70,6 @@ internal class SimResourceAggregatorMaxMinTest { ) } finally { aggregator.output.close() - job.cancel() } } @@ -85,18 +87,17 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(2.0, 1.0) val usage = mutableListOf<Double>() - val job = launch { sources[0].speed.toList(usage) } + val adapter = SimSpeedConsumerAdapter(consumer, usage::add) try { - aggregator.output.consume(consumer) + aggregator.output.consume(adapter) yield() assertAll( { assertEquals(1000, currentTime) }, - { assertEquals(listOf(0.0, 1.0, 0.0), usage) } + { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { aggregator.output.close() - job.cancel() } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 58e19421..dbba6160 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -27,10 +27,10 @@ import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -54,11 +54,10 @@ class SimResourceSourceTest { try { val res = mutableListOf<Double>() - val job = launch { provider.speed.toList(res) } + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(consumer) + provider.consume(adapter) - job.cancel() assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() @@ -102,11 +101,10 @@ class SimResourceSourceTest { try { val res = mutableListOf<Double>() - val job = launch { provider.speed.toList(res) } + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(consumer) + provider.consume(adapter) - job.cancel() assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index edd60502..9a40edc4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -25,14 +25,13 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -65,17 +64,17 @@ internal class SimResourceSwitchExclusiveTest { val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, clock, scheduler) - - switch.addInput(source) + val forwarder = SimResourceForwarder() + val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addInput(forwarder) val provider = switch.addOutput(3200.0) - val job = launch { source.speed.toList(speed) } try { provider.consume(workload) yield() } finally { - job.cancel() provider.close() } |
