From df60042e59fe4ad476642262889808346f850c2c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 15 Nov 2021 10:27:07 +0100 Subject: bug(trace): Adjust CPU capacity to number of vCPUs This change fixes an issue where the number of vCPUs was not taken into account when converting from CPU Usage percentage to MHz. --- .../src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 69fc79bb..5088b044 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -426,7 +426,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val id = reader.get(idCol) as String val resource = selected[id] ?: continue - val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz + val cpuUsage = reader.getDouble(cpuUsageCol) * resource.cpuCapacity // MHz val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) } val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() val delta = (timestamp - state.time) -- cgit v1.2.3 From 2615c1c3e6c3f79bc14386398bb1d14d65c17512 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Jan 2022 13:44:48 +0100 Subject: fix(telemetry): Fix reporting of CPU time This change fixes an issue with the CPU time aggregation for VMs in the ComputeMetricAggregator, leading to incorrect values. --- .../org/opendc/telemetry/compute/ComputeMetricAggregator.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index 418dc201..9557f680 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -493,10 +493,10 @@ public class ComputeMetricAggregator { fun reset() { previousUptime = _uptime previousDowntime = _downtime - previousCpuActiveTime = cpuActiveTime - previousCpuIdleTime = cpuIdleTime - previousCpuStealTime = cpuStealTime - previousCpuLostTime = cpuLostTime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime _host = null _cpuLimit = 0.0 -- cgit v1.2.3 From 841eaeb84a96cb1b20172b1ab293ebef0bb573a5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Jan 2022 13:53:07 +0100 Subject: fix(simulator): Flush results before accessing counters This change updates the simulator implementation to flush the active progress when accessing the hypervisor counters. Previously, if the counters were accessed, while the mux or consumer was in progress, its counter values were not accurate. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 1 + .../org/opendc/compute/simulator/internal/Guest.kt | 1 + .../org/opendc/compute/simulator/SimHostTest.kt | 4 +-- .../experiments/capelin/CapelinIntegrationTest.kt | 10 +++--- .../compute/kernel/SimAbstractHypervisor.kt | 21 +++++++++-- .../compute/kernel/SimHypervisorCounters.kt | 5 +++ .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 10 ++++++ .../flow/mux/ForwardingFlowMultiplexer.kt | 4 +++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 42 +++++++++++++++------- 9 files changed, 76 insertions(+), 22 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 43f33f27..4eb6392e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -418,6 +418,7 @@ public class SimHost( */ private fun collectCpuTime(result: ObservableLongMeasurement) { val counters = hypervisor.counters + counters.flush() result.record(counters.cpuActiveTime / 1000L, _activeState) result.record(counters.cpuIdleTime / 1000L, _idleState) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index f49c2824..bb378ee3 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -272,6 +272,7 @@ internal class Guest( */ fun collectCpuTime(result: ObservableLongMeasurement) { val counters = machine.counters + counters.flush() result.record(counters.cpuActiveTime / 1000, _activeState) result.record(counters.cpuIdleTime / 1000, _idleState) diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index dd13b60c..f0325023 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -173,7 +173,7 @@ internal class SimHostTest { assertAll( { assertEquals(658, activeTime, "Active time does not match") }, - { assertEquals(1741, idleTime, "Idle time does not match") }, + { assertEquals(2341, idleTime, "Idle time does not match") }, { assertEquals(637, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) @@ -278,7 +278,7 @@ internal class SimHostTest { meterProvider.close() assertAll( - { assertEquals(1175, idleTime, "Idle time does not match") }, + { assertEquals(1775, idleTime, "Idle time does not match") }, { assertEquals(624, activeTime, "Active time does not match") }, { assertEquals(900001, uptime, "Uptime does not match") }, { assertEquals(300000, downtime, "Downtime does not match") }, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index aefd8304..891fc8be 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -117,7 +117,7 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, @@ -166,7 +166,7 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, @@ -218,10 +218,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(468522, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -263,7 +263,7 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 07465126..b7f70749 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -137,7 +137,9 @@ public abstract class SimAbstractHypervisor( /* FlowConvergenceListener */ override fun onConverge(now: Long, delta: Long) { - _counters.record() + if (delta > 0) { + _counters.record() + } val load = cpuDemand / cpuCapacity for (governor in governors) { @@ -274,6 +276,10 @@ public abstract class SimAbstractHypervisor( fun close() { switch.removeInput(source) } + + fun flush() { + switch.flushCounters(source) + } } /** @@ -337,6 +343,11 @@ public abstract class SimAbstractHypervisor( cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() cpuTime[3] += (interferenceDelta * d).roundToLong() } + + override fun flush() { + hv.mux.flushCounters() + record() + } } /** @@ -348,10 +359,16 @@ public abstract class SimAbstractHypervisor( override val cpuActiveTime: Long get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() + get() = (cpus.sumOf { it.counters.remaining } * d).roundToLong() override val cpuStealTime: Long get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() override val cpuLostTime: Long get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + + override fun flush() { + for (cpu in cpus) { + cpu.flush() + } + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt index 030d9c5f..63fee507 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt @@ -45,4 +45,9 @@ public interface SimHypervisorCounters { * The amount of CPU time (in milliseconds) that was lost due to interference between virtual machines. */ public val cpuLostTime: Long + + /** + * Flush the counter values. + */ + public fun flush() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index a7877546..5f198944 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -105,4 +105,14 @@ public interface FlowMultiplexer { * Clear the outputs of the multiplexer. */ public fun clearOutputs() + + /** + * Flush the counters of the multiplexer. + */ + public fun flushCounters() + + /** + * Flush the counters of the specified [input]. + */ + public fun flushCounters(input: FlowConsumer) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index eff111b8..7fe0c1b7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -135,6 +135,10 @@ public class ForwardingFlowMultiplexer( clearInputs() } + override fun flushCounters() {} + + override fun flushCounters(input: FlowConsumer) {} + private var _lastConverge = Long.MAX_VALUE override fun onConverge(now: Long, delta: Long) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 3d26efda..7ee4d326 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -141,6 +141,14 @@ public class MaxMinFlowMultiplexer( clearInputs() } + override fun flushCounters() { + scheduler.updateCounters(engine.clock.millis()) + } + + override fun flushCounters(input: FlowConsumer) { + (input as Input).doUpdateCounters(engine.clock.millis()) + } + /** * Helper class containing the scheduler state. */ @@ -189,7 +197,6 @@ public class MaxMinFlowMultiplexer( * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false - private var _lastSchedulerCycle = Long.MAX_VALUE /** * The last convergence timestamp and the input. @@ -324,14 +331,9 @@ public class MaxMinFlowMultiplexer( * Synchronously run the scheduler of the multiplexer. */ fun runScheduler(now: Long): Long { - val lastSchedulerCycle = _lastSchedulerCycle - _lastSchedulerCycle = now - - val delta = max(0, now - lastSchedulerCycle) - return try { _schedulerActive = true - doRunScheduler(now, delta) + doRunScheduler(now) } finally { _schedulerActive = false } @@ -384,14 +386,14 @@ public class MaxMinFlowMultiplexer( * * @return The deadline after which a new scheduling cycle should start. */ - private fun doRunScheduler(now: Long, delta: Long): Long { + private fun doRunScheduler(now: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs var inputArray = _inputArray var inputSize = _inputArray.size // Update the counters of the scheduler - updateCounters(delta) + updateCounters(now) // If there is no work yet, mark the inputs as idle. if (inputSize == 0) { @@ -475,14 +477,19 @@ public class MaxMinFlowMultiplexer( * The previous capacity of the multiplexer. */ private var _previousCapacity = 0.0 + private var _previousUpdate = Long.MIN_VALUE /** * Update the counters of the scheduler. */ - private fun updateCounters(delta: Long) { + fun updateCounters(now: Long) { val previousCapacity = _previousCapacity _previousCapacity = capacity + val previousUpdate = _previousUpdate + _previousUpdate = now + + val delta = (now - previousUpdate).coerceAtLeast(0) if (delta <= 0) { return } @@ -643,7 +650,7 @@ public class MaxMinFlowMultiplexer( delta: Long, rate: Double ) { - doUpdateCounters(delta) + doUpdateCounters(now) val allowed = min(rate, capacity) limit = rate @@ -654,7 +661,7 @@ public class MaxMinFlowMultiplexer( } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - doUpdateCounters(delta) + doUpdateCounters(now) limit = 0.0 actualRate = 0.0 @@ -672,10 +679,19 @@ public class MaxMinFlowMultiplexer( /* Comparable */ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) + /** + * The timestamp that the counters where last updated. + */ + private var _lastUpdate = Long.MIN_VALUE + /** * Helper method to update the flow counters of the multiplexer. */ - private fun doUpdateCounters(delta: Long) { + fun doUpdateCounters(now: Long) { + val lastUpdate = _lastUpdate + _lastUpdate = now + + val delta = (now - lastUpdate).coerceAtLeast(0) if (delta <= 0L) { return } -- cgit v1.2.3 From 9e69eaf1c7b0c4985d37f3f4595e2e2478d389f2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Jan 2022 14:12:30 +0100 Subject: perf(simulator): Move logger field out of class This change updates the FlowConsumerContextImpl by moving the logger outside of the class. This prevents each instance of this class from having to construct a logger instance. --- .../opendc/simulator/flow/internal/FlowConsumerContextImpl.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 58ca918b..1d39d61c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -28,6 +28,11 @@ import java.util.* import kotlin.math.max import kotlin.math.min +/** + * The logging instance of this connection. + */ +private val logger = KotlinLogging.logger {} + /** * Implementation of a [FlowConnection] managing the communication between flow sources and consumers. */ @@ -36,11 +41,6 @@ internal class FlowConsumerContextImpl( private val source: FlowSource, private val logic: FlowConsumerLogic ) : FlowConsumerContext { - /** - * The logging instance of this connection. - */ - private val logger = KotlinLogging.logger {} - /** * The capacity of the connection. */ -- cgit v1.2.3 From 5a0821e19eed87e91054289051213cb60b4235b4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 12 Jan 2022 23:17:33 +0100 Subject: refactor(simulator): Remove delta parameter from flow callbacks This change removes the delta parameter from the callbacks of the flow framework. This parameter was used to indicate the duration in time between the last call and the current call. However, its usefulness was limited since the actual delta values needed by implementors of this method had to be bridged across different flow callbacks. --- .../opendc/experiments/tf20/core/SimTFDevice.kt | 15 ++++++-- .../opendc/simulator/compute/SimAbstractMachine.kt | 5 +-- .../simulator/compute/SimBareMetalMachine.kt | 2 +- .../org/opendc/simulator/compute/device/SimPsu.kt | 6 +-- .../compute/kernel/SimAbstractHypervisor.kt | 9 ++++- .../opendc/simulator/compute/workload/SimTrace.kt | 2 +- .../compute/workload/SimWorkloadLifecycle.kt | 12 +++--- .../org/opendc/simulator/flow/FlowConsumer.kt | 12 +++--- .../org/opendc/simulator/flow/FlowConsumerLogic.kt | 9 ++--- .../simulator/flow/FlowConvergenceListener.kt | 3 +- .../org/opendc/simulator/flow/FlowForwarder.kt | 43 +++++++++++----------- .../kotlin/org/opendc/simulator/flow/FlowMapper.kt | 12 +++--- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 19 ++++++---- .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 9 ++--- .../flow/internal/FlowConsumerContextImpl.kt | 37 ++++--------------- .../flow/mux/ForwardingFlowMultiplexer.kt | 17 ++------- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 19 ++++------ .../simulator/flow/source/FixedFlowSource.kt | 11 +++++- .../simulator/flow/source/FlowSourceRateAdapter.kt | 12 +++--- .../simulator/flow/source/TraceFlowSource.kt | 4 +- .../simulator/flow/FlowConsumerContextTest.kt | 8 ++-- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 38 ++++++++++--------- .../org/opendc/simulator/flow/FlowSinkTest.kt | 12 +++--- .../flow/mux/ForwardingFlowMultiplexerTest.kt | 2 +- .../org/opendc/simulator/network/SimNetworkSink.kt | 2 +- .../opendc/simulator/network/SimNetworkSinkTest.kt | 2 +- .../org/opendc/simulator/power/SimPduTest.kt | 2 +- .../opendc/simulator/power/SimPowerSourceTest.kt | 2 +- .../org/opendc/simulator/power/SimUpsTest.kt | 2 +- 29 files changed, 158 insertions(+), 170 deletions(-) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index c751463d..5245261c 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -120,6 +120,11 @@ public class SimTFDevice( */ private var activeWork: Work? = null + /** + * The timestamp of the last pull. + */ + private var lastPull: Long = 0L + override fun onStart(ctx: SimMachineContext) { for (cpu in ctx.cpus) { cpu.startConsumer(this) @@ -131,11 +136,15 @@ public class SimTFDevice( override fun onStart(conn: FlowConnection, now: Long) { ctx = conn capacity = conn.capacity - + lastPull = now conn.shouldSourceConverge = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { + val lastPull = lastPull + this.lastPull = now + val delta = (now - lastPull).coerceAtLeast(0) + val consumedWork = conn.rate * delta / 1000.0 capacity = conn.capacity @@ -164,7 +173,7 @@ public class SimTFDevice( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { _usage.record(conn.rate) _power.record(machine.psu.powerDraw) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 6a4c594d..e14ea507 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.device.SimPeripheral @@ -87,8 +86,8 @@ public abstract class SimAbstractMachine( _ctx?.close() } - override fun onConverge(now: Long, delta: Long) { - parent?.onConverge(now, delta) + override fun onConverge(now: Long) { + parent?.onConverge(now) } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 5df03d45..68792c72 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -81,7 +81,7 @@ public class SimBareMetalMachine( private var _lastConverge = Long.MAX_VALUE - override fun onConverge(now: Long, delta: Long) { + override fun onConverge(now: Long) { // Update the PSU stage psu.update() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index 09defbb5..caff4dc3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -86,17 +86,17 @@ public class SimPsu( conn.shouldSourceConverge = true } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _ctx = null } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0) conn.push(powerDraw) return Long.MAX_VALUE } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { _powerDraw = conn.rate } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index b7f70749..8e925bdf 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -134,9 +134,14 @@ public abstract class SimAbstractHypervisor( private var _cpuCount = 0 private var _cpuCapacity = 0.0 + private var _lastConverge = engine.clock.millis() /* FlowConvergenceListener */ - override fun onConverge(now: Long, delta: Long) { + override fun onConverge(now: Long) { + val lastConverge = _lastConverge + _lastConverge = now + val delta = now - lastConverge + if (delta > 0) { _counters.record() } @@ -146,7 +151,7 @@ public abstract class SimAbstractHypervisor( governor.onLimit(load) } - listener?.onConverge(now, delta) + listener?.onConverge(now) } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt index 4cf60605..207e8579 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt @@ -217,7 +217,7 @@ public class SimTrace( */ private var _idx = 0 - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val size = size val nowOffset = now - offset diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index 742470a1..46113bb0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -61,17 +61,17 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { delegate.onStart(conn, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + override fun onPull(conn: FlowConnection, now: Long): Long { + return delegate.onPull(conn, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { - delegate.onConverge(conn, now, delta) + override fun onConverge(conn: FlowConnection, now: Long) { + delegate.onConverge(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - delegate.onStop(conn, now, delta) + delegate.onStop(conn, now) } finally { complete(this) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index 4685a755..a49826f4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -92,9 +92,9 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - source.onStop(conn, now, delta) + source.onStop(conn, now) if (!cont.isCompleted) { cont.resume(Unit) @@ -105,18 +105,18 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return try { - source.onPull(conn, now, delta) + source.onPull(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - source.onConverge(conn, now, delta) + source.onConverge(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index 50fbc9c7..1d3adb10 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -31,10 +31,9 @@ public interface FlowConsumerLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. * @param rate The requested processing rate of the source. */ - public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {} + public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {} /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -44,17 +43,15 @@ public interface FlowConsumerLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} + public fun onConverge(ctx: FlowConsumerContext, now: Long) {} /** * This method is invoked when the [FlowSource] completed or failed. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the provider finished. - * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. * @param cause The cause of the failure or `null` if the source completed. */ - public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {} + public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt index d1afda6f..62cb10d1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt @@ -30,7 +30,6 @@ public interface FlowConvergenceListener { * This method is invoked when the system has converged to a steady-state. * * @param now The timestamp at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(now: Long, delta: Long) {} + public fun onConverge(now: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index a5663293..0ad18f6a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -25,7 +25,11 @@ package org.opendc.simulator.flow import mu.KotlinLogging import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.max + +/** + * The logging instance of this connection. + */ +private val logger = KotlinLogging.logger {} /** * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. @@ -39,11 +43,6 @@ public class FlowForwarder( private val listener: FlowConvergenceListener? = null, private val isCoupled: Boolean = false ) : FlowSource, FlowConsumer, AutoCloseable { - /** - * The logging instance of this connection. - */ - private val logger = KotlinLogging.logger {} - /** * The delegate [FlowSource]. */ @@ -81,8 +80,6 @@ public class FlowForwarder( _innerCtx?.pull(now) } - @JvmField var lastPull = Long.MAX_VALUE - override fun push(rate: Double) { if (delegate == null) { return @@ -102,8 +99,7 @@ public class FlowForwarder( if (hasDelegateStarted) { val now = engine.clock.millis() - val delta = max(0, now - lastPull) - delegate.onStop(this, now, delta) + delegate.onStop(this, now) } } } @@ -163,7 +159,7 @@ public class FlowForwarder( } } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _innerCtx = null val delegate = delegate @@ -171,25 +167,24 @@ public class FlowForwarder( reset() try { - delegate.onStop(this._ctx, now, delta) + delegate.onStop(this._ctx, now) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } } } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val delegate = delegate if (!hasDelegateStarted) { start() } - _ctx.lastPull = now - updateCounters(conn, delta) + updateCounters(conn, now) return try { - delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE + delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -198,10 +193,10 @@ public class FlowForwarder( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - delegate?.onConverge(this._ctx, now, delta) - listener?.onConverge(now, delta) + delegate?.onConverge(this._ctx, now) + listener?.onConverge(now) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -217,8 +212,10 @@ public class FlowForwarder( val delegate = delegate ?: return try { - delegate.onStart(_ctx, engine.clock.millis()) + val now = engine.clock.millis() + delegate.onStart(_ctx, now) hasDelegateStarted = true + _lastUpdate = now } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } reset() @@ -242,11 +239,15 @@ public class FlowForwarder( * The requested flow rate. */ private var _demand: Double = 0.0 + private var _lastUpdate = 0L /** * Update the flow counters for the transformer. */ - private fun updateCounters(ctx: FlowConnection, delta: Long) { + private fun updateCounters(ctx: FlowConnection, now: Long) { + val lastUpdate = _lastUpdate + _lastUpdate = now + val delta = now - lastUpdate if (delta <= 0) { return } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt index 6867bcef..af702701 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt @@ -45,20 +45,20 @@ public class FlowMapper( source.onStart(delegate, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { val delegate = checkNotNull(_conn) { "Invariant violation" } _conn = null - source.onStop(delegate, now, delta) + source.onStop(delegate, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val delegate = checkNotNull(_conn) { "Invariant violation" } - return source.onPull(delegate, now, delta) + return source.onPull(delegate, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { val delegate = _conn ?: return - source.onConverge(delegate, now, delta) + source.onConverge(delegate, now) } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index e9094443..d0324ce8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -104,34 +104,39 @@ public class FlowSink( * [FlowConsumerLogic] of a sink. */ private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { + override fun onPush( ctx: FlowConsumerContext, now: Long, - delta: Long, rate: Double ) { - updateCounters(ctx, delta, rate, ctx.capacity) + updateCounters(ctx, now, rate, ctx.capacity) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta, 0.0, 0.0) + override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { + updateCounters(ctx, now, 0.0, 0.0) _ctx = null } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + override fun onConverge(ctx: FlowConsumerContext, now: Long) { + parent?.onConverge(now) } /** * The previous demand and capacity for the consumer. */ private val _previous = DoubleArray(2) + private var _previousUpdate = Long.MAX_VALUE /** * Update the counters of the flow consumer. */ - private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) { + private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) { + val previousUpdate = _previousUpdate + _previousUpdate = now + val delta = now - previousUpdate + val counters = counters val previous = _previous val demand = previous[0] diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 3a7e52aa..a48ac18e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -42,19 +42,17 @@ public interface FlowSource { * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the source finished. - * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. */ - public fun onStop(conn: FlowConnection, now: Long, delta: Long) {} + public fun onStop(conn: FlowConnection, now: Long) {} /** * This method is invoked when the source is pulled. * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the pull is occurring. - * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. * @return The duration after which the resource consumer should be pulled again. */ - public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long + public fun onPull(conn: FlowConnection, now: Long): Long /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -64,7 +62,6 @@ public interface FlowSource { * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {} + public fun onConverge(conn: FlowConnection, now: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 1d39d61c..bc6bae71 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.internal import mu.KotlinLogging import org.opendc.simulator.flow.* import java.util.* -import kotlin.math.max import kotlin.math.min /** @@ -123,14 +122,6 @@ internal class FlowConsumerContextImpl( */ private var _flags: Int = 0 - /** - * The timestamp of calls to the callbacks. - */ - private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` - private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` - private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence` - private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence` - /** * The timers at which the context is scheduled to be interrupted. */ @@ -238,15 +229,11 @@ internal class FlowConsumerContextImpl( try { // Pull the source if (1) `pull` is called or (2) the timer of the source has expired newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) { - val lastPull = _lastPull - val delta = max(0, now - lastPull) - // Update state before calling into the outside world, so it observes a consistent state - _lastPull = now _flags = (flags and ConnPulled.inv()) or ConnUpdateActive hasUpdated = true - val duration = source.onPull(this, now, delta) + val duration = source.onPull(this, now) // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags @@ -266,15 +253,11 @@ internal class FlowConsumerContextImpl( // Push to the consumer if the rate of the source has changed (after a call to `push`) if (flags and ConnPushed != 0) { - val lastPush = _lastPush - val delta = max(0, now - lastPush) - // Update state before calling into the outside world, so it observes a consistent state - _lastPush = now _flags = (flags and ConnPushed.inv()) or ConnUpdateActive hasUpdated = true - logic.onPush(this, now, delta, _demand) + logic.onPush(this, now, _demand) // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags @@ -372,18 +355,12 @@ internal class FlowConsumerContextImpl( // Call the source converge callback if it has enabled convergence if (flags and ConnConvergeSource != 0) { - val delta = max(0, now - _lastSourceConvergence) - _lastSourceConvergence = now - - source.onConverge(this, now, delta) + source.onConverge(this, now) } // Call the consumer callback if it has enabled convergence if (flags and ConnConvergeConsumer != 0) { - val delta = max(0, now - _lastConsumerConvergence) - _lastConsumerConvergence = now - - logic.onConverge(this, now, delta) + logic.onConverge(this, now) } } catch (cause: Throwable) { // Invoke the finish callbacks @@ -403,7 +380,7 @@ internal class FlowConsumerContextImpl( */ private fun doStopSource(now: Long) { try { - source.onStop(this, now, max(0, now - _lastPull)) + source.onStop(this, now) doFinishConsumer(now, null) } catch (cause: Throwable) { doFinishConsumer(now, cause) @@ -415,7 +392,7 @@ internal class FlowConsumerContextImpl( */ private fun doFailSource(now: Long, cause: Throwable) { try { - source.onStop(this, now, max(0, now - _lastPull)) + source.onStop(this, now) } catch (e: Throwable) { e.addSuppressed(cause) doFinishConsumer(now, e) @@ -427,7 +404,7 @@ internal class FlowConsumerContextImpl( */ private fun doFinishConsumer(now: Long, cause: Throwable?) { try { - logic.onFinish(this, now, max(0, now - _lastPush), cause) + logic.onFinish(this, now, cause) } catch (e: Throwable) { e.addSuppressed(cause) logger.error(e) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 7fe0c1b7..1d7d22ef 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque -import kotlin.math.max /** * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means @@ -139,16 +138,8 @@ public class ForwardingFlowMultiplexer( override fun flushCounters(input: FlowConsumer) {} - private var _lastConverge = Long.MAX_VALUE - - override fun onConverge(now: Long, delta: Long) { - val listener = listener - if (listener != null) { - val lastConverge = _lastConverge - _lastConverge = now - val duration = max(0, now - lastConverge) - listener.onConverge(now, duration) - } + override fun onConverge(now: Long) { + listener?.onConverge(now) } /** @@ -167,9 +158,9 @@ public class ForwardingFlowMultiplexer( forwarder.onStart(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { forwarder.cancel() - forwarder.onStop(conn, now, delta) + forwarder.onStop(conn, now) } override fun toString(): String = "ForwardingFlowMultiplexer.Output" diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7ee4d326..cc831862 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -257,7 +257,7 @@ public class MaxMinFlowMultiplexer( _lastConverge = now _lastConvergeInput = input - parent.onConverge(now, max(0, now - lastConverge)) + parent.onConverge(now) } } @@ -285,13 +285,11 @@ public class MaxMinFlowMultiplexer( * This method is invoked when one of the outputs converges. */ fun convergeOutput(output: Output, now: Long) { - val lastConverge = _lastConverge val parent = parent if (parent != null) { _lastConverge = now - - parent.onConverge(now, max(0, now - lastConverge)) + parent.onConverge(now) } if (!output.isActive) { @@ -489,7 +487,7 @@ public class MaxMinFlowMultiplexer( val previousUpdate = _previousUpdate _previousUpdate = now - val delta = (now - previousUpdate).coerceAtLeast(0) + val delta = now - previousUpdate if (delta <= 0) { return } @@ -647,7 +645,6 @@ public class MaxMinFlowMultiplexer( override fun onPush( ctx: FlowConsumerContext, now: Long, - delta: Long, rate: Double ) { doUpdateCounters(now) @@ -660,7 +657,7 @@ public class MaxMinFlowMultiplexer( scheduler.trigger(now) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { doUpdateCounters(now) limit = 0.0 @@ -672,7 +669,7 @@ public class MaxMinFlowMultiplexer( _ctx = null } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + override fun onConverge(ctx: FlowConsumerContext, now: Long) { scheduler.convergeInput(this, now) } @@ -777,7 +774,7 @@ public class MaxMinFlowMultiplexer( scheduler.registerOutput(this) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _conn = null capacity = 0.0 isActive = false @@ -785,7 +782,7 @@ public class MaxMinFlowMultiplexer( scheduler.deregisterOutput(this, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val capacity = capacity if (capacity != conn.capacity) { this.capacity = capacity @@ -808,7 +805,7 @@ public class MaxMinFlowMultiplexer( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { if (_isActivationOutput) { scheduler.convergeOutput(this, now) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt index d9779c6a..6cfcc82c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt @@ -37,8 +37,17 @@ public class FixedFlowSource(private val amount: Double, private val utilization } private var remainingAmount = amount + private var lastPull: Long = 0L + + override fun onStart(conn: FlowConnection, now: Long) { + lastPull = now + } + + override fun onPull(conn: FlowConnection, now: Long): Long { + val lastPull = lastPull + this.lastPull = now + val delta = (now - lastPull).coerceAtLeast(0) - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val consumed = conn.rate * delta / 1000.0 val limit = conn.capacity * utilization diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 6dd60d95..80127fb5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -53,21 +53,21 @@ public class FlowSourceRateAdapter( delegate.onStart(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - delegate.onStop(conn, now, delta) + delegate.onStop(conn, now) } finally { rate = 0.0 } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + override fun onPull(conn: FlowConnection, now: Long): Long { + return delegate.onPull(conn, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - delegate.onConverge(conn, now, delta) + delegate.onConverge(conn, now) } finally { rate = conn.rate } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt index ae537845..c9a52128 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -37,11 +37,11 @@ public class TraceFlowSource(private val trace: Sequence) : FlowSource _iterator = trace.iterator() } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _iterator = null } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { // Check whether the trace fragment was fully consumed, otherwise wait until we have done so val nextTarget = _nextTarget if (nextTarget > now) { diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index fe39eb2c..e7b25554 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -36,7 +36,7 @@ class FlowConsumerContextTest { fun testFlushWithoutCommand() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(1.0) 1000 @@ -57,7 +57,7 @@ class FlowConsumerContextTest { fun testDoubleStart() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(0.0) 1000 @@ -82,7 +82,7 @@ class FlowConsumerContextTest { fun testIdempotentCapacityChange() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(1.0) 1000 @@ -99,6 +99,6 @@ class FlowConsumerContextTest { context.start() context.capacity = 4200.0 - verify(exactly = 1) { consumer.onPull(any(), any(), any()) } + verify(exactly = 1) { consumer.onPull(any(), any()) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 12e72b8f..8b090593 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -45,7 +45,7 @@ internal class FlowForwarderTest { launch { source.consume(forwarder) } forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -66,7 +66,7 @@ internal class FlowForwarderTest { forwarder.consume(object : FlowSource { var isFirst = true - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -87,7 +87,7 @@ internal class FlowForwarderTest { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -113,7 +113,7 @@ internal class FlowForwarderTest { val forwarder = FlowForwarder(engine) val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -122,7 +122,7 @@ internal class FlowForwarderTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onStop(any(), any(), any()) } + verify(exactly = 0) { consumer.onStop(any(), any()) } } @Test @@ -140,7 +140,7 @@ internal class FlowForwarderTest { forwarder.cancel() verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any()) } } @Test @@ -158,7 +158,7 @@ internal class FlowForwarderTest { source.cancel() verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any()) } } @Test @@ -168,7 +168,7 @@ internal class FlowForwarderTest { val source = FlowSink(engine, 2000.0) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -198,7 +198,7 @@ internal class FlowForwarderTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { source.onPull(any(), any(), any()) } + verify(exactly = 1) { source.onPull(any(), any()) } } @Test @@ -214,11 +214,13 @@ internal class FlowForwarderTest { yield() - assertEquals(2.0, source.counters.actual) - assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } - assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } - assertEquals(2000, clock.millis()) + assertAll( + { assertEquals(2.0, source.counters.actual) }, + { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } }, + { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } }, + { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } }, + { assertEquals(2000, clock.millis()) } + ) } @Test @@ -246,7 +248,7 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { throw IllegalStateException("Test") } }) @@ -269,7 +271,7 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } @@ -301,11 +303,11 @@ internal class FlowForwarderTest { conn.shouldSourceConverge = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { throw IllegalStateException("Test") } }) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index 70c75864..726ddbf7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -65,7 +65,7 @@ internal class FlowSinkTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 3) { consumer.onPull(any(), any(), any()) } + verify(exactly = 3) { consumer.onPull(any(), any()) } } @Test @@ -95,7 +95,7 @@ internal class FlowSinkTest { val provider = FlowSink(engine, capacity) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -122,7 +122,7 @@ internal class FlowSinkTest { resCtx = conn } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -154,7 +154,7 @@ internal class FlowSinkTest { throw IllegalStateException("Hi") } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } } @@ -173,7 +173,7 @@ internal class FlowSinkTest { val consumer = object : FlowSource { var isFirst = true - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -231,7 +231,7 @@ internal class FlowSinkTest { val provider = FlowSink(engine, capacity) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE } provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt index 187dacd9..ef15f711 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -112,7 +112,7 @@ internal class ForwardingFlowMultiplexerTest { isFirst = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt index 4b0d7bbd..675ac1c3 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt @@ -32,7 +32,7 @@ public class SimNetworkSink( public val capacity: Double ) : SimNetworkPort() { override fun createConsumer(): FlowSource = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE override fun toString(): String = "SimNetworkSink.Consumer" } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt index 14d22162..2e6983c8 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt @@ -124,7 +124,7 @@ class SimNetworkSinkTest { assertFalse(sink.isConnected) assertFalse(source.isConnected) - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } private class Source(engine: FlowEngine) : SimNetworkPort() { diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt index eb823eb1..7cc4b801 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt @@ -85,7 +85,7 @@ internal class SimPduTest { outlet.connect(inlet) outlet.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt index 76142103..4f319e65 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt @@ -85,7 +85,7 @@ internal class SimPowerSourceTest { source.connect(inlet) source.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt index a764a368..e19e72fa 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt @@ -92,7 +92,7 @@ internal class SimUpsTest { ups.connect(inlet) ups.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } class SimpleInlet : SimPowerInlet() { -- cgit v1.2.3