diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-30 14:45:42 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:40 +0200 |
| commit | 4f5a1f88d0c6aa19ce4cab0ec7b9b13a24c92fbe (patch) | |
| tree | 3b3566e70fa3aa43f31a4b51e52263ec98d4e694 /opendc-simulator | |
| parent | b0fc93f818e5e735e972a04f5aa49e0ebe1de181 (diff) | |
refactor(simulator): Remove capacity event
This change removes the Capacity entry from FlowEvent. Since the source
is always pulled on a capacity change, we do not need a separate event
for this.
Diffstat (limited to 'opendc-simulator')
7 files changed, 17 insertions, 39 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt index 14c85183..bb6f25b1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt @@ -40,9 +40,4 @@ public enum class FlowEvent { * This event is emitted to the source when the system has converged into a steady state. */ Converge, - - /** - * This event is emitted to the source when the capacity of the consumer has changed. - */ - Capacity, } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index a74f89b4..a86ed6ea 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -51,7 +51,11 @@ internal class FlowConsumerContextImpl( // Only changes will be propagated if (value != oldValue) { field = value - onCapacityChange() + + // Do not pull the source if it has not been started yet + if (_state == State.Active) { + pull() + } } } @@ -328,23 +332,6 @@ internal class FlowConsumerContextImpl( } /** - * Indicate that the capacity of the resource has changed. - */ - private fun onCapacityChange() { - // Do not inform the consumer if it has not been started yet - if (_state != State.Active) { - return - } - - engine.batch { - // Inform the consumer of the capacity change. This might already trigger an interrupt. - source.onEvent(this, _clock.millis(), FlowEvent.Capacity) - - pull() - } - } - - /** * Schedule an immediate update for this connection. */ private fun scheduleImmediate() { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index a3e108f6..b98cf2f1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -356,8 +356,7 @@ public class MaxMinFlowMultiplexer( /** * The capacity of this output. */ - val capacity: Double - get() = _ctx?.capacity ?: 0.0 + @JvmField var capacity: Double = 0.0 /** * Push the specified rate to the consumer. @@ -374,6 +373,12 @@ public class MaxMinFlowMultiplexer( } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val capacity = capacity + if (capacity != conn.capacity) { + this.capacity = capacity + updateCapacity() + } + runScheduler(now) return Long.MAX_VALUE } @@ -383,13 +388,14 @@ public class MaxMinFlowMultiplexer( FlowEvent.Start -> { assert(_ctx == null) { "Source running concurrently" } _ctx = conn + capacity = conn.capacity updateCapacity() } FlowEvent.Exit -> { _ctx = null + capacity = 0.0 updateCapacity() } - FlowEvent.Capacity -> updateCapacity() else -> {} } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index fcee3906..24ae64cb 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.source import org.opendc.simulator.flow.FlowConnection import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource -import kotlin.math.min /** * Helper class to expose an observable [rate] field describing the flow rate of the source. @@ -59,20 +58,11 @@ public class FlowSourceRateAdapter( } override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - val oldSpeed = rate - try { delegate.onEvent(conn, now, event) when (event) { FlowEvent.Converge -> rate = conn.rate - FlowEvent.Capacity -> { - // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might - // need to update the current speed. - if (oldSpeed == rate) { - rate = min(conn.capacity, rate) - } - } FlowEvent.Exit -> rate = 0.0 else -> {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index f1a5cbe4..fe39eb2c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -99,6 +99,6 @@ class FlowConsumerContextTest { context.start() context.capacity = 4200.0 - verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + verify(exactly = 1) { consumer.onPull(any(), any(), any()) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index d125c638..7fae918a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -197,7 +197,7 @@ internal class FlowForwarderTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + verify(exactly = 1) { consumer.onPull(any(), any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index 010a985e..5d579e5d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -67,7 +67,7 @@ internal class FlowSinkTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + verify(exactly = 1) { consumer.onPull(any(), any(), any()) } } @Test |
