summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-30 16:27:45 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:41 +0200
commit559ac2327b8aa319fb8ab4558d4f4aa3382349f4 (patch)
treeb4c7640b57485c928479fe7d855bfbee8fc18e23
parent94783ff9d8cd81275fefd5804ac99f98e2dee3a4 (diff)
perf(simulator): Make convergence callback optional
This change adds two new properties for controlling whether the convergence callbacks of the source and consumer respectively should be invoked. This saves a lot of unnecessary calls for stages that do not have any implementation of the `onConvergence` method.
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt16
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt5
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt5
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt)17
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt11
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt39
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt14
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt4
20 files changed, 116 insertions, 49 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 017bca59..fb36d2c7 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -131,6 +131,8 @@ public class SimTFDevice(
override fun onStart(conn: FlowConnection, now: Long) {
ctx = conn
capacity = conn.capacity
+
+ conn.shouldSourceConverge = true
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 6a62d8a5..60a10f20 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -43,9 +43,9 @@ import kotlin.coroutines.resume
*/
public abstract class SimAbstractMachine(
protected val engine: FlowEngine,
- final override val parent: FlowSystem?,
+ private val parent: FlowConvergenceListener?,
final override val model: MachineModel
-) : SimMachine, FlowSystem {
+) : SimMachine, FlowConvergenceListener {
/**
* The resources allocated for this machine.
*/
@@ -116,6 +116,10 @@ public abstract class SimAbstractMachine(
cancel()
}
+ override fun onConverge(now: Long, delta: Long) {
+ parent?.onConverge(now, delta)
+ }
+
/**
* Cancel the workload that is currently running on the machine.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 37cf282b..0bcf5957 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -46,7 +46,7 @@ public class SimBareMetalMachine(
model: MachineModel,
powerDriver: PowerDriver,
public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)),
- parent: FlowSystem? = null,
+ parent: FlowConvergenceListener? = null,
) : SimAbstractMachine(engine, parent, model) {
/**
* The power draw of the machine onto the PSU.
@@ -66,7 +66,7 @@ public class SimBareMetalMachine(
*/
private val powerDriverLogic = powerDriver.createLogic(this, cpus)
- override fun onConverge(timestamp: Long) {
+ override fun onConverge(now: Long, delta: Long) {
psu.update()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
index 62d91c0b..09defbb5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
@@ -83,6 +83,7 @@ public class SimPsu(
override fun createSource(): FlowSource = object : FlowSource {
override fun onStart(conn: FlowConnection, now: Long) {
_ctx = conn
+ conn.shouldSourceConverge = true
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index b145eefc..bcba8e8e 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -145,8 +145,6 @@ public abstract class SimAbstractHypervisor(
interferenceDomain?.leave(interferenceKey)
}
}
-
- override fun onConverge(timestamp: Long) {}
}
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
index 36ab7c1c..b0515c6e 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
@@ -28,8 +28,8 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.flow.FlowConvergenceListener
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSystem
import org.opendc.simulator.flow.mux.FlowMultiplexer
import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
@@ -45,7 +45,7 @@ import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
*/
public class SimFairShareHypervisor(
engine: FlowEngine,
- private val parent: FlowSystem? = null,
+ private val parent: FlowConvergenceListener? = null,
scalingGovernor: ScalingGovernor? = null,
interferenceDomain: VmInterferenceDomain? = null,
private val listener: SimHypervisor.Listener? = null
@@ -57,11 +57,9 @@ public class SimFairShareHypervisor(
return SwitchSystem(ctx).switch
}
- private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowSystem {
+ private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowConvergenceListener {
val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain)
- override val parent: FlowSystem? = this@SimFairShareHypervisor.parent
-
private var lastCpuUsage = 0.0
private var lastCpuDemand = 0.0
private var lastDemand = 0.0
@@ -70,11 +68,11 @@ public class SimFairShareHypervisor(
private var lastInterference = 0.0
private var lastReport = Long.MIN_VALUE
- override fun onConverge(timestamp: Long) {
+ override fun onConverge(now: Long, delta: Long) {
val listener = listener ?: return
val counters = switch.counters
- if (timestamp > lastReport) {
+ if (now > lastReport) {
listener.onSliceFinish(
this@SimFairShareHypervisor,
counters.demand - lastDemand,
@@ -85,7 +83,7 @@ public class SimFairShareHypervisor(
lastCpuDemand
)
}
- lastReport = timestamp
+ lastReport = now
lastCpuDemand = switch.outputs.sumOf { it.demand }
lastCpuUsage = switch.outputs.sumOf { it.rate }
@@ -96,6 +94,8 @@ public class SimFairShareHypervisor(
val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency }
triggerGovernors(load)
+
+ parent?.onConverge(now, delta)
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
index bfa099fb..e0a70926 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
+import org.opendc.simulator.flow.FlowConvergenceListener
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSystem
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
@@ -35,7 +35,7 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override fun create(
engine: FlowEngine,
- parent: FlowSystem?,
+ parent: FlowConvergenceListener?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
listener: SimHypervisor.Listener?
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
index 97f07097..dad2cc3b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
+import org.opendc.simulator.flow.FlowConvergenceListener
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSystem
/**
* A service provider interface for constructing a [SimHypervisor].
@@ -44,7 +44,7 @@ public interface SimHypervisorProvider {
*/
public fun create(
engine: FlowEngine,
- parent: FlowSystem? = null,
+ parent: FlowConvergenceListener? = null,
scalingGovernor: ScalingGovernor? = null,
interferenceDomain: VmInterferenceDomain? = null,
listener: SimHypervisor.Listener? = null
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
index 7869d72d..93921eb9 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
+import org.opendc.simulator.flow.FlowConvergenceListener
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSystem
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
@@ -35,7 +35,7 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override fun create(
engine: FlowEngine,
- parent: FlowSystem?,
+ parent: FlowConvergenceListener?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
listener: SimHypervisor.Listener?
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
index fa833961..c327e1e9 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
@@ -42,6 +42,11 @@ public interface FlowConnection : AutoCloseable {
public val demand: Double
/**
+ * A flag to control whether [FlowSource.onConverge] should be invoked for this source.
+ */
+ public var shouldSourceConverge: Boolean
+
+ /**
* Pull the source.
*/
public fun pull()
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
index 75b2d25b..15f9b93b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -34,6 +34,11 @@ public interface FlowConsumerContext : FlowConnection {
public override var capacity: Double
/**
+ * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer.
+ */
+ public var shouldConsumerConverge: Boolean
+
+ /**
* Start the flow over the connection.
*/
public fun start()
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
index ef94ab22..50fbc9c7 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -39,6 +39,9 @@ public interface FlowConsumerLogic {
/**
* This method is invoked when the flow graph has converged into a steady-state system.
*
+ * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method
+ * will not be invoked.
+ *
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the system converged.
* @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
index db6aa69f..d1afda6f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
@@ -23,21 +23,14 @@
package org.opendc.simulator.flow
/**
- * A system of possible multiple sub-resources.
- *
- * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the
- * resource provider.
+ * A listener interface for when a flow stage has converged into a steady-state.
*/
-public interface FlowSystem {
- /**
- * The parent system to which this system belongs or `null` if it has no parent.
- */
- public val parent: FlowSystem?
-
+public interface FlowConvergenceListener {
/**
* This method is invoked when the system has converged to a steady-state.
*
- * @param timestamp The timestamp at which the system converged.
+ * @param now The timestamp at which the system converged.
+ * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(timestamp: Long)
+ public fun onConverge(now: Long, delta: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index ab5b31c2..17de601a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -52,6 +52,12 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
* The exposed [FlowConnection].
*/
private val _ctx = object : FlowConnection {
+ override var shouldSourceConverge: Boolean = false
+ set(value) {
+ field = value
+ _innerCtx?.shouldSourceConverge = value
+ }
+
override val capacity: Double
get() = _innerCtx?.capacity ?: 0.0
@@ -141,6 +147,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
override fun onStart(conn: FlowConnection, now: Long) {
_innerCtx = conn
+
+ if (_ctx.shouldSourceConverge) {
+ conn.shouldSourceConverge = true
+ }
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index fc590177..549a338b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -32,9 +32,16 @@ package org.opendc.simulator.flow
public class FlowSink(
private val engine: FlowEngine,
initialCapacity: Double,
- private val parent: FlowSystem? = null
+ private val parent: FlowConvergenceListener? = null
) : AbstractFlowConsumer(engine, initialCapacity) {
+ override fun start(ctx: FlowConsumerContext) {
+ if (parent != null) {
+ ctx.shouldConsumerConverge = true
+ }
+ super.start(ctx)
+ }
+
override fun createLogic(): FlowConsumerLogic {
return object : FlowConsumerLogic {
override fun onPush(
@@ -52,7 +59,7 @@ public class FlowSink(
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now)
+ parent?.onConverge(now, delta)
}
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
index a4f624ef..3a7e52aa 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -59,6 +59,9 @@ public interface FlowSource {
/**
* This method is invoked when the flow graph has converged into a steady-state system.
*
+ * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method
+ * will not be invoked.
+ *
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the system converged.
* @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 55fa92df..c087a28d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -73,6 +73,12 @@ internal class FlowConsumerContextImpl(
get() = _demand
/**
+ * Flags to control the convergence of the consumer and source.
+ */
+ override var shouldConsumerConverge: Boolean = false
+ override var shouldSourceConverge: Boolean = false
+
+ /**
* The clock to track simulation time.
*/
private val _clock = engine.clock
@@ -114,7 +120,8 @@ internal class FlowConsumerContextImpl(
*/
private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull`
private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush`
- private var _lastConvergence: Long = Long.MAX_VALUE // Last call to `onConvergence`
+ private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence`
+ private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence`
/**
* The timers at which the context is scheduled to be interrupted.
@@ -199,8 +206,14 @@ internal class FlowConsumerContextImpl(
* @return A flag to indicate whether the connection has already been updated before convergence.
*/
fun doUpdate(now: Long): Boolean {
- val willConverge = _willConverge
- _willConverge = true
+ // The connection will only converge if either the source or the consumer wants the converge callback to be
+ // invoked.
+ val shouldConverge = shouldSourceConverge || shouldConsumerConverge
+ var willConverge = false
+ if (shouldConverge) {
+ willConverge = _willConverge
+ _willConverge = true
+ }
val oldState = _state
if (oldState != State.Active) {
@@ -286,19 +299,25 @@ internal class FlowConsumerContextImpl(
/**
* This method is invoked when the system converges into a steady state.
*/
- fun onConverge(timestamp: Long) {
- val delta = max(0, timestamp - _lastConvergence)
- _lastConvergence = timestamp
+ fun onConverge(now: Long) {
_willConverge = false
try {
- if (_state == State.Active) {
- source.onConverge(this, timestamp, delta)
+ if (_state == State.Active && shouldSourceConverge) {
+ val delta = max(0, now - _lastSourceConvergence)
+ _lastSourceConvergence = now
+
+ source.onConverge(this, now, delta)
}
- logic.onConverge(this, timestamp, delta)
+ if (shouldConsumerConverge) {
+ val delta = max(0, now - _lastConsumerConvergence)
+ _lastConsumerConvergence = now
+
+ logic.onConverge(this, now, delta)
+ }
} catch (cause: Throwable) {
- doFailSource(timestamp, cause)
+ doFailSource(now, cause)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index c7379fa9..7232df35 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -38,7 +38,7 @@ import kotlin.math.min
*/
public class MaxMinFlowMultiplexer(
private val engine: FlowEngine,
- private val parent: FlowSystem? = null,
+ private val parent: FlowConvergenceListener? = null,
private val interferenceDomain: InterferenceDomain? = null
) : FlowMultiplexer {
/**
@@ -269,6 +269,11 @@ public class MaxMinFlowMultiplexer(
check(!_isClosed) { "Cannot re-use closed input" }
_activeInputs += this
+
+ if (parent != null) {
+ ctx.shouldConsumerConverge = true
+ }
+
super.start(ctx)
}
@@ -289,7 +294,7 @@ public class MaxMinFlowMultiplexer(
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now)
+ parent?.onConverge(now, delta)
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index 0c39523f..6dd60d95 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -47,6 +47,12 @@ public class FlowSourceRateAdapter(
callback(0.0)
}
+ override fun onStart(conn: FlowConnection, now: Long) {
+ conn.shouldSourceConverge = true
+
+ delegate.onStart(conn, now)
+ }
+
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
try {
delegate.onStop(conn, now, delta)
@@ -60,9 +66,11 @@ public class FlowSourceRateAdapter(
}
override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
- delegate.onConverge(conn, now, delta)
-
- rate = conn.rate
+ try {
+ delegate.onConverge(conn, now, delta)
+ } finally {
+ rate = conn.rate
+ }
}
override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index 3690e681..d548451f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -297,6 +297,10 @@ internal class FlowForwarderTest {
try {
forwarder.consume(object : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ conn.shouldSourceConverge = true
+ }
+
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return Long.MAX_VALUE
}