summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt50
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt63
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt14
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt7
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt121
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt112
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt548
7 files changed, 565 insertions, 350 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index d654d58a..b8e0227a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -22,6 +22,7 @@
package org.opendc.simulator.compute
+import javafx.application.Application.launch
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
@@ -34,7 +35,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
import org.openjdk.jmh.annotations.*
@@ -47,48 +47,38 @@ import java.util.concurrent.TimeUnit
@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
- private lateinit var scope: SimulationCoroutineScope
- private lateinit var engine: FlowEngine
private lateinit var machineModel: MachineModel
+ private lateinit var trace: Sequence<SimTraceWorkload.Fragment>
@Setup
fun setUp() {
- scope = SimulationCoroutineScope()
- engine = FlowEngine(scope.coroutineContext, scope.clock)
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = MachineModel(
cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- }
- @State(Scope.Thread)
- class Workload {
- lateinit var trace: Sequence<SimTraceWorkload.Fragment>
-
- @Setup
- fun setUp() {
- val random = ThreadLocalRandom.current()
- val entries = List(10000) { SimTraceWorkload.Fragment(it * 1000L, 1000, random.nextDouble(0.0, 4500.0), 1) }
- trace = entries.asSequence()
- }
+ val random = ThreadLocalRandom.current()
+ val entries = List(10000) { SimTraceWorkload.Fragment(it * 1000L, 1000, random.nextDouble(0.0, 4500.0), 1) }
+ trace = entries.asSequence()
}
@Benchmark
- fun benchmarkBareMetal(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkBareMetal() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace))
+ return@runBlockingSimulation machine.run(SimTraceWorkload(trace))
}
}
@Benchmark
- fun benchmarkSpaceSharedHypervisor(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkSpaceSharedHypervisor() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -99,7 +89,7 @@ class SimMachineBenchmarks {
val vm = hypervisor.createMachine(machineModel)
try {
- return@runBlockingSimulation vm.run(SimTraceWorkload(state.trace))
+ return@runBlockingSimulation vm.run(SimTraceWorkload(trace))
} finally {
vm.close()
machine.close()
@@ -108,8 +98,9 @@ class SimMachineBenchmarks {
}
@Benchmark
- fun benchmarkFairShareHypervisorSingle(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkFairShareHypervisorSingle() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -120,7 +111,7 @@ class SimMachineBenchmarks {
val vm = hypervisor.createMachine(machineModel)
try {
- return@runBlockingSimulation vm.run(SimTraceWorkload(state.trace))
+ return@runBlockingSimulation vm.run(SimTraceWorkload(trace))
} finally {
vm.close()
machine.close()
@@ -129,8 +120,9 @@ class SimMachineBenchmarks {
}
@Benchmark
- fun benchmarkFairShareHypervisorDouble(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkFairShareHypervisorDouble() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -144,7 +136,7 @@ class SimMachineBenchmarks {
launch {
try {
- vm.run(SimTraceWorkload(state.trace))
+ vm.run(SimTraceWorkload(trace))
} finally {
machine.close()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
index e927f81d..aabd2220 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.flow
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
-import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
@@ -39,61 +38,53 @@ import java.util.concurrent.TimeUnit
@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@OptIn(ExperimentalCoroutinesApi::class)
class FlowBenchmarks {
- private lateinit var scope: SimulationCoroutineScope
- private lateinit var engine: FlowEngine
+ private lateinit var trace: Sequence<TraceFlowSource.Fragment>
@Setup
fun setUp() {
- scope = SimulationCoroutineScope()
- engine = FlowEngine(scope.coroutineContext, scope.clock)
- }
-
- @State(Scope.Thread)
- class Workload {
- lateinit var trace: Sequence<TraceFlowSource.Fragment>
-
- @Setup
- fun setUp() {
- val random = ThreadLocalRandom.current()
- val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
- trace = entries.asSequence()
- }
+ val random = ThreadLocalRandom.current()
+ val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
+ trace = entries.asSequence()
}
@Benchmark
- fun benchmarkSink(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkSink() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val provider = FlowSink(engine, 4200.0)
- return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
+ return@runBlockingSimulation provider.consume(TraceFlowSource(trace))
}
}
@Benchmark
- fun benchmarkForward(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkForward() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val provider = FlowSink(engine, 4200.0)
val forwarder = FlowForwarder(engine)
provider.startConsumer(forwarder)
- return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace))
+ return@runBlockingSimulation forwarder.consume(TraceFlowSource(trace))
}
}
@Benchmark
- fun benchmarkMuxMaxMinSingleSource(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkMuxMaxMinSingleSource() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val switch = MaxMinFlowMultiplexer(engine)
FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
- return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
+ return@runBlockingSimulation provider.consume(TraceFlowSource(trace))
}
}
@Benchmark
- fun benchmarkMuxMaxMinTripleSource(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkMuxMaxMinTripleSource() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val switch = MaxMinFlowMultiplexer(engine)
FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
@@ -102,28 +93,30 @@ class FlowBenchmarks {
repeat(3) {
launch {
val provider = switch.newInput()
- provider.consume(TraceFlowSource(state.trace))
+ provider.consume(TraceFlowSource(trace))
}
}
}
}
@Benchmark
- fun benchmarkMuxExclusiveSingleSource(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkMuxExclusiveSingleSource() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val switch = ForwardingFlowMultiplexer(engine)
FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
- return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
+ return@runBlockingSimulation provider.consume(TraceFlowSource(trace))
}
}
@Benchmark
- fun benchmarkMuxExclusiveTripleSource(state: Workload) {
- return scope.runBlockingSimulation {
+ fun benchmarkMuxExclusiveTripleSource() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
val switch = ForwardingFlowMultiplexer(engine)
FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
@@ -132,7 +125,7 @@ class FlowBenchmarks {
repeat(2) {
launch {
val provider = switch.newInput()
- provider.consume(TraceFlowSource(state.trace))
+ provider.consume(TraceFlowSource(trace))
}
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
index 15f9b93b..d7182497 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -29,6 +29,11 @@ package org.opendc.simulator.flow
*/
public interface FlowConsumerContext : FlowConnection {
/**
+ * The deadline of the source.
+ */
+ public val deadline: Long
+
+ /**
* The capacity of the connection.
*/
public override var capacity: Double
@@ -39,12 +44,17 @@ public interface FlowConsumerContext : FlowConnection {
public var shouldConsumerConverge: Boolean
/**
+ * A flag to control whether the timers for the [FlowSource] should be enabled.
+ */
+ public var enableTimers: Boolean
+
+ /**
* Start the flow over the connection.
*/
public fun start()
/**
- * Synchronously flush the changes of the connection.
+ * Synchronously pull the source of the connection.
*/
- public fun flush()
+ public fun pullSync()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
index 81ed9f26..97d56fff 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
@@ -35,9 +35,10 @@ internal const val ConnState = 0b11 // Mask for accessing the state of the flow
*/
internal const val ConnPulled = 1 shl 2 // The source should be pulled
internal const val ConnPushed = 1 shl 3 // The source has pushed a value
-internal const val ConnUpdateActive = 1 shl 4 // An update for the connection is active
-internal const val ConnUpdatePending = 1 shl 5 // An (immediate) update of the connection is pending
-internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection was not necessary
+internal const val ConnClose = 1 shl 4 // The connection should be closed
+internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active
+internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending
internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending
internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source
internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer
+internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 9d36483e..9a568897 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -69,23 +69,30 @@ internal class FlowConsumerContextImpl(
*/
override val demand: Double
get() = _demand
+ private var _demand: Double = 0.0 // The current (pending) demand of the source
+
+ /**
+ * The deadline of the source.
+ */
+ override val deadline: Long
+ get() = _deadline
+ private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
/**
* Flags to control the convergence of the consumer and source.
*/
- override var shouldSourceConverge: Boolean = false
+ override var shouldSourceConverge: Boolean
+ get() = _flags and ConnConvergeSource == ConnConvergeSource
set(value) {
- field = value
_flags =
if (value)
_flags or ConnConvergeSource
else
_flags and ConnConvergeSource.inv()
}
- override var shouldConsumerConverge: Boolean = false
+ override var shouldConsumerConverge: Boolean
+ get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer
set(value) {
- field = value
-
_flags =
if (value)
_flags or ConnConvergeConsumer
@@ -94,15 +101,22 @@ internal class FlowConsumerContextImpl(
}
/**
- * The clock to track simulation time.
+ * Flag to control the timers on the [FlowSource]
*/
- private val _clock = engine.clock
+ override var enableTimers: Boolean
+ get() = _flags and ConnDisableTimers != ConnDisableTimers
+ set(value) {
+ _flags =
+ if (!value)
+ _flags or ConnDisableTimers
+ else
+ _flags and ConnDisableTimers.inv()
+ }
/**
- * The current state of the connection.
+ * The clock to track simulation time.
*/
- private var _demand: Double = 0.0 // The current (pending) demand of the source
- private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
+ private val _clock = engine.clock
/**
* The flags of the flow connection, indicating certain actions.
@@ -136,23 +150,17 @@ internal class FlowConsumerContextImpl(
}
override fun close() {
- var flags = _flags
+ val flags = _flags
if (flags and ConnState == ConnClosed) {
return
}
- engine.batch {
- // Mark the connection as closed and pulled (in order to converge)
- flags = (flags and ConnState.inv()) or ConnClosed or ConnPulled
- _flags = flags
-
- if (flags and ConnUpdateActive == 0) {
- val now = _clock.millis()
- doStopSource(now)
-
- // FIX: Make sure the context converges
- scheduleImmediate(now, flags)
- }
+ // Toggle the close bit. In case no update is active, schedule a new update.
+ if (flags and ConnUpdateActive == 0) {
+ val now = _clock.millis()
+ scheduleImmediate(now, flags or ConnClose)
+ } else {
+ _flags = flags or ConnClose
}
}
@@ -166,7 +174,7 @@ internal class FlowConsumerContextImpl(
scheduleImmediate(_clock.millis(), flags or ConnPulled)
}
- override fun flush() {
+ override fun pullSync() {
val flags = _flags
// Do not attempt to flush the connection if the connection is closed or an update is already active
@@ -174,7 +182,11 @@ internal class FlowConsumerContextImpl(
return
}
- engine.scheduleSync(_clock.millis(), this)
+ val now = _clock.millis()
+
+ if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) {
+ engine.scheduleSync(now, this)
+ }
}
override fun push(rate: Double) {
@@ -218,7 +230,7 @@ internal class FlowConsumerContextImpl(
val deadline = _deadline
val reachedDeadline = deadline == now
- var newDeadline = deadline
+ var newDeadline: Long
var hasUpdated = false
try {
@@ -245,9 +257,13 @@ internal class FlowConsumerContextImpl(
deadline
}
+ // Make the new deadline available for the consumer if it has changed
+ if (newDeadline != deadline) {
+ _deadline = newDeadline
+ }
+
// Push to the consumer if the rate of the source has changed (after a call to `push`)
- val newState = flags and ConnState
- if (newState == ConnActive && flags and ConnPushed != 0) {
+ if (flags and ConnPushed != 0) {
val lastPush = _lastPush
val delta = max(0, now - lastPush)
@@ -260,17 +276,33 @@ internal class FlowConsumerContextImpl(
// IMPORTANT: Re-fetch the flags after the callback might have changed those
flags = _flags
- } else if (newState == ConnClosed) {
+ }
+
+ // Check whether the source or consumer have tried to close the connection
+ if (flags and ConnClose != 0) {
hasUpdated = true
// The source has called [FlowConnection.close], so clean up the connection
doStopSource(now)
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ // We now also mark the connection as closed
+ flags = (_flags and ConnState.inv()) or ConnClosed
+
+ _demand = 0.0
+ newDeadline = Long.MAX_VALUE
}
} catch (cause: Throwable) {
+ hasUpdated = true
+
+ // Clean up the connection
+ doFailSource(now, cause)
+
// Mark the connection as closed
flags = (flags and ConnState.inv()) or ConnClosed
- doFailSource(now, cause)
+ _demand = 0.0
+ newDeadline = Long.MAX_VALUE
}
// Check whether the connection needs to be added to the visited queue. This is the case when:
@@ -302,14 +334,16 @@ internal class FlowConsumerContextImpl(
_timer
}
- // Set the new deadline and schedule a delayed update for that deadline
- _deadline = newDeadline
-
// Check whether we need to schedule a new timer for this connection. That is the case when:
// (1) The deadline is valid (not the maximum value)
// (2) The connection is active
- // (3) The current active timer for the connection points to a later deadline
- if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || (timer != null && newDeadline >= timer.target)) {
+ // (3) Timers are not disabled for the source
+ // (4) The current active timer for the connection points to a later deadline
+ if (newDeadline == Long.MAX_VALUE ||
+ flags and ConnState != ConnActive ||
+ flags and ConnDisableTimers != 0 ||
+ (timer != null && newDeadline >= timer.target)
+ ) {
// Ignore any deadline scheduled at the maximum value
// This indicates that the source does not want to register a timer
return
@@ -336,8 +370,8 @@ internal class FlowConsumerContextImpl(
// The connection is converging now, so unset the convergence pending flag
_flags = flags and ConnConvergePending.inv()
- // Call the source converge callback if it has enabled convergence and the connection is active
- if (flags and ConnState == ConnActive && flags and ConnConvergeSource != 0) {
+ // Call the source converge callback if it has enabled convergence
+ if (flags and ConnConvergeSource != 0) {
val delta = max(0, now - _lastSourceConvergence)
_lastSourceConvergence = now
@@ -352,7 +386,13 @@ internal class FlowConsumerContextImpl(
logic.onConverge(this, now, delta)
}
} catch (cause: Throwable) {
+ // Invoke the finish callbacks
doFailSource(now, cause)
+
+ // Mark the connection as closed
+ _flags = (_flags and ConnState.inv()) or ConnClosed
+ _demand = 0.0
+ _deadline = Long.MAX_VALUE
}
}
@@ -367,10 +407,6 @@ internal class FlowConsumerContextImpl(
doFinishConsumer(now, null)
} catch (cause: Throwable) {
doFinishConsumer(now, cause)
- return
- } finally {
- _deadline = Long.MAX_VALUE
- _demand = 0.0
}
}
@@ -383,9 +419,6 @@ internal class FlowConsumerContextImpl(
} catch (e: Throwable) {
e.addSuppressed(cause)
doFinishConsumer(now, e)
- } finally {
- _deadline = Long.MAX_VALUE
- _demand = 0.0
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index 019b5f10..a9234abf 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext
* @param context The coroutine context to use.
* @param clock The virtual simulation clock.
*/
-internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine {
+internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable {
/**
* The [Delay] instance that provides scheduled execution of [Runnable]s.
*/
@@ -82,7 +82,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
return
}
- runEngine(now)
+ doRunEngine(now)
}
/**
@@ -100,7 +100,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
return
}
- runEngine(now)
+ doRunEngine(now)
}
override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
@@ -120,16 +120,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
}
}
- /**
- * Run the engine and mark as active while running.
- */
- private fun runEngine(now: Long) {
- try {
- batchIndex++
- doRunEngine(now)
- } finally {
- batchIndex--
- }
+ /* Runnable */
+ override fun run() {
+ val now = clock.millis()
+ val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
+ assert(now >= invocation.timestamp) { "Future invocations invariant violated" }
+
+ doRunEngine(now)
}
/**
@@ -141,44 +138,43 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
val futureInvocations = futureInvocations
val visited = visited
- // Remove any entries in the `futureInvocations` queue from the past
- while (true) {
- val head = futureInvocations.peek()
- if (head == null || head.timestamp > now) {
- break
- }
- futureInvocations.poll()
- }
-
- // Execute all scheduled updates at current timestamp
- while (true) {
- val timer = futureQueue.peek() ?: break
- val target = timer.target
+ try {
+ // Increment batch index so synchronous calls will not launch concurrent engine invocations
+ batchIndex++
- if (target > now) {
- break
- }
+ // Execute all scheduled updates at current timestamp
+ while (true) {
+ val timer = futureQueue.peek() ?: break
+ val target = timer.target
- assert(target >= now) { "Internal inconsistency: found update of the past" }
+ if (target > now) {
+ break
+ }
- futureQueue.poll()
- timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
- }
+ assert(target >= now) { "Internal inconsistency: found update of the past" }
- // Repeat execution of all immediate updates until the system has converged to a steady-state
- // We have to take into account that the onConverge callback can also trigger new actions.
- do {
- // Execute all immediate updates
- while (true) {
- val ctx = queue.poll() ?: break
- ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
+ futureQueue.poll()
+ timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
}
- while (true) {
- val ctx = visited.poll() ?: break
- ctx.onConverge(now)
- }
- } while (queue.isNotEmpty())
+ // Repeat execution of all immediate updates until the system has converged to a steady-state
+ // We have to take into account that the onConverge callback can also trigger new actions.
+ do {
+ // Execute all immediate updates
+ while (true) {
+ val ctx = queue.poll() ?: break
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
+ }
+
+ while (true) {
+ val ctx = visited.poll() ?: break
+ ctx.onConverge(now)
+ }
+ } while (queue.isNotEmpty())
+ } finally {
+ // Decrement batch index to indicate no engine is active at the moment
+ batchIndex--
+ }
// Schedule an engine invocation for the next update to occur.
val headTimer = futureQueue.peek()
@@ -195,24 +191,14 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
* @param scheduled The queue of scheduled invocations.
*/
private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
- while (true) {
- val invocation = scheduled.peekFirst()
- if (invocation == null || invocation.timestamp > target) {
- // Case 2: A new timer was registered ahead of the other timers.
- // Solution: Schedule a new scheduler invocation
- @OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(target - now, { runEngine(target) }, context)
- scheduled.addFirst(Invocation(target, handle))
- break
- } else if (invocation.timestamp < target) {
- // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted
- // Solution: Cancel the next scheduler invocation
- scheduled.pollFirst()
-
- invocation.cancel()
- } else {
- break
- }
+ val head = scheduled.peek()
+
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (head == null || target < head.timestamp) {
+ @OptIn(InternalCoroutinesApi::class)
+ val handle = delay.invokeOnTimeout(target - now, this, context)
+ scheduled.addFirst(Invocation(target, handle))
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 5ff0fb8d..97059e93 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -38,7 +38,7 @@ import kotlin.math.min
*/
public class MaxMinFlowMultiplexer(
private val engine: FlowEngine,
- private val parent: FlowConvergenceListener? = null,
+ parent: FlowConvergenceListener? = null,
private val interferenceDomain: InterferenceDomain? = null
) : FlowMultiplexer {
/**
@@ -47,7 +47,6 @@ public class MaxMinFlowMultiplexer(
override val inputs: Set<FlowConsumer>
get() = _inputs
private val _inputs = mutableSetOf<Input>()
- private val _activeInputs = mutableListOf<Input>()
/**
* The outputs of the multiplexer.
@@ -55,50 +54,38 @@ public class MaxMinFlowMultiplexer(
override val outputs: Set<FlowSource>
get() = _outputs
private val _outputs = mutableSetOf<Output>()
- private val _activeOutputs = mutableListOf<Output>()
/**
* The flow counters of this multiplexer.
*/
public override val counters: FlowCounters
- get() = _counters
- private val _counters = FlowCountersImpl()
+ get() = scheduler.counters
/**
* The actual processing rate of the multiplexer.
*/
public override val rate: Double
- get() = _rate
- private var _rate = 0.0
+ get() = scheduler.rate
/**
* The demanded processing rate of the input.
*/
public override val demand: Double
- get() = _demand
- private var _demand = 0.0
+ get() = scheduler.demand
/**
* The capacity of the outputs.
*/
public override val capacity: Double
- get() = _capacity
- private var _capacity = 0.0
+ get() = scheduler.capacity
/**
- * Flag to indicate that the scheduler is active.
+ * The [Scheduler] instance of this multiplexer.
*/
- private var _schedulerActive = false
- private var _lastSchedulerCycle = Long.MAX_VALUE
-
- /**
- * The last convergence timestamp and the input.
- */
- private var _lastConverge: Long = Long.MIN_VALUE
- private var _lastConvergeInput: Input? = null
+ private val scheduler = Scheduler(engine, parent)
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val provider = Input(_capacity, key)
+ val provider = Input(engine, scheduler, interferenceDomain, key)
_inputs.add(provider)
return provider
}
@@ -112,7 +99,7 @@ public class MaxMinFlowMultiplexer(
}
override fun newOutput(): FlowSource {
- val output = Output()
+ val output = Output(scheduler)
_outputs.add(output)
return output
}
@@ -146,147 +133,330 @@ public class MaxMinFlowMultiplexer(
}
/**
- * Converge the scheduler of the multiplexer.
+ * Helper class containing the scheduler state.
*/
- private fun runScheduler(now: Long) {
- if (_schedulerActive) {
- return
- }
- val lastSchedulerCycle = _lastSchedulerCycle
- val delta = max(0, now - lastSchedulerCycle)
- _schedulerActive = true
- _lastSchedulerCycle = now
-
- try {
- doSchedule(now, delta)
- } finally {
- _schedulerActive = false
- }
- }
+ private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) {
+ /**
+ * The flow counters of this scheduler.
+ */
+ @JvmField val counters = FlowCountersImpl()
- /**
- * Schedule the inputs over the outputs.
- */
- private fun doSchedule(now: Long, delta: Long) {
- val activeInputs = _activeInputs
- val activeOutputs = _activeOutputs
+ /**
+ * The flow rate of the multiplexer.
+ */
+ @JvmField var rate = 0.0
- // Update the counters of the scheduler
- updateCounters(delta)
+ /**
+ * The demand for the multiplexer.
+ */
+ @JvmField var demand = 0.0
- // If there is no work yet, mark the inputs as idle.
- if (activeInputs.isEmpty()) {
- _demand = 0.0
- _rate = 0.0
- return
+ /**
+ * The capacity of the multiplexer.
+ */
+ @JvmField var capacity = 0.0
+
+ /**
+ * An [Output] that is used to activate the scheduler.
+ */
+ @JvmField var activationOutput: Output? = null
+
+ /**
+ * The active inputs registered with the scheduler.
+ */
+ private val _activeInputs = mutableListOf<Input>()
+
+ /**
+ * The active outputs registered with the scheduler.
+ */
+ private val _activeOutputs = mutableListOf<Output>()
+
+ /**
+ * Flag to indicate that the scheduler is active.
+ */
+ private var _schedulerActive = false
+ private var _lastSchedulerCycle = Long.MAX_VALUE
+
+ /**
+ * The last convergence timestamp and the input.
+ */
+ private var _lastConverge: Long = Long.MIN_VALUE
+ private var _lastConvergeInput: Input? = null
+
+ /**
+ * Register the specified [input] to this scheduler.
+ */
+ fun registerInput(input: Input) {
+ _activeInputs.add(input)
+
+ val hasActivationOutput = activationOutput != null
+
+ // Disable timers and convergence of the source if one of the output manages it
+ input.shouldConsumerConverge = !hasActivationOutput
+ input.enableTimers = !hasActivationOutput
+ input.capacity = capacity
+ trigger(engine.clock.millis())
+ }
+
+ /**
+ * De-register the specified [input] from this scheduler.
+ */
+ fun deregisterInput(input: Input, now: Long) {
+ // Assign a new input responsible for handling the convergence events
+ if (_lastConvergeInput == input) {
+ _lastConvergeInput = null
+ }
+
+ // Re-run scheduler to distribute new load
+ trigger(now)
}
- val capacity = _capacity
- var availableCapacity = capacity
+ /**
+ * This method is invoked when one of the inputs converges.
+ */
+ fun convergeInput(input: Input, now: Long) {
+
+ val lastConverge = _lastConverge
+ val lastConvergeInput = _lastConvergeInput
+ val parent = parent
- // Pull in the work of the outputs
- val inputIterator = activeInputs.listIterator()
- for (input in inputIterator) {
- input.pull(now)
+ if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) {
+ _lastConverge = now
+ _lastConvergeInput = input
- // Remove outputs that have finished
- if (!input.isActive) {
- input.actualRate = 0.0
- inputIterator.remove()
+ parent.onConverge(now, max(0, now - lastConverge))
}
}
- var demand = 0.0
+ /**
+ * Register the specified [output] to this scheduler.
+ */
+ fun registerOutput(output: Output) {
+ _activeOutputs.add(output)
+
+ updateCapacity()
+ updateActivationOutput()
+ }
+
+ /**
+ * De-register the specified [output] from this scheduler.
+ */
+ fun deregisterOutput(output: Output, now: Long) {
+ _activeOutputs.remove(output)
+ updateCapacity()
- // Sort in-place the inputs based on their pushed flow.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- activeInputs.sort()
+ trigger(now)
+ }
+
+ /**
+ * This method is invoked when one of the outputs converges.
+ */
+ fun convergeOutput(output: Output, now: Long) {
+ val lastConverge = _lastConverge
+ val parent = parent
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- var remaining = activeInputs.size
- for (i in activeInputs.indices) {
- val input = activeInputs[i]
- val availableShare = availableCapacity / remaining--
- val grantedRate = min(input.allowedRate, availableShare)
+ if (parent != null) {
+ _lastConverge = now
- // Ignore empty sources
- if (grantedRate <= 0.0) {
- input.actualRate = 0.0
- continue
+ parent.onConverge(now, max(0, now - lastConverge))
}
- input.actualRate = grantedRate
- demand += input.limit
- availableCapacity -= grantedRate
+ if (!output.isActive) {
+ output.isActivationOutput = false
+ updateActivationOutput()
+ }
}
- val rate = capacity - availableCapacity
+ /**
+ * Trigger the scheduler of the multiplexer.
+ *
+ * @param now The current virtual timestamp of the simulation.
+ */
+ fun trigger(now: Long) {
+ if (_schedulerActive) {
+ // No need to trigger the scheduler in case it is already active
+ return
+ }
+
+ val activationOutput = activationOutput
- _demand = demand
- _rate = rate
+ // We can run the scheduler in two ways:
+ // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input
+ // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp.
+ // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only
+ // a few inputs and little changes at the same timestamp.
+ // We always pick for option (1) unless there are no outputs available.
+ if (activationOutput != null) {
+ activationOutput.pull()
+ return
+ } else {
+ runScheduler(now)
+ }
+ }
- // Sort all consumers by their capacity
- activeOutputs.sort()
+ /**
+ * Synchronously run the scheduler of the multiplexer.
+ */
+ fun runScheduler(now: Long): Long {
+ val lastSchedulerCycle = _lastSchedulerCycle
+ _lastSchedulerCycle = now
- // Divide the requests over the available capacity of the input resources fairly
- for (i in activeOutputs.indices) {
- val output = activeOutputs[i]
- val inputCapacity = output.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
+ val delta = max(0, now - lastSchedulerCycle)
- output.push(grantedSpeed)
+ return try {
+ _schedulerActive = true
+ doRunScheduler(delta)
+ } finally {
+ _schedulerActive = false
+ }
}
- }
- /**
- * Recompute the capacity of the multiplexer.
- */
- private fun updateCapacity() {
- val newCapacity = _activeOutputs.sumOf(Output::capacity)
+ /**
+ * Recompute the capacity of the multiplexer.
+ */
+ fun updateCapacity() {
+ val newCapacity = _activeOutputs.sumOf(Output::capacity)
- // No-op if the capacity is unchanged
- if (_capacity == newCapacity) {
- return
+ // No-op if the capacity is unchanged
+ if (capacity == newCapacity) {
+ return
+ }
+
+ capacity = newCapacity
+
+ for (input in _activeInputs) {
+ input.capacity = newCapacity
+ }
+
+ // Sort outputs by their capacity
+ _activeOutputs.sort()
}
- _capacity = newCapacity
+ /**
+ * Updates the output that is used for scheduler activation.
+ */
+ private fun updateActivationOutput() {
+ val output = _activeOutputs.firstOrNull()
+ activationOutput = output
- for (input in _inputs) {
- input.capacity = newCapacity
+ if (output != null) {
+ output.isActivationOutput = true
+ }
+
+ val hasActivationOutput = output != null
+
+ for (input in _activeInputs) {
+ input.shouldConsumerConverge = !hasActivationOutput
+ input.enableTimers = !hasActivationOutput
+ }
}
- }
- /**
- * The previous capacity of the multiplexer.
- */
- private var _previousCapacity = 0.0
+ /**
+ * Schedule the inputs over the outputs.
+ *
+ * @return The deadline after which a new scheduling cycle should start.
+ */
+ private fun doRunScheduler(delta: Long): Long {
+ val activeInputs = _activeInputs
+ val activeOutputs = _activeOutputs
+
+ // Update the counters of the scheduler
+ updateCounters(delta)
+
+ // If there is no work yet, mark the inputs as idle.
+ if (activeInputs.isEmpty()) {
+ demand = 0.0
+ rate = 0.0
+ return Long.MAX_VALUE
+ }
- /**
- * Update the counters of the scheduler.
- */
- private fun updateCounters(delta: Long) {
- val previousCapacity = _previousCapacity
- _previousCapacity = _capacity
+ val capacity = capacity
+ var availableCapacity = capacity
+
+ // Pull in the work of the outputs
+ val inputIterator = activeInputs.listIterator()
+ for (input in inputIterator) {
+ input.pullSync()
+
+ // Remove outputs that have finished
+ if (!input.isActive) {
+ input.actualRate = 0.0
+ inputIterator.remove()
+ }
+ }
- if (delta <= 0) {
- return
+ var demand = 0.0
+ var deadline = Long.MAX_VALUE
+
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ activeInputs.sort()
+
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ val size = activeInputs.size
+ for (i in activeInputs.indices) {
+ val input = activeInputs[i]
+ val availableShare = availableCapacity / (size - i)
+ val grantedRate = min(input.allowedRate, availableShare)
+
+ demand += input.limit
+ deadline = min(deadline, input.deadline)
+ availableCapacity -= grantedRate
+
+ input.actualRate = grantedRate
+ }
+
+ val rate = capacity - availableCapacity
+
+ this.demand = demand
+ this.rate = rate
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
+ val inputCapacity = output.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ output.push(grantedSpeed)
+ }
+
+ return deadline
}
- val deltaS = delta / 1000.0
+ /**
+ * The previous capacity of the multiplexer.
+ */
+ private var _previousCapacity = 0.0
+
+ /**
+ * Update the counters of the scheduler.
+ */
+ private fun updateCounters(delta: Long) {
+ val previousCapacity = _previousCapacity
+ _previousCapacity = capacity
+
+ if (delta <= 0) {
+ return
+ }
+
+ val deltaS = delta / 1000.0
- _counters.demand += _demand * deltaS
- _counters.actual += _rate * deltaS
- _counters.remaining += (previousCapacity - _rate) * deltaS
+ counters.demand += demand * deltaS
+ counters.actual += rate * deltaS
+ counters.remaining += (previousCapacity - rate) * deltaS
+ }
}
/**
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
- private inner class Input(capacity: Double, val key: InterferenceKey?) :
- AbstractFlowConsumer(engine, capacity),
- FlowConsumerLogic,
- Comparable<Input> {
+ private class Input(
+ engine: FlowEngine,
+ private val scheduler: Scheduler,
+ private val interferenceDomain: InterferenceDomain?,
+ @JvmField val key: InterferenceKey?
+ ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> {
/**
* The requested limit.
*/
@@ -298,25 +468,39 @@ public class MaxMinFlowMultiplexer(
@JvmField var actualRate: Double = 0.0
/**
+ * The deadline of the input.
+ */
+ val deadline: Long
+ get() = ctx?.deadline ?: Long.MAX_VALUE
+
+ /**
* The processing rate that is allowed by the model constraints.
*/
val allowedRate: Double
get() = min(capacity, limit)
/**
- * A flag to indicate that the input is closed.
+ * A flag to enable timers for the input.
*/
- private var _isClosed: Boolean = false
+ var enableTimers: Boolean = true
+ set(value) {
+ field = value
+ ctx?.enableTimers = value
+ }
/**
- * The timestamp at which we received the last command.
+ * A flag to control whether the input should converge.
*/
- private var _lastPull: Long = Long.MIN_VALUE
+ var shouldConsumerConverge: Boolean = true
+ set(value) {
+ field = value
+ ctx?.shouldConsumerConverge = value
+ }
/**
- * The interference domain this input belongs to.
+ * A flag to indicate that the input is closed.
*/
- private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain
+ private var _isClosed: Boolean = false
/**
* Close the input.
@@ -333,12 +517,7 @@ public class MaxMinFlowMultiplexer(
override fun start(ctx: FlowConsumerContext) {
check(!_isClosed) { "Cannot re-use closed input" }
-
- _activeInputs += this
- if (parent != null) {
- ctx.shouldConsumerConverge = true
- }
-
+ scheduler.registerInput(this)
super.start(ctx)
}
@@ -353,21 +532,8 @@ public class MaxMinFlowMultiplexer(
actualRate = 0.0
limit = rate
- _lastPull = now
-
- runScheduler(now)
- }
-
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- val lastConverge = _lastConverge
- val parent = parent
-
- if (parent != null && (lastConverge < now || _lastConvergeInput == null)) {
- _lastConverge = now
- _lastConvergeInput = this
- parent.onConverge(now, max(0, now - lastConverge))
- }
+ scheduler.trigger(now)
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
@@ -375,15 +541,15 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
- _lastPull = now
- // Assign a new input responsible for handling the convergence events
- if (_lastConvergeInput == this) {
- _lastConvergeInput = null
- }
+ scheduler.deregisterInput(this, now)
- // Re-run scheduler to distribute new load
- runScheduler(now)
+ // BUG: Cancel the connection so that `ctx` is set to `null`
+ cancel()
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ scheduler.convergeInput(this, now)
}
/* Comparable */
@@ -392,11 +558,8 @@ public class MaxMinFlowMultiplexer(
/**
* Pull the source if necessary.
*/
- fun pull(now: Long) {
- val ctx = ctx
- if (ctx != null && _lastPull < now) {
- ctx.flush()
- }
+ fun pullSync() {
+ ctx?.pullSync()
}
/**
@@ -409,7 +572,7 @@ public class MaxMinFlowMultiplexer(
// Compute the performance penalty due to flow interference
val perfScore = if (interferenceDomain != null) {
- val load = _rate / _capacity
+ val load = scheduler.rate / scheduler.capacity
interferenceDomain.apply(key, load)
} else {
1.0
@@ -422,14 +585,14 @@ public class MaxMinFlowMultiplexer(
updateCounters(demand, actual, remaining)
- _counters.interference += actual * max(0.0, 1 - perfScore)
+ scheduler.counters.interference += actual * max(0.0, 1 - perfScore)
}
}
/**
* An internal [FlowSource] implementation for multiplexer outputs.
*/
- private inner class Output : FlowSource, Comparable<Output> {
+ private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> {
/**
* The active [FlowConnection] of this source.
*/
@@ -441,6 +604,22 @@ public class MaxMinFlowMultiplexer(
@JvmField var capacity: Double = 0.0
/**
+ * A flag to indicate that this output is the activation output.
+ */
+ var isActivationOutput: Boolean
+ get() = _isActivationOutput
+ set(value) {
+ _isActivationOutput = value
+ _conn?.shouldSourceConverge = value
+ }
+ private var _isActivationOutput: Boolean = false
+
+ /**
+ * A flag to indicate that the output is active.
+ */
+ @JvmField var isActive = false
+
+ /**
* Push the specified rate to the consumer.
*/
fun push(rate: Double) {
@@ -454,35 +633,56 @@ public class MaxMinFlowMultiplexer(
_conn?.close()
}
+ /**
+ * Pull this output.
+ */
+ fun pull() {
+ _conn?.pull()
+ }
+
override fun onStart(conn: FlowConnection, now: Long) {
assert(_conn == null) { "Source running concurrently" }
_conn = conn
capacity = conn.capacity
- _activeOutputs.add(this)
+ isActive = true
- updateCapacity()
+ scheduler.registerOutput(this)
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
_conn = null
capacity = 0.0
- _activeOutputs.remove(this)
+ isActive = false
- updateCapacity()
-
- runScheduler(now)
+ scheduler.deregisterOutput(this, now)
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val capacity = capacity
if (capacity != conn.capacity) {
this.capacity = capacity
- updateCapacity()
+ scheduler.updateCapacity()
}
- // Re-run scheduler to distribute new load
- runScheduler(now)
- return Long.MAX_VALUE
+ return if (_isActivationOutput) {
+ // If this output is the activation output, synchronously run the scheduler and return the new deadline
+ val deadline = scheduler.runScheduler(now)
+ if (deadline == Long.MAX_VALUE)
+ deadline
+ else
+ deadline - now
+ } else {
+ // Output is not the activation output, so trigger activation output and do not install timer for this
+ // output (by returning `Long.MAX_VALUE`)
+ scheduler.trigger(now)
+ Long.MAX_VALUE
+ }
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ if (_isActivationOutput) {
+ scheduler.convergeOutput(this, now)
+ }
}
override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)