summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-30 16:00:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:40 +0200
commit94783ff9d8cd81275fefd5804ac99f98e2dee3a4 (patch)
tree9668a4370fa49aa7bae4decb4b358681dd3031b7 /opendc-simulator/opendc-simulator-flow/src/main/kotlin/org
parenta2ce07026bf3ef17326e72f395dfa2dd9d9b17be (diff)
fix(simulator): Fix loss computation for UPS and PDU
This change fixes the loss computation for both the UPS and PDU implementation that was broken due to the new pushing mechanism. We implement a new class FlowMapper that can be used to map the flow pushed by a `FlowSource` using a user-specified method.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/kotlin/org')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt75
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt1
2 files changed, 76 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
new file mode 100644
index 00000000..6867bcef
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A [FlowConsumer] that maps the pushed flow through [transform].
+ *
+ * @param source The source of the flow.
+ * @param transform The method to transform the flow.
+ */
+public class FlowMapper(
+ private val source: FlowSource,
+ private val transform: (FlowConnection, Double) -> Double
+) : FlowSource {
+
+ /**
+ * The current active connection.
+ */
+ private var _conn: Connection? = null
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ check(_conn == null) { "Concurrent access" }
+ val delegate = Connection(conn, transform)
+ _conn = delegate
+ source.onStart(delegate, now)
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ val delegate = checkNotNull(_conn) { "Invariant violation" }
+ _conn = null
+ source.onStop(delegate, now, delta)
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val delegate = checkNotNull(_conn) { "Invariant violation" }
+ return source.onPull(delegate, now, delta)
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ val delegate = _conn ?: return
+ source.onConverge(delegate, now, delta)
+ }
+
+ /**
+ * The wrapper [FlowConnection] that is used to transform the flow.
+ */
+ private class Connection(
+ private val delegate: FlowConnection,
+ private val transform: (FlowConnection, Double) -> Double
+ ) : FlowConnection by delegate {
+ override fun push(rate: Double) {
+ delegate.push(transform(this, rate))
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index c235b9ae..55fa92df 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -127,6 +127,7 @@ internal class FlowConsumerContextImpl(
engine.batch {
source.onStart(this, _clock.millis())
_state = State.Active
+
pull()
}
}