summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt15
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt43
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt37
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt17
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt11
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt2
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt2
29 files changed, 158 insertions, 170 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index c751463d..5245261c 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -120,6 +120,11 @@ public class SimTFDevice(
*/
private var activeWork: Work? = null
+ /**
+ * The timestamp of the last pull.
+ */
+ private var lastPull: Long = 0L
+
override fun onStart(ctx: SimMachineContext) {
for (cpu in ctx.cpus) {
cpu.startConsumer(this)
@@ -131,11 +136,15 @@ public class SimTFDevice(
override fun onStart(conn: FlowConnection, now: Long) {
ctx = conn
capacity = conn.capacity
-
+ lastPull = now
conn.shouldSourceConverge = true
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ val lastPull = lastPull
+ this.lastPull = now
+ val delta = (now - lastPull).coerceAtLeast(0)
+
val consumedWork = conn.rate * delta / 1000.0
capacity = conn.capacity
@@ -164,7 +173,7 @@ public class SimTFDevice(
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
_usage.record(conn.rate)
_power.record(machine.psu.powerDraw)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 6a4c594d..e14ea507 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.simulator.compute.device.SimNetworkAdapter
import org.opendc.simulator.compute.device.SimPeripheral
@@ -87,8 +86,8 @@ public abstract class SimAbstractMachine(
_ctx?.close()
}
- override fun onConverge(now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ override fun onConverge(now: Long) {
+ parent?.onConverge(now)
}
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 5df03d45..68792c72 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -81,7 +81,7 @@ public class SimBareMetalMachine(
private var _lastConverge = Long.MAX_VALUE
- override fun onConverge(now: Long, delta: Long) {
+ override fun onConverge(now: Long) {
// Update the PSU stage
psu.update()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
index 09defbb5..caff4dc3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
@@ -86,17 +86,17 @@ public class SimPsu(
conn.shouldSourceConverge = true
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_ctx = null
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0)
conn.push(powerDraw)
return Long.MAX_VALUE
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
_powerDraw = conn.rate
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index b7f70749..8e925bdf 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -134,9 +134,14 @@ public abstract class SimAbstractHypervisor(
private var _cpuCount = 0
private var _cpuCapacity = 0.0
+ private var _lastConverge = engine.clock.millis()
/* FlowConvergenceListener */
- override fun onConverge(now: Long, delta: Long) {
+ override fun onConverge(now: Long) {
+ val lastConverge = _lastConverge
+ _lastConverge = now
+ val delta = now - lastConverge
+
if (delta > 0) {
_counters.record()
}
@@ -146,7 +151,7 @@ public abstract class SimAbstractHypervisor(
governor.onLimit(load)
}
- listener?.onConverge(now, delta)
+ listener?.onConverge(now)
}
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
index 4cf60605..207e8579 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
@@ -217,7 +217,7 @@ public class SimTrace(
*/
private var _idx = 0
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val size = size
val nowOffset = now - offset
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index 742470a1..46113bb0 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -61,17 +61,17 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
delegate.onStart(conn, now)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return delegate.onPull(conn, now, delta)
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ return delegate.onPull(conn, now)
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
- delegate.onConverge(conn, now, delta)
+ override fun onConverge(conn: FlowConnection, now: Long) {
+ delegate.onConverge(conn, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
try {
- delegate.onStop(conn, now, delta)
+ delegate.onStop(conn, now)
} finally {
complete(this)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
index 4685a755..a49826f4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -92,9 +92,9 @@ public suspend fun FlowConsumer.consume(source: FlowSource) {
}
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
try {
- source.onStop(conn, now, delta)
+ source.onStop(conn, now)
if (!cont.isCompleted) {
cont.resume(Unit)
@@ -105,18 +105,18 @@ public suspend fun FlowConsumer.consume(source: FlowSource) {
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return try {
- source.onPull(conn, now, delta)
+ source.onPull(conn, now)
} catch (cause: Throwable) {
cont.resumeWithException(cause)
throw cause
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- source.onConverge(conn, now, delta)
+ source.onConverge(conn, now)
} catch (cause: Throwable) {
cont.resumeWithException(cause)
throw cause
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
index 50fbc9c7..1d3adb10 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -31,10 +31,9 @@ public interface FlowConsumerLogic {
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
* @param rate The requested processing rate of the source.
*/
- public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {}
+ public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {}
/**
* This method is invoked when the flow graph has converged into a steady-state system.
@@ -44,17 +43,15 @@ public interface FlowConsumerLogic {
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {}
+ public fun onConverge(ctx: FlowConsumerContext, now: Long) {}
/**
* This method is invoked when the [FlowSource] completed or failed.
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the provider finished.
- * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
* @param cause The cause of the failure or `null` if the source completed.
*/
- public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {}
+ public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
index d1afda6f..62cb10d1 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
@@ -30,7 +30,6 @@ public interface FlowConvergenceListener {
* This method is invoked when the system has converged to a steady-state.
*
* @param now The timestamp at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(now: Long, delta: Long) {}
+ public fun onConverge(now: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index a5663293..0ad18f6a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -25,7 +25,11 @@ package org.opendc.simulator.flow
import mu.KotlinLogging
import org.opendc.simulator.flow.internal.D_MS_TO_S
import org.opendc.simulator.flow.internal.MutableFlowCounters
-import kotlin.math.max
+
+/**
+ * The logging instance of this connection.
+ */
+private val logger = KotlinLogging.logger {}
/**
* A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
@@ -40,11 +44,6 @@ public class FlowForwarder(
private val isCoupled: Boolean = false
) : FlowSource, FlowConsumer, AutoCloseable {
/**
- * The logging instance of this connection.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
* The delegate [FlowSource].
*/
private var delegate: FlowSource? = null
@@ -81,8 +80,6 @@ public class FlowForwarder(
_innerCtx?.pull(now)
}
- @JvmField var lastPull = Long.MAX_VALUE
-
override fun push(rate: Double) {
if (delegate == null) {
return
@@ -102,8 +99,7 @@ public class FlowForwarder(
if (hasDelegateStarted) {
val now = engine.clock.millis()
- val delta = max(0, now - lastPull)
- delegate.onStop(this, now, delta)
+ delegate.onStop(this, now)
}
}
}
@@ -163,7 +159,7 @@ public class FlowForwarder(
}
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_innerCtx = null
val delegate = delegate
@@ -171,25 +167,24 @@ public class FlowForwarder(
reset()
try {
- delegate.onStop(this._ctx, now, delta)
+ delegate.onStop(this._ctx, now)
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
}
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val delegate = delegate
if (!hasDelegateStarted) {
start()
}
- _ctx.lastPull = now
- updateCounters(conn, delta)
+ updateCounters(conn, now)
return try {
- delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE
+ delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
@@ -198,10 +193,10 @@ public class FlowForwarder(
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- delegate?.onConverge(this._ctx, now, delta)
- listener?.onConverge(now, delta)
+ delegate?.onConverge(this._ctx, now)
+ listener?.onConverge(now)
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
@@ -217,8 +212,10 @@ public class FlowForwarder(
val delegate = delegate ?: return
try {
- delegate.onStart(_ctx, engine.clock.millis())
+ val now = engine.clock.millis()
+ delegate.onStart(_ctx, now)
hasDelegateStarted = true
+ _lastUpdate = now
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
reset()
@@ -242,11 +239,15 @@ public class FlowForwarder(
* The requested flow rate.
*/
private var _demand: Double = 0.0
+ private var _lastUpdate = 0L
/**
* Update the flow counters for the transformer.
*/
- private fun updateCounters(ctx: FlowConnection, delta: Long) {
+ private fun updateCounters(ctx: FlowConnection, now: Long) {
+ val lastUpdate = _lastUpdate
+ _lastUpdate = now
+ val delta = now - lastUpdate
if (delta <= 0) {
return
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
index 6867bcef..af702701 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
@@ -45,20 +45,20 @@ public class FlowMapper(
source.onStart(delegate, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
val delegate = checkNotNull(_conn) { "Invariant violation" }
_conn = null
- source.onStop(delegate, now, delta)
+ source.onStop(delegate, now)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val delegate = checkNotNull(_conn) { "Invariant violation" }
- return source.onPull(delegate, now, delta)
+ return source.onPull(delegate, now)
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
val delegate = _conn ?: return
- source.onConverge(delegate, now, delta)
+ source.onConverge(delegate, now)
}
/**
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index e9094443..d0324ce8 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -104,34 +104,39 @@ public class FlowSink(
* [FlowConsumerLogic] of a sink.
*/
private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
+
override fun onPush(
ctx: FlowConsumerContext,
now: Long,
- delta: Long,
rate: Double
) {
- updateCounters(ctx, delta, rate, ctx.capacity)
+ updateCounters(ctx, now, rate, ctx.capacity)
}
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- updateCounters(ctx, delta, 0.0, 0.0)
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
+ updateCounters(ctx, now, 0.0, 0.0)
_ctx = null
}
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ override fun onConverge(ctx: FlowConsumerContext, now: Long) {
+ parent?.onConverge(now)
}
/**
* The previous demand and capacity for the consumer.
*/
private val _previous = DoubleArray(2)
+ private var _previousUpdate = Long.MAX_VALUE
/**
* Update the counters of the flow consumer.
*/
- private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) {
+ private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) {
+ val previousUpdate = _previousUpdate
+ _previousUpdate = now
+ val delta = now - previousUpdate
+
val counters = counters
val previous = _previous
val demand = previous[0]
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
index 3a7e52aa..a48ac18e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -42,19 +42,17 @@ public interface FlowSource {
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the source finished.
- * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
*/
- public fun onStop(conn: FlowConnection, now: Long, delta: Long) {}
+ public fun onStop(conn: FlowConnection, now: Long) {}
/**
* This method is invoked when the source is pulled.
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the pull is occurring.
- * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
* @return The duration after which the resource consumer should be pulled again.
*/
- public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long
+ public fun onPull(conn: FlowConnection, now: Long): Long
/**
* This method is invoked when the flow graph has converged into a steady-state system.
@@ -64,7 +62,6 @@ public interface FlowSource {
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {}
+ public fun onConverge(conn: FlowConnection, now: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 1d39d61c..bc6bae71 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.flow.internal
import mu.KotlinLogging
import org.opendc.simulator.flow.*
import java.util.*
-import kotlin.math.max
import kotlin.math.min
/**
@@ -124,14 +123,6 @@ internal class FlowConsumerContextImpl(
private var _flags: Int = 0
/**
- * The timestamp of calls to the callbacks.
- */
- private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull`
- private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush`
- private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence`
- private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence`
-
- /**
* The timers at which the context is scheduled to be interrupted.
*/
private var _timer: Long = Long.MAX_VALUE
@@ -238,15 +229,11 @@ internal class FlowConsumerContextImpl(
try {
// Pull the source if (1) `pull` is called or (2) the timer of the source has expired
newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) {
- val lastPull = _lastPull
- val delta = max(0, now - lastPull)
-
// Update state before calling into the outside world, so it observes a consistent state
- _lastPull = now
_flags = (flags and ConnPulled.inv()) or ConnUpdateActive
hasUpdated = true
- val duration = source.onPull(this, now, delta)
+ val duration = source.onPull(this, now)
// IMPORTANT: Re-fetch the flags after the callback might have changed those
flags = _flags
@@ -266,15 +253,11 @@ internal class FlowConsumerContextImpl(
// Push to the consumer if the rate of the source has changed (after a call to `push`)
if (flags and ConnPushed != 0) {
- val lastPush = _lastPush
- val delta = max(0, now - lastPush)
-
// Update state before calling into the outside world, so it observes a consistent state
- _lastPush = now
_flags = (flags and ConnPushed.inv()) or ConnUpdateActive
hasUpdated = true
- logic.onPush(this, now, delta, _demand)
+ logic.onPush(this, now, _demand)
// IMPORTANT: Re-fetch the flags after the callback might have changed those
flags = _flags
@@ -372,18 +355,12 @@ internal class FlowConsumerContextImpl(
// Call the source converge callback if it has enabled convergence
if (flags and ConnConvergeSource != 0) {
- val delta = max(0, now - _lastSourceConvergence)
- _lastSourceConvergence = now
-
- source.onConverge(this, now, delta)
+ source.onConverge(this, now)
}
// Call the consumer callback if it has enabled convergence
if (flags and ConnConvergeConsumer != 0) {
- val delta = max(0, now - _lastConsumerConvergence)
- _lastConsumerConvergence = now
-
- logic.onConverge(this, now, delta)
+ logic.onConverge(this, now)
}
} catch (cause: Throwable) {
// Invoke the finish callbacks
@@ -403,7 +380,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doStopSource(now: Long) {
try {
- source.onStop(this, now, max(0, now - _lastPull))
+ source.onStop(this, now)
doFinishConsumer(now, null)
} catch (cause: Throwable) {
doFinishConsumer(now, cause)
@@ -415,7 +392,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doFailSource(now: Long, cause: Throwable) {
try {
- source.onStop(this, now, max(0, now - _lastPull))
+ source.onStop(this, now)
} catch (e: Throwable) {
e.addSuppressed(cause)
doFinishConsumer(now, e)
@@ -427,7 +404,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doFinishConsumer(now: Long, cause: Throwable?) {
try {
- logic.onFinish(this, now, max(0, now - _lastPush), cause)
+ logic.onFinish(this, now, cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
logger.error(e) { "Uncaught exception" }
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 7fe0c1b7..1d7d22ef 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceKey
import java.util.ArrayDeque
-import kotlin.math.max
/**
* A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
@@ -139,16 +138,8 @@ public class ForwardingFlowMultiplexer(
override fun flushCounters(input: FlowConsumer) {}
- private var _lastConverge = Long.MAX_VALUE
-
- override fun onConverge(now: Long, delta: Long) {
- val listener = listener
- if (listener != null) {
- val lastConverge = _lastConverge
- _lastConverge = now
- val duration = max(0, now - lastConverge)
- listener.onConverge(now, duration)
- }
+ override fun onConverge(now: Long) {
+ listener?.onConverge(now)
}
/**
@@ -167,9 +158,9 @@ public class ForwardingFlowMultiplexer(
forwarder.onStart(conn, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
forwarder.cancel()
- forwarder.onStop(conn, now, delta)
+ forwarder.onStop(conn, now)
}
override fun toString(): String = "ForwardingFlowMultiplexer.Output"
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 7ee4d326..cc831862 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -257,7 +257,7 @@ public class MaxMinFlowMultiplexer(
_lastConverge = now
_lastConvergeInput = input
- parent.onConverge(now, max(0, now - lastConverge))
+ parent.onConverge(now)
}
}
@@ -285,13 +285,11 @@ public class MaxMinFlowMultiplexer(
* This method is invoked when one of the outputs converges.
*/
fun convergeOutput(output: Output, now: Long) {
- val lastConverge = _lastConverge
val parent = parent
if (parent != null) {
_lastConverge = now
-
- parent.onConverge(now, max(0, now - lastConverge))
+ parent.onConverge(now)
}
if (!output.isActive) {
@@ -489,7 +487,7 @@ public class MaxMinFlowMultiplexer(
val previousUpdate = _previousUpdate
_previousUpdate = now
- val delta = (now - previousUpdate).coerceAtLeast(0)
+ val delta = now - previousUpdate
if (delta <= 0) {
return
}
@@ -647,7 +645,6 @@ public class MaxMinFlowMultiplexer(
override fun onPush(
ctx: FlowConsumerContext,
now: Long,
- delta: Long,
rate: Double
) {
doUpdateCounters(now)
@@ -660,7 +657,7 @@ public class MaxMinFlowMultiplexer(
scheduler.trigger(now)
}
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
doUpdateCounters(now)
limit = 0.0
@@ -672,7 +669,7 @@ public class MaxMinFlowMultiplexer(
_ctx = null
}
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ override fun onConverge(ctx: FlowConsumerContext, now: Long) {
scheduler.convergeInput(this, now)
}
@@ -777,7 +774,7 @@ public class MaxMinFlowMultiplexer(
scheduler.registerOutput(this)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_conn = null
capacity = 0.0
isActive = false
@@ -785,7 +782,7 @@ public class MaxMinFlowMultiplexer(
scheduler.deregisterOutput(this, now)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val capacity = capacity
if (capacity != conn.capacity) {
this.capacity = capacity
@@ -808,7 +805,7 @@ public class MaxMinFlowMultiplexer(
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
if (_isActivationOutput) {
scheduler.convergeOutput(this, now)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
index d9779c6a..6cfcc82c 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
@@ -37,8 +37,17 @@ public class FixedFlowSource(private val amount: Double, private val utilization
}
private var remainingAmount = amount
+ private var lastPull: Long = 0L
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ lastPull = now
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ val lastPull = lastPull
+ this.lastPull = now
+ val delta = (now - lastPull).coerceAtLeast(0)
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val consumed = conn.rate * delta / 1000.0
val limit = conn.capacity * utilization
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index 6dd60d95..80127fb5 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -53,21 +53,21 @@ public class FlowSourceRateAdapter(
delegate.onStart(conn, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
try {
- delegate.onStop(conn, now, delta)
+ delegate.onStop(conn, now)
} finally {
rate = 0.0
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return delegate.onPull(conn, now, delta)
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ return delegate.onPull(conn, now)
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- delegate.onConverge(conn, now, delta)
+ delegate.onConverge(conn, now)
} finally {
rate = conn.rate
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
index ae537845..c9a52128 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
@@ -37,11 +37,11 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource
_iterator = trace.iterator()
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_iterator = null
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
// Check whether the trace fragment was fully consumed, otherwise wait until we have done so
val nextTarget = _nextTarget
if (nextTarget > now) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
index fe39eb2c..e7b25554 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -36,7 +36,7 @@ class FlowConsumerContextTest {
fun testFlushWithoutCommand() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(1.0)
1000
@@ -57,7 +57,7 @@ class FlowConsumerContextTest {
fun testDoubleStart() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(0.0)
1000
@@ -82,7 +82,7 @@ class FlowConsumerContextTest {
fun testIdempotentCapacityChange() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(1.0)
1000
@@ -99,6 +99,6 @@ class FlowConsumerContextTest {
context.start()
context.capacity = 4200.0
- verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onPull(any(), any()) }
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index 12e72b8f..8b090593 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -45,7 +45,7 @@ internal class FlowForwarderTest {
launch { source.consume(forwarder) }
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -66,7 +66,7 @@ internal class FlowForwarderTest {
forwarder.consume(object : FlowSource {
var isFirst = true
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -87,7 +87,7 @@ internal class FlowForwarderTest {
val engine = FlowEngineImpl(coroutineContext, clock)
val forwarder = FlowForwarder(engine)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -113,7 +113,7 @@ internal class FlowForwarderTest {
val forwarder = FlowForwarder(engine)
val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -122,7 +122,7 @@ internal class FlowForwarderTest {
forwarder.startConsumer(consumer)
forwarder.cancel()
- verify(exactly = 0) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 0) { consumer.onStop(any(), any()) }
}
@Test
@@ -140,7 +140,7 @@ internal class FlowForwarderTest {
forwarder.cancel()
verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any()) }
}
@Test
@@ -158,7 +158,7 @@ internal class FlowForwarderTest {
source.cancel()
verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any()) }
}
@Test
@@ -168,7 +168,7 @@ internal class FlowForwarderTest {
val source = FlowSink(engine, 2000.0)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -198,7 +198,7 @@ internal class FlowForwarderTest {
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { source.onPull(any(), any(), any()) }
+ verify(exactly = 1) { source.onPull(any(), any()) }
}
@Test
@@ -214,11 +214,13 @@ internal class FlowForwarderTest {
yield()
- assertEquals(2.0, source.counters.actual)
- assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
- assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
- assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
- assertEquals(2000, clock.millis())
+ assertAll(
+ { assertEquals(2.0, source.counters.actual) },
+ { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } },
+ { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } },
+ { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } },
+ { assertEquals(2000, clock.millis()) }
+ )
}
@Test
@@ -246,7 +248,7 @@ internal class FlowForwarderTest {
try {
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
throw IllegalStateException("Test")
}
})
@@ -269,7 +271,7 @@ internal class FlowForwarderTest {
try {
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
@@ -301,11 +303,11 @@ internal class FlowForwarderTest {
conn.shouldSourceConverge = true
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
throw IllegalStateException("Test")
}
})
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
index 70c75864..726ddbf7 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -65,7 +65,7 @@ internal class FlowSinkTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 3) { consumer.onPull(any(), any(), any()) }
+ verify(exactly = 3) { consumer.onPull(any(), any()) }
}
@Test
@@ -95,7 +95,7 @@ internal class FlowSinkTest {
val provider = FlowSink(engine, capacity)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -122,7 +122,7 @@ internal class FlowSinkTest {
resCtx = conn
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -154,7 +154,7 @@ internal class FlowSinkTest {
throw IllegalStateException("Hi")
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
}
@@ -173,7 +173,7 @@ internal class FlowSinkTest {
val consumer = object : FlowSource {
var isFirst = true
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -231,7 +231,7 @@ internal class FlowSinkTest {
val provider = FlowSink(engine, capacity)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
+ override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
}
provider.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
index 187dacd9..ef15f711 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -112,7 +112,7 @@ internal class ForwardingFlowMultiplexerTest {
isFirst = true
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
index 4b0d7bbd..675ac1c3 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
@@ -32,7 +32,7 @@ public class SimNetworkSink(
public val capacity: Double
) : SimNetworkPort() {
override fun createConsumer(): FlowSource = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
+ override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
override fun toString(): String = "SimNetworkSink.Consumer"
}
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
index 14d22162..2e6983c8 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
@@ -124,7 +124,7 @@ class SimNetworkSinkTest {
assertFalse(sink.isConnected)
assertFalse(source.isConnected)
- verify { consumer.onStop(any(), any(), any()) }
+ verify { consumer.onStop(any(), any()) }
}
private class Source(engine: FlowEngine) : SimNetworkPort() {
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
index eb823eb1..7cc4b801 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
@@ -85,7 +85,7 @@ internal class SimPduTest {
outlet.connect(inlet)
outlet.disconnect()
- verify { consumer.onStop(any(), any(), any()) }
+ verify { consumer.onStop(any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
index 76142103..4f319e65 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
@@ -85,7 +85,7 @@ internal class SimPowerSourceTest {
source.connect(inlet)
source.disconnect()
- verify { consumer.onStop(any(), any(), any()) }
+ verify { consumer.onStop(any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
index a764a368..e19e72fa 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
@@ -92,7 +92,7 @@ internal class SimUpsTest {
ups.connect(inlet)
ups.disconnect()
- verify { consumer.onStop(any(), any(), any()) }
+ verify { consumer.onStop(any(), any()) }
}
class SimpleInlet : SimPowerInlet() {