summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt7
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt5
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt18
6 files changed, 32 insertions, 18 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
index c327e1e9..8ff0bc76 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
@@ -52,6 +52,13 @@ public interface FlowConnection : AutoCloseable {
public fun pull()
/**
+ * Pull the source.
+ *
+ * @param now The timestamp at which the connection is pulled.
+ */
+ public fun pull(now: Long)
+
+ /**
* Push the given flow [rate] over this connection.
*
* @param rate The rate of the flow to push.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
index d7182497..98922ab3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -55,6 +55,8 @@ public interface FlowConsumerContext : FlowConnection {
/**
* Synchronously pull the source of the connection.
+ *
+ * @param now The timestamp at which the connection is pulled.
*/
- public fun pullSync()
+ public fun pullSync(now: Long)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 7230a966..e3bdd7ba 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -72,6 +72,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
_innerCtx?.pull()
}
+ override fun pull(now: Long) {
+ _innerCtx?.pull(now)
+ }
+
@JvmField var lastPull = Long.MAX_VALUE
override fun push(rate: Double) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 0baa7880..58ca918b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -164,17 +164,21 @@ internal class FlowConsumerContextImpl(
}
}
- override fun pull() {
+ override fun pull(now: Long) {
val flags = _flags
if (flags and ConnState != ConnActive) {
return
}
// Mark connection as pulled
- scheduleImmediate(_clock.millis(), flags or ConnPulled)
+ scheduleImmediate(now, flags or ConnPulled)
}
- override fun pullSync() {
+ override fun pull() {
+ pull(_clock.millis())
+ }
+
+ override fun pullSync(now: Long) {
val flags = _flags
// Do not attempt to flush the connection if the connection is closed or an update is already active
@@ -182,8 +186,6 @@ internal class FlowConsumerContextImpl(
return
}
- val now = _clock.millis()
-
if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) {
engine.scheduleSync(now, this)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index 55debef0..3c79d54e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -129,11 +129,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc
/* Runnable */
override fun run() {
- val now = _clock.millis()
val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
- assert(now >= invocation.timestamp) { "Future invocations invariant violated" }
-
- doRunEngine(now)
+ doRunEngine(invocation.timestamp)
}
/**
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index eab5b299..a0fb8a4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -302,7 +302,7 @@ public class MaxMinFlowMultiplexer(
// a few inputs and little changes at the same timestamp.
// We always pick for option (1) unless there are no outputs available.
if (activationOutput != null) {
- activationOutput.pull()
+ activationOutput.pull(now)
return
} else {
runScheduler(now)
@@ -320,7 +320,7 @@ public class MaxMinFlowMultiplexer(
return try {
_schedulerActive = true
- doRunScheduler(delta)
+ doRunScheduler(now, delta)
} finally {
_schedulerActive = false
}
@@ -371,7 +371,7 @@ public class MaxMinFlowMultiplexer(
*
* @return The deadline after which a new scheduling cycle should start.
*/
- private fun doRunScheduler(delta: Long): Long {
+ private fun doRunScheduler(now: Long, delta: Long): Long {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
var inputArray = _inputArray
@@ -396,7 +396,8 @@ public class MaxMinFlowMultiplexer(
// Pull in the work of the inputs
for (i in 0 until inputSize) {
val input = inputArray[i]
- input.pullSync()
+
+ input.pullSync(now)
// Remove inputs that have finished
if (!input.isActive) {
@@ -595,8 +596,8 @@ public class MaxMinFlowMultiplexer(
/**
* Pull the source if necessary.
*/
- fun pullSync() {
- _ctx?.pullSync()
+ fun pullSync(now: Long) {
+ _ctx?.pullSync(now)
}
/* FlowConsumer */
@@ -733,8 +734,8 @@ public class MaxMinFlowMultiplexer(
/**
* Pull this output.
*/
- fun pull() {
- _conn?.pull()
+ fun pull(now: Long) {
+ _conn?.pull(now)
}
override fun onStart(conn: FlowConnection, now: Long) {
@@ -772,6 +773,7 @@ public class MaxMinFlowMultiplexer(
// Output is not the activation output, so trigger activation output and do not install timer for this
// output (by returning `Long.MAX_VALUE`)
scheduler.trigger(now)
+
Long.MAX_VALUE
}
}