From 5a0821e19eed87e91054289051213cb60b4235b4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 12 Jan 2022 23:17:33 +0100 Subject: refactor(simulator): Remove delta parameter from flow callbacks This change removes the delta parameter from the callbacks of the flow framework. This parameter was used to indicate the duration in time between the last call and the current call. However, its usefulness was limited since the actual delta values needed by implementors of this method had to be bridged across different flow callbacks. --- .../opendc/experiments/tf20/core/SimTFDevice.kt | 15 ++++++-- .../opendc/simulator/compute/SimAbstractMachine.kt | 5 +-- .../simulator/compute/SimBareMetalMachine.kt | 2 +- .../org/opendc/simulator/compute/device/SimPsu.kt | 6 +-- .../compute/kernel/SimAbstractHypervisor.kt | 9 ++++- .../opendc/simulator/compute/workload/SimTrace.kt | 2 +- .../compute/workload/SimWorkloadLifecycle.kt | 12 +++--- .../org/opendc/simulator/flow/FlowConsumer.kt | 12 +++--- .../org/opendc/simulator/flow/FlowConsumerLogic.kt | 9 ++--- .../simulator/flow/FlowConvergenceListener.kt | 3 +- .../org/opendc/simulator/flow/FlowForwarder.kt | 43 +++++++++++----------- .../kotlin/org/opendc/simulator/flow/FlowMapper.kt | 12 +++--- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 19 ++++++---- .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 9 ++--- .../flow/internal/FlowConsumerContextImpl.kt | 37 ++++--------------- .../flow/mux/ForwardingFlowMultiplexer.kt | 17 ++------- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 19 ++++------ .../simulator/flow/source/FixedFlowSource.kt | 11 +++++- .../simulator/flow/source/FlowSourceRateAdapter.kt | 12 +++--- .../simulator/flow/source/TraceFlowSource.kt | 4 +- .../simulator/flow/FlowConsumerContextTest.kt | 8 ++-- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 38 ++++++++++--------- .../org/opendc/simulator/flow/FlowSinkTest.kt | 12 +++--- .../flow/mux/ForwardingFlowMultiplexerTest.kt | 2 +- .../org/opendc/simulator/network/SimNetworkSink.kt | 2 +- .../opendc/simulator/network/SimNetworkSinkTest.kt | 2 +- .../org/opendc/simulator/power/SimPduTest.kt | 2 +- .../opendc/simulator/power/SimPowerSourceTest.kt | 2 +- .../org/opendc/simulator/power/SimUpsTest.kt | 2 +- 29 files changed, 158 insertions(+), 170 deletions(-) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index c751463d..5245261c 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -120,6 +120,11 @@ public class SimTFDevice( */ private var activeWork: Work? = null + /** + * The timestamp of the last pull. + */ + private var lastPull: Long = 0L + override fun onStart(ctx: SimMachineContext) { for (cpu in ctx.cpus) { cpu.startConsumer(this) @@ -131,11 +136,15 @@ public class SimTFDevice( override fun onStart(conn: FlowConnection, now: Long) { ctx = conn capacity = conn.capacity - + lastPull = now conn.shouldSourceConverge = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { + val lastPull = lastPull + this.lastPull = now + val delta = (now - lastPull).coerceAtLeast(0) + val consumedWork = conn.rate * delta / 1000.0 capacity = conn.capacity @@ -164,7 +173,7 @@ public class SimTFDevice( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { _usage.record(conn.rate) _power.record(machine.psu.powerDraw) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 6a4c594d..e14ea507 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.device.SimPeripheral @@ -87,8 +86,8 @@ public abstract class SimAbstractMachine( _ctx?.close() } - override fun onConverge(now: Long, delta: Long) { - parent?.onConverge(now, delta) + override fun onConverge(now: Long) { + parent?.onConverge(now) } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 5df03d45..68792c72 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -81,7 +81,7 @@ public class SimBareMetalMachine( private var _lastConverge = Long.MAX_VALUE - override fun onConverge(now: Long, delta: Long) { + override fun onConverge(now: Long) { // Update the PSU stage psu.update() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index 09defbb5..caff4dc3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -86,17 +86,17 @@ public class SimPsu( conn.shouldSourceConverge = true } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _ctx = null } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0) conn.push(powerDraw) return Long.MAX_VALUE } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { _powerDraw = conn.rate } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index b7f70749..8e925bdf 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -134,9 +134,14 @@ public abstract class SimAbstractHypervisor( private var _cpuCount = 0 private var _cpuCapacity = 0.0 + private var _lastConverge = engine.clock.millis() /* FlowConvergenceListener */ - override fun onConverge(now: Long, delta: Long) { + override fun onConverge(now: Long) { + val lastConverge = _lastConverge + _lastConverge = now + val delta = now - lastConverge + if (delta > 0) { _counters.record() } @@ -146,7 +151,7 @@ public abstract class SimAbstractHypervisor( governor.onLimit(load) } - listener?.onConverge(now, delta) + listener?.onConverge(now) } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt index 4cf60605..207e8579 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt @@ -217,7 +217,7 @@ public class SimTrace( */ private var _idx = 0 - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val size = size val nowOffset = now - offset diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index 742470a1..46113bb0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -61,17 +61,17 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { delegate.onStart(conn, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + override fun onPull(conn: FlowConnection, now: Long): Long { + return delegate.onPull(conn, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { - delegate.onConverge(conn, now, delta) + override fun onConverge(conn: FlowConnection, now: Long) { + delegate.onConverge(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - delegate.onStop(conn, now, delta) + delegate.onStop(conn, now) } finally { complete(this) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index 4685a755..a49826f4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -92,9 +92,9 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - source.onStop(conn, now, delta) + source.onStop(conn, now) if (!cont.isCompleted) { cont.resume(Unit) @@ -105,18 +105,18 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return try { - source.onPull(conn, now, delta) + source.onPull(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - source.onConverge(conn, now, delta) + source.onConverge(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index 50fbc9c7..1d3adb10 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -31,10 +31,9 @@ public interface FlowConsumerLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. * @param rate The requested processing rate of the source. */ - public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {} + public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {} /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -44,17 +43,15 @@ public interface FlowConsumerLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} + public fun onConverge(ctx: FlowConsumerContext, now: Long) {} /** * This method is invoked when the [FlowSource] completed or failed. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the provider finished. - * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. * @param cause The cause of the failure or `null` if the source completed. */ - public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {} + public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt index d1afda6f..62cb10d1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt @@ -30,7 +30,6 @@ public interface FlowConvergenceListener { * This method is invoked when the system has converged to a steady-state. * * @param now The timestamp at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(now: Long, delta: Long) {} + public fun onConverge(now: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index a5663293..0ad18f6a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -25,7 +25,11 @@ package org.opendc.simulator.flow import mu.KotlinLogging import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.max + +/** + * The logging instance of this connection. + */ +private val logger = KotlinLogging.logger {} /** * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. @@ -39,11 +43,6 @@ public class FlowForwarder( private val listener: FlowConvergenceListener? = null, private val isCoupled: Boolean = false ) : FlowSource, FlowConsumer, AutoCloseable { - /** - * The logging instance of this connection. - */ - private val logger = KotlinLogging.logger {} - /** * The delegate [FlowSource]. */ @@ -81,8 +80,6 @@ public class FlowForwarder( _innerCtx?.pull(now) } - @JvmField var lastPull = Long.MAX_VALUE - override fun push(rate: Double) { if (delegate == null) { return @@ -102,8 +99,7 @@ public class FlowForwarder( if (hasDelegateStarted) { val now = engine.clock.millis() - val delta = max(0, now - lastPull) - delegate.onStop(this, now, delta) + delegate.onStop(this, now) } } } @@ -163,7 +159,7 @@ public class FlowForwarder( } } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _innerCtx = null val delegate = delegate @@ -171,25 +167,24 @@ public class FlowForwarder( reset() try { - delegate.onStop(this._ctx, now, delta) + delegate.onStop(this._ctx, now) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } } } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val delegate = delegate if (!hasDelegateStarted) { start() } - _ctx.lastPull = now - updateCounters(conn, delta) + updateCounters(conn, now) return try { - delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE + delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -198,10 +193,10 @@ public class FlowForwarder( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - delegate?.onConverge(this._ctx, now, delta) - listener?.onConverge(now, delta) + delegate?.onConverge(this._ctx, now) + listener?.onConverge(now) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -217,8 +212,10 @@ public class FlowForwarder( val delegate = delegate ?: return try { - delegate.onStart(_ctx, engine.clock.millis()) + val now = engine.clock.millis() + delegate.onStart(_ctx, now) hasDelegateStarted = true + _lastUpdate = now } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } reset() @@ -242,11 +239,15 @@ public class FlowForwarder( * The requested flow rate. */ private var _demand: Double = 0.0 + private var _lastUpdate = 0L /** * Update the flow counters for the transformer. */ - private fun updateCounters(ctx: FlowConnection, delta: Long) { + private fun updateCounters(ctx: FlowConnection, now: Long) { + val lastUpdate = _lastUpdate + _lastUpdate = now + val delta = now - lastUpdate if (delta <= 0) { return } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt index 6867bcef..af702701 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt @@ -45,20 +45,20 @@ public class FlowMapper( source.onStart(delegate, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { val delegate = checkNotNull(_conn) { "Invariant violation" } _conn = null - source.onStop(delegate, now, delta) + source.onStop(delegate, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val delegate = checkNotNull(_conn) { "Invariant violation" } - return source.onPull(delegate, now, delta) + return source.onPull(delegate, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { val delegate = _conn ?: return - source.onConverge(delegate, now, delta) + source.onConverge(delegate, now) } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index e9094443..d0324ce8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -104,34 +104,39 @@ public class FlowSink( * [FlowConsumerLogic] of a sink. */ private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { + override fun onPush( ctx: FlowConsumerContext, now: Long, - delta: Long, rate: Double ) { - updateCounters(ctx, delta, rate, ctx.capacity) + updateCounters(ctx, now, rate, ctx.capacity) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta, 0.0, 0.0) + override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { + updateCounters(ctx, now, 0.0, 0.0) _ctx = null } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + override fun onConverge(ctx: FlowConsumerContext, now: Long) { + parent?.onConverge(now) } /** * The previous demand and capacity for the consumer. */ private val _previous = DoubleArray(2) + private var _previousUpdate = Long.MAX_VALUE /** * Update the counters of the flow consumer. */ - private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) { + private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) { + val previousUpdate = _previousUpdate + _previousUpdate = now + val delta = now - previousUpdate + val counters = counters val previous = _previous val demand = previous[0] diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 3a7e52aa..a48ac18e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -42,19 +42,17 @@ public interface FlowSource { * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the source finished. - * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. */ - public fun onStop(conn: FlowConnection, now: Long, delta: Long) {} + public fun onStop(conn: FlowConnection, now: Long) {} /** * This method is invoked when the source is pulled. * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the pull is occurring. - * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. * @return The duration after which the resource consumer should be pulled again. */ - public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long + public fun onPull(conn: FlowConnection, now: Long): Long /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -64,7 +62,6 @@ public interface FlowSource { * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {} + public fun onConverge(conn: FlowConnection, now: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 1d39d61c..bc6bae71 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.internal import mu.KotlinLogging import org.opendc.simulator.flow.* import java.util.* -import kotlin.math.max import kotlin.math.min /** @@ -123,14 +122,6 @@ internal class FlowConsumerContextImpl( */ private var _flags: Int = 0 - /** - * The timestamp of calls to the callbacks. - */ - private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` - private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` - private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence` - private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence` - /** * The timers at which the context is scheduled to be interrupted. */ @@ -238,15 +229,11 @@ internal class FlowConsumerContextImpl( try { // Pull the source if (1) `pull` is called or (2) the timer of the source has expired newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) { - val lastPull = _lastPull - val delta = max(0, now - lastPull) - // Update state before calling into the outside world, so it observes a consistent state - _lastPull = now _flags = (flags and ConnPulled.inv()) or ConnUpdateActive hasUpdated = true - val duration = source.onPull(this, now, delta) + val duration = source.onPull(this, now) // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags @@ -266,15 +253,11 @@ internal class FlowConsumerContextImpl( // Push to the consumer if the rate of the source has changed (after a call to `push`) if (flags and ConnPushed != 0) { - val lastPush = _lastPush - val delta = max(0, now - lastPush) - // Update state before calling into the outside world, so it observes a consistent state - _lastPush = now _flags = (flags and ConnPushed.inv()) or ConnUpdateActive hasUpdated = true - logic.onPush(this, now, delta, _demand) + logic.onPush(this, now, _demand) // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags @@ -372,18 +355,12 @@ internal class FlowConsumerContextImpl( // Call the source converge callback if it has enabled convergence if (flags and ConnConvergeSource != 0) { - val delta = max(0, now - _lastSourceConvergence) - _lastSourceConvergence = now - - source.onConverge(this, now, delta) + source.onConverge(this, now) } // Call the consumer callback if it has enabled convergence if (flags and ConnConvergeConsumer != 0) { - val delta = max(0, now - _lastConsumerConvergence) - _lastConsumerConvergence = now - - logic.onConverge(this, now, delta) + logic.onConverge(this, now) } } catch (cause: Throwable) { // Invoke the finish callbacks @@ -403,7 +380,7 @@ internal class FlowConsumerContextImpl( */ private fun doStopSource(now: Long) { try { - source.onStop(this, now, max(0, now - _lastPull)) + source.onStop(this, now) doFinishConsumer(now, null) } catch (cause: Throwable) { doFinishConsumer(now, cause) @@ -415,7 +392,7 @@ internal class FlowConsumerContextImpl( */ private fun doFailSource(now: Long, cause: Throwable) { try { - source.onStop(this, now, max(0, now - _lastPull)) + source.onStop(this, now) } catch (e: Throwable) { e.addSuppressed(cause) doFinishConsumer(now, e) @@ -427,7 +404,7 @@ internal class FlowConsumerContextImpl( */ private fun doFinishConsumer(now: Long, cause: Throwable?) { try { - logic.onFinish(this, now, max(0, now - _lastPush), cause) + logic.onFinish(this, now, cause) } catch (e: Throwable) { e.addSuppressed(cause) logger.error(e) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 7fe0c1b7..1d7d22ef 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque -import kotlin.math.max /** * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means @@ -139,16 +138,8 @@ public class ForwardingFlowMultiplexer( override fun flushCounters(input: FlowConsumer) {} - private var _lastConverge = Long.MAX_VALUE - - override fun onConverge(now: Long, delta: Long) { - val listener = listener - if (listener != null) { - val lastConverge = _lastConverge - _lastConverge = now - val duration = max(0, now - lastConverge) - listener.onConverge(now, duration) - } + override fun onConverge(now: Long) { + listener?.onConverge(now) } /** @@ -167,9 +158,9 @@ public class ForwardingFlowMultiplexer( forwarder.onStart(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { forwarder.cancel() - forwarder.onStop(conn, now, delta) + forwarder.onStop(conn, now) } override fun toString(): String = "ForwardingFlowMultiplexer.Output" diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7ee4d326..cc831862 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -257,7 +257,7 @@ public class MaxMinFlowMultiplexer( _lastConverge = now _lastConvergeInput = input - parent.onConverge(now, max(0, now - lastConverge)) + parent.onConverge(now) } } @@ -285,13 +285,11 @@ public class MaxMinFlowMultiplexer( * This method is invoked when one of the outputs converges. */ fun convergeOutput(output: Output, now: Long) { - val lastConverge = _lastConverge val parent = parent if (parent != null) { _lastConverge = now - - parent.onConverge(now, max(0, now - lastConverge)) + parent.onConverge(now) } if (!output.isActive) { @@ -489,7 +487,7 @@ public class MaxMinFlowMultiplexer( val previousUpdate = _previousUpdate _previousUpdate = now - val delta = (now - previousUpdate).coerceAtLeast(0) + val delta = now - previousUpdate if (delta <= 0) { return } @@ -647,7 +645,6 @@ public class MaxMinFlowMultiplexer( override fun onPush( ctx: FlowConsumerContext, now: Long, - delta: Long, rate: Double ) { doUpdateCounters(now) @@ -660,7 +657,7 @@ public class MaxMinFlowMultiplexer( scheduler.trigger(now) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { doUpdateCounters(now) limit = 0.0 @@ -672,7 +669,7 @@ public class MaxMinFlowMultiplexer( _ctx = null } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + override fun onConverge(ctx: FlowConsumerContext, now: Long) { scheduler.convergeInput(this, now) } @@ -777,7 +774,7 @@ public class MaxMinFlowMultiplexer( scheduler.registerOutput(this) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _conn = null capacity = 0.0 isActive = false @@ -785,7 +782,7 @@ public class MaxMinFlowMultiplexer( scheduler.deregisterOutput(this, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val capacity = capacity if (capacity != conn.capacity) { this.capacity = capacity @@ -808,7 +805,7 @@ public class MaxMinFlowMultiplexer( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { if (_isActivationOutput) { scheduler.convergeOutput(this, now) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt index d9779c6a..6cfcc82c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt @@ -37,8 +37,17 @@ public class FixedFlowSource(private val amount: Double, private val utilization } private var remainingAmount = amount + private var lastPull: Long = 0L + + override fun onStart(conn: FlowConnection, now: Long) { + lastPull = now + } + + override fun onPull(conn: FlowConnection, now: Long): Long { + val lastPull = lastPull + this.lastPull = now + val delta = (now - lastPull).coerceAtLeast(0) - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val consumed = conn.rate * delta / 1000.0 val limit = conn.capacity * utilization diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 6dd60d95..80127fb5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -53,21 +53,21 @@ public class FlowSourceRateAdapter( delegate.onStart(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - delegate.onStop(conn, now, delta) + delegate.onStop(conn, now) } finally { rate = 0.0 } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + override fun onPull(conn: FlowConnection, now: Long): Long { + return delegate.onPull(conn, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - delegate.onConverge(conn, now, delta) + delegate.onConverge(conn, now) } finally { rate = conn.rate } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt index ae537845..c9a52128 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -37,11 +37,11 @@ public class TraceFlowSource(private val trace: Sequence) : FlowSource _iterator = trace.iterator() } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _iterator = null } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { // Check whether the trace fragment was fully consumed, otherwise wait until we have done so val nextTarget = _nextTarget if (nextTarget > now) { diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index fe39eb2c..e7b25554 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -36,7 +36,7 @@ class FlowConsumerContextTest { fun testFlushWithoutCommand() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(1.0) 1000 @@ -57,7 +57,7 @@ class FlowConsumerContextTest { fun testDoubleStart() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(0.0) 1000 @@ -82,7 +82,7 @@ class FlowConsumerContextTest { fun testIdempotentCapacityChange() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(1.0) 1000 @@ -99,6 +99,6 @@ class FlowConsumerContextTest { context.start() context.capacity = 4200.0 - verify(exactly = 1) { consumer.onPull(any(), any(), any()) } + verify(exactly = 1) { consumer.onPull(any(), any()) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 12e72b8f..8b090593 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -45,7 +45,7 @@ internal class FlowForwarderTest { launch { source.consume(forwarder) } forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -66,7 +66,7 @@ internal class FlowForwarderTest { forwarder.consume(object : FlowSource { var isFirst = true - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -87,7 +87,7 @@ internal class FlowForwarderTest { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -113,7 +113,7 @@ internal class FlowForwarderTest { val forwarder = FlowForwarder(engine) val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -122,7 +122,7 @@ internal class FlowForwarderTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onStop(any(), any(), any()) } + verify(exactly = 0) { consumer.onStop(any(), any()) } } @Test @@ -140,7 +140,7 @@ internal class FlowForwarderTest { forwarder.cancel() verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any()) } } @Test @@ -158,7 +158,7 @@ internal class FlowForwarderTest { source.cancel() verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any()) } } @Test @@ -168,7 +168,7 @@ internal class FlowForwarderTest { val source = FlowSink(engine, 2000.0) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -198,7 +198,7 @@ internal class FlowForwarderTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { source.onPull(any(), any(), any()) } + verify(exactly = 1) { source.onPull(any(), any()) } } @Test @@ -214,11 +214,13 @@ internal class FlowForwarderTest { yield() - assertEquals(2.0, source.counters.actual) - assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } - assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } - assertEquals(2000, clock.millis()) + assertAll( + { assertEquals(2.0, source.counters.actual) }, + { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } }, + { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } }, + { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } }, + { assertEquals(2000, clock.millis()) } + ) } @Test @@ -246,7 +248,7 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { throw IllegalStateException("Test") } }) @@ -269,7 +271,7 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } @@ -301,11 +303,11 @@ internal class FlowForwarderTest { conn.shouldSourceConverge = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { throw IllegalStateException("Test") } }) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index 70c75864..726ddbf7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -65,7 +65,7 @@ internal class FlowSinkTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 3) { consumer.onPull(any(), any(), any()) } + verify(exactly = 3) { consumer.onPull(any(), any()) } } @Test @@ -95,7 +95,7 @@ internal class FlowSinkTest { val provider = FlowSink(engine, capacity) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -122,7 +122,7 @@ internal class FlowSinkTest { resCtx = conn } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -154,7 +154,7 @@ internal class FlowSinkTest { throw IllegalStateException("Hi") } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } } @@ -173,7 +173,7 @@ internal class FlowSinkTest { val consumer = object : FlowSource { var isFirst = true - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -231,7 +231,7 @@ internal class FlowSinkTest { val provider = FlowSink(engine, capacity) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE } provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt index 187dacd9..ef15f711 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -112,7 +112,7 @@ internal class ForwardingFlowMultiplexerTest { isFirst = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt index 4b0d7bbd..675ac1c3 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt @@ -32,7 +32,7 @@ public class SimNetworkSink( public val capacity: Double ) : SimNetworkPort() { override fun createConsumer(): FlowSource = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE override fun toString(): String = "SimNetworkSink.Consumer" } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt index 14d22162..2e6983c8 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt @@ -124,7 +124,7 @@ class SimNetworkSinkTest { assertFalse(sink.isConnected) assertFalse(source.isConnected) - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } private class Source(engine: FlowEngine) : SimNetworkPort() { diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt index eb823eb1..7cc4b801 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt @@ -85,7 +85,7 @@ internal class SimPduTest { outlet.connect(inlet) outlet.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt index 76142103..4f319e65 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt @@ -85,7 +85,7 @@ internal class SimPowerSourceTest { source.connect(inlet) source.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt index a764a368..e19e72fa 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt @@ -92,7 +92,7 @@ internal class SimUpsTest { ups.connect(inlet) ups.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } class SimpleInlet : SimPowerInlet() { -- cgit v1.2.3