summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-30 14:45:42 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:40 +0200
commit4f5a1f88d0c6aa19ce4cab0ec7b9b13a24c92fbe (patch)
tree3b3566e70fa3aa43f31a4b51e52263ec98d4e694
parentb0fc93f818e5e735e972a04f5aa49e0ebe1de181 (diff)
refactor(simulator): Remove capacity event
This change removes the Capacity entry from FlowEvent. Since the source is always pulled on a capacity change, we do not need a separate event for this.
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt5
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt23
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt2
8 files changed, 21 insertions, 45 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 2ba65e90..6f460ef7 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -131,6 +131,8 @@ public class SimTFDevice(
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val consumedWork = conn.rate * delta / 1000.0
+ capacity = conn.capacity
+
val activeWork = activeWork
if (activeWork != null) {
if (activeWork.consume(consumedWork)) {
@@ -158,12 +160,8 @@ public class SimTFDevice(
override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
FlowEvent.Start -> {
- this.ctx = conn
- this.capacity = conn.capacity
- }
- FlowEvent.Capacity -> {
- this.capacity = conn.capacity
- conn.pull()
+ ctx = conn
+ capacity = conn.capacity
}
FlowEvent.Converge -> {
_usage.record(conn.rate)
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
index 14c85183..bb6f25b1 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
@@ -40,9 +40,4 @@ public enum class FlowEvent {
* This event is emitted to the source when the system has converged into a steady state.
*/
Converge,
-
- /**
- * This event is emitted to the source when the capacity of the consumer has changed.
- */
- Capacity,
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index a74f89b4..a86ed6ea 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -51,7 +51,11 @@ internal class FlowConsumerContextImpl(
// Only changes will be propagated
if (value != oldValue) {
field = value
- onCapacityChange()
+
+ // Do not pull the source if it has not been started yet
+ if (_state == State.Active) {
+ pull()
+ }
}
}
@@ -328,23 +332,6 @@ internal class FlowConsumerContextImpl(
}
/**
- * Indicate that the capacity of the resource has changed.
- */
- private fun onCapacityChange() {
- // Do not inform the consumer if it has not been started yet
- if (_state != State.Active) {
- return
- }
-
- engine.batch {
- // Inform the consumer of the capacity change. This might already trigger an interrupt.
- source.onEvent(this, _clock.millis(), FlowEvent.Capacity)
-
- pull()
- }
- }
-
- /**
* Schedule an immediate update for this connection.
*/
private fun scheduleImmediate() {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index a3e108f6..b98cf2f1 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -356,8 +356,7 @@ public class MaxMinFlowMultiplexer(
/**
* The capacity of this output.
*/
- val capacity: Double
- get() = _ctx?.capacity ?: 0.0
+ @JvmField var capacity: Double = 0.0
/**
* Push the specified rate to the consumer.
@@ -374,6 +373,12 @@ public class MaxMinFlowMultiplexer(
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val capacity = capacity
+ if (capacity != conn.capacity) {
+ this.capacity = capacity
+ updateCapacity()
+ }
+
runScheduler(now)
return Long.MAX_VALUE
}
@@ -383,13 +388,14 @@ public class MaxMinFlowMultiplexer(
FlowEvent.Start -> {
assert(_ctx == null) { "Source running concurrently" }
_ctx = conn
+ capacity = conn.capacity
updateCapacity()
}
FlowEvent.Exit -> {
_ctx = null
+ capacity = 0.0
updateCapacity()
}
- FlowEvent.Capacity -> updateCapacity()
else -> {}
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index fcee3906..24ae64cb 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.flow.source
import org.opendc.simulator.flow.FlowConnection
import org.opendc.simulator.flow.FlowEvent
import org.opendc.simulator.flow.FlowSource
-import kotlin.math.min
/**
* Helper class to expose an observable [rate] field describing the flow rate of the source.
@@ -59,20 +58,11 @@ public class FlowSourceRateAdapter(
}
override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- val oldSpeed = rate
-
try {
delegate.onEvent(conn, now, event)
when (event) {
FlowEvent.Converge -> rate = conn.rate
- FlowEvent.Capacity -> {
- // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
- // need to update the current speed.
- if (oldSpeed == rate) {
- rate = min(conn.capacity, rate)
- }
- }
FlowEvent.Exit -> rate = 0.0
else -> {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
index f1a5cbe4..fe39eb2c 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -99,6 +99,6 @@ class FlowConsumerContextTest {
context.start()
context.capacity = 4200.0
- verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
+ verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index d125c638..7fae918a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -197,7 +197,7 @@ internal class FlowForwarderTest {
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
+ verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
index 010a985e..5d579e5d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -67,7 +67,7 @@ internal class FlowSinkTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
+ verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
}
@Test