diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-02-19 11:32:38 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-02-19 11:32:38 +0100 |
| commit | a413367c039e1cbb640cbd7087068d78140b373f (patch) | |
| tree | 3737158b657c8cb2f102621f90d7f5dedddaba77 | |
| parent | 0cba027933e19254573f2488086db3e4660f93d2 (diff) | |
| parent | 5a0821e19eed87e91054289051213cb60b4235b4 (diff) | |
merge: Backport generic changes from Radice branch
This pull request backports changes from the Radice branch that are not related to Radice itself.
## Implementation Notes :hammer_and_pick:
* Adjust CPU capacity to number of vCPUs
* Fix reporting of CPU time
* Flush results before accessing counters
* Move logger field out of class
* Remove delta parameter from flow callbacks
## Breaking API Changes :warning:
* `delta` parameter from callbacks in `FlowSource` is removed.
37 files changed, 242 insertions, 200 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 43f33f27..4eb6392e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -418,6 +418,7 @@ public class SimHost( */ private fun collectCpuTime(result: ObservableLongMeasurement) { val counters = hypervisor.counters + counters.flush() result.record(counters.cpuActiveTime / 1000L, _activeState) result.record(counters.cpuIdleTime / 1000L, _idleState) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index f49c2824..bb378ee3 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -272,6 +272,7 @@ internal class Guest( */ fun collectCpuTime(result: ObservableLongMeasurement) { val counters = machine.counters + counters.flush() result.record(counters.cpuActiveTime / 1000, _activeState) result.record(counters.cpuIdleTime / 1000, _idleState) diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index dd13b60c..f0325023 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -173,7 +173,7 @@ internal class SimHostTest { assertAll( { assertEquals(658, activeTime, "Active time does not match") }, - { assertEquals(1741, idleTime, "Idle time does not match") }, + { assertEquals(2341, idleTime, "Idle time does not match") }, { assertEquals(637, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) @@ -278,7 +278,7 @@ internal class SimHostTest { meterProvider.close() assertAll( - { assertEquals(1175, idleTime, "Idle time does not match") }, + { assertEquals(1775, idleTime, "Idle time does not match") }, { assertEquals(624, activeTime, "Active time does not match") }, { assertEquals(900001, uptime, "Uptime does not match") }, { assertEquals(300000, downtime, "Downtime does not match") }, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index aefd8304..891fc8be 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -117,7 +117,7 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, @@ -166,7 +166,7 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, @@ -218,10 +218,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(468522, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -263,7 +263,7 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index c751463d..5245261c 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -120,6 +120,11 @@ public class SimTFDevice( */ private var activeWork: Work? = null + /** + * The timestamp of the last pull. + */ + private var lastPull: Long = 0L + override fun onStart(ctx: SimMachineContext) { for (cpu in ctx.cpus) { cpu.startConsumer(this) @@ -131,11 +136,15 @@ public class SimTFDevice( override fun onStart(conn: FlowConnection, now: Long) { ctx = conn capacity = conn.capacity - + lastPull = now conn.shouldSourceConverge = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { + val lastPull = lastPull + this.lastPull = now + val delta = (now - lastPull).coerceAtLeast(0) + val consumedWork = conn.rate * delta / 1000.0 capacity = conn.capacity @@ -164,7 +173,7 @@ public class SimTFDevice( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { _usage.record(conn.rate) _power.record(machine.psu.powerDraw) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 6a4c594d..e14ea507 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.device.SimPeripheral @@ -87,8 +86,8 @@ public abstract class SimAbstractMachine( _ctx?.close() } - override fun onConverge(now: Long, delta: Long) { - parent?.onConverge(now, delta) + override fun onConverge(now: Long) { + parent?.onConverge(now) } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 5df03d45..68792c72 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -81,7 +81,7 @@ public class SimBareMetalMachine( private var _lastConverge = Long.MAX_VALUE - override fun onConverge(now: Long, delta: Long) { + override fun onConverge(now: Long) { // Update the PSU stage psu.update() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index 09defbb5..caff4dc3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -86,17 +86,17 @@ public class SimPsu( conn.shouldSourceConverge = true } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _ctx = null } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0) conn.push(powerDraw) return Long.MAX_VALUE } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { _powerDraw = conn.rate } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 07465126..8e925bdf 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -134,17 +134,24 @@ public abstract class SimAbstractHypervisor( private var _cpuCount = 0 private var _cpuCapacity = 0.0 + private var _lastConverge = engine.clock.millis() /* FlowConvergenceListener */ - override fun onConverge(now: Long, delta: Long) { - _counters.record() + override fun onConverge(now: Long) { + val lastConverge = _lastConverge + _lastConverge = now + val delta = now - lastConverge + + if (delta > 0) { + _counters.record() + } val load = cpuDemand / cpuCapacity for (governor in governors) { governor.onLimit(load) } - listener?.onConverge(now, delta) + listener?.onConverge(now) } /** @@ -274,6 +281,10 @@ public abstract class SimAbstractHypervisor( fun close() { switch.removeInput(source) } + + fun flush() { + switch.flushCounters(source) + } } /** @@ -337,6 +348,11 @@ public abstract class SimAbstractHypervisor( cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() cpuTime[3] += (interferenceDelta * d).roundToLong() } + + override fun flush() { + hv.mux.flushCounters() + record() + } } /** @@ -348,10 +364,16 @@ public abstract class SimAbstractHypervisor( override val cpuActiveTime: Long get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() + get() = (cpus.sumOf { it.counters.remaining } * d).roundToLong() override val cpuStealTime: Long get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() override val cpuLostTime: Long get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + + override fun flush() { + for (cpu in cpus) { + cpu.flush() + } + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt index 030d9c5f..63fee507 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt @@ -45,4 +45,9 @@ public interface SimHypervisorCounters { * The amount of CPU time (in milliseconds) that was lost due to interference between virtual machines. */ public val cpuLostTime: Long + + /** + * Flush the counter values. + */ + public fun flush() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt index 4cf60605..207e8579 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt @@ -217,7 +217,7 @@ public class SimTrace( */ private var _idx = 0 - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val size = size val nowOffset = now - offset diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index 742470a1..46113bb0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -61,17 +61,17 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { delegate.onStart(conn, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + override fun onPull(conn: FlowConnection, now: Long): Long { + return delegate.onPull(conn, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { - delegate.onConverge(conn, now, delta) + override fun onConverge(conn: FlowConnection, now: Long) { + delegate.onConverge(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - delegate.onStop(conn, now, delta) + delegate.onStop(conn, now) } finally { complete(this) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index 4685a755..a49826f4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -92,9 +92,9 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - source.onStop(conn, now, delta) + source.onStop(conn, now) if (!cont.isCompleted) { cont.resume(Unit) @@ -105,18 +105,18 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return try { - source.onPull(conn, now, delta) + source.onPull(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - source.onConverge(conn, now, delta) + source.onConverge(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index 50fbc9c7..1d3adb10 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -31,10 +31,9 @@ public interface FlowConsumerLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. * @param rate The requested processing rate of the source. */ - public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {} + public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {} /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -44,17 +43,15 @@ public interface FlowConsumerLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} + public fun onConverge(ctx: FlowConsumerContext, now: Long) {} /** * This method is invoked when the [FlowSource] completed or failed. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the provider finished. - * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. * @param cause The cause of the failure or `null` if the source completed. */ - public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {} + public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt index d1afda6f..62cb10d1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt @@ -30,7 +30,6 @@ public interface FlowConvergenceListener { * This method is invoked when the system has converged to a steady-state. * * @param now The timestamp at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(now: Long, delta: Long) {} + public fun onConverge(now: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index a5663293..0ad18f6a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -25,7 +25,11 @@ package org.opendc.simulator.flow import mu.KotlinLogging import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.max + +/** + * The logging instance of this connection. + */ +private val logger = KotlinLogging.logger {} /** * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. @@ -40,11 +44,6 @@ public class FlowForwarder( private val isCoupled: Boolean = false ) : FlowSource, FlowConsumer, AutoCloseable { /** - * The logging instance of this connection. - */ - private val logger = KotlinLogging.logger {} - - /** * The delegate [FlowSource]. */ private var delegate: FlowSource? = null @@ -81,8 +80,6 @@ public class FlowForwarder( _innerCtx?.pull(now) } - @JvmField var lastPull = Long.MAX_VALUE - override fun push(rate: Double) { if (delegate == null) { return @@ -102,8 +99,7 @@ public class FlowForwarder( if (hasDelegateStarted) { val now = engine.clock.millis() - val delta = max(0, now - lastPull) - delegate.onStop(this, now, delta) + delegate.onStop(this, now) } } } @@ -163,7 +159,7 @@ public class FlowForwarder( } } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _innerCtx = null val delegate = delegate @@ -171,25 +167,24 @@ public class FlowForwarder( reset() try { - delegate.onStop(this._ctx, now, delta) + delegate.onStop(this._ctx, now) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } } } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val delegate = delegate if (!hasDelegateStarted) { start() } - _ctx.lastPull = now - updateCounters(conn, delta) + updateCounters(conn, now) return try { - delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE + delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -198,10 +193,10 @@ public class FlowForwarder( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - delegate?.onConverge(this._ctx, now, delta) - listener?.onConverge(now, delta) + delegate?.onConverge(this._ctx, now) + listener?.onConverge(now) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -217,8 +212,10 @@ public class FlowForwarder( val delegate = delegate ?: return try { - delegate.onStart(_ctx, engine.clock.millis()) + val now = engine.clock.millis() + delegate.onStart(_ctx, now) hasDelegateStarted = true + _lastUpdate = now } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } reset() @@ -242,11 +239,15 @@ public class FlowForwarder( * The requested flow rate. */ private var _demand: Double = 0.0 + private var _lastUpdate = 0L /** * Update the flow counters for the transformer. */ - private fun updateCounters(ctx: FlowConnection, delta: Long) { + private fun updateCounters(ctx: FlowConnection, now: Long) { + val lastUpdate = _lastUpdate + _lastUpdate = now + val delta = now - lastUpdate if (delta <= 0) { return } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt index 6867bcef..af702701 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt @@ -45,20 +45,20 @@ public class FlowMapper( source.onStart(delegate, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { val delegate = checkNotNull(_conn) { "Invariant violation" } _conn = null - source.onStop(delegate, now, delta) + source.onStop(delegate, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val delegate = checkNotNull(_conn) { "Invariant violation" } - return source.onPull(delegate, now, delta) + return source.onPull(delegate, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { val delegate = _conn ?: return - source.onConverge(delegate, now, delta) + source.onConverge(delegate, now) } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index e9094443..d0324ce8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -104,34 +104,39 @@ public class FlowSink( * [FlowConsumerLogic] of a sink. */ private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { + override fun onPush( ctx: FlowConsumerContext, now: Long, - delta: Long, rate: Double ) { - updateCounters(ctx, delta, rate, ctx.capacity) + updateCounters(ctx, now, rate, ctx.capacity) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta, 0.0, 0.0) + override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { + updateCounters(ctx, now, 0.0, 0.0) _ctx = null } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + override fun onConverge(ctx: FlowConsumerContext, now: Long) { + parent?.onConverge(now) } /** * The previous demand and capacity for the consumer. */ private val _previous = DoubleArray(2) + private var _previousUpdate = Long.MAX_VALUE /** * Update the counters of the flow consumer. */ - private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) { + private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) { + val previousUpdate = _previousUpdate + _previousUpdate = now + val delta = now - previousUpdate + val counters = counters val previous = _previous val demand = previous[0] diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 3a7e52aa..a48ac18e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -42,19 +42,17 @@ public interface FlowSource { * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the source finished. - * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. */ - public fun onStop(conn: FlowConnection, now: Long, delta: Long) {} + public fun onStop(conn: FlowConnection, now: Long) {} /** * This method is invoked when the source is pulled. * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the pull is occurring. - * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. * @return The duration after which the resource consumer should be pulled again. */ - public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long + public fun onPull(conn: FlowConnection, now: Long): Long /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -64,7 +62,6 @@ public interface FlowSource { * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {} + public fun onConverge(conn: FlowConnection, now: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 58ca918b..bc6bae71 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -25,10 +25,14 @@ package org.opendc.simulator.flow.internal import mu.KotlinLogging import org.opendc.simulator.flow.* import java.util.* -import kotlin.math.max import kotlin.math.min /** + * The logging instance of this connection. + */ +private val logger = KotlinLogging.logger {} + +/** * Implementation of a [FlowConnection] managing the communication between flow sources and consumers. */ internal class FlowConsumerContextImpl( @@ -37,11 +41,6 @@ internal class FlowConsumerContextImpl( private val logic: FlowConsumerLogic ) : FlowConsumerContext { /** - * The logging instance of this connection. - */ - private val logger = KotlinLogging.logger {} - - /** * The capacity of the connection. */ override var capacity: Double @@ -124,14 +123,6 @@ internal class FlowConsumerContextImpl( private var _flags: Int = 0 /** - * The timestamp of calls to the callbacks. - */ - private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` - private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` - private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence` - private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence` - - /** * The timers at which the context is scheduled to be interrupted. */ private var _timer: Long = Long.MAX_VALUE @@ -238,15 +229,11 @@ internal class FlowConsumerContextImpl( try { // Pull the source if (1) `pull` is called or (2) the timer of the source has expired newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) { - val lastPull = _lastPull - val delta = max(0, now - lastPull) - // Update state before calling into the outside world, so it observes a consistent state - _lastPull = now _flags = (flags and ConnPulled.inv()) or ConnUpdateActive hasUpdated = true - val duration = source.onPull(this, now, delta) + val duration = source.onPull(this, now) // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags @@ -266,15 +253,11 @@ internal class FlowConsumerContextImpl( // Push to the consumer if the rate of the source has changed (after a call to `push`) if (flags and ConnPushed != 0) { - val lastPush = _lastPush - val delta = max(0, now - lastPush) - // Update state before calling into the outside world, so it observes a consistent state - _lastPush = now _flags = (flags and ConnPushed.inv()) or ConnUpdateActive hasUpdated = true - logic.onPush(this, now, delta, _demand) + logic.onPush(this, now, _demand) // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags @@ -372,18 +355,12 @@ internal class FlowConsumerContextImpl( // Call the source converge callback if it has enabled convergence if (flags and ConnConvergeSource != 0) { - val delta = max(0, now - _lastSourceConvergence) - _lastSourceConvergence = now - - source.onConverge(this, now, delta) + source.onConverge(this, now) } // Call the consumer callback if it has enabled convergence if (flags and ConnConvergeConsumer != 0) { - val delta = max(0, now - _lastConsumerConvergence) - _lastConsumerConvergence = now - - logic.onConverge(this, now, delta) + logic.onConverge(this, now) } } catch (cause: Throwable) { // Invoke the finish callbacks @@ -403,7 +380,7 @@ internal class FlowConsumerContextImpl( */ private fun doStopSource(now: Long) { try { - source.onStop(this, now, max(0, now - _lastPull)) + source.onStop(this, now) doFinishConsumer(now, null) } catch (cause: Throwable) { doFinishConsumer(now, cause) @@ -415,7 +392,7 @@ internal class FlowConsumerContextImpl( */ private fun doFailSource(now: Long, cause: Throwable) { try { - source.onStop(this, now, max(0, now - _lastPull)) + source.onStop(this, now) } catch (e: Throwable) { e.addSuppressed(cause) doFinishConsumer(now, e) @@ -427,7 +404,7 @@ internal class FlowConsumerContextImpl( */ private fun doFinishConsumer(now: Long, cause: Throwable?) { try { - logic.onFinish(this, now, max(0, now - _lastPush), cause) + logic.onFinish(this, now, cause) } catch (e: Throwable) { e.addSuppressed(cause) logger.error(e) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index a7877546..5f198944 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -105,4 +105,14 @@ public interface FlowMultiplexer { * Clear the outputs of the multiplexer. */ public fun clearOutputs() + + /** + * Flush the counters of the multiplexer. + */ + public fun flushCounters() + + /** + * Flush the counters of the specified [input]. + */ + public fun flushCounters(input: FlowConsumer) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index eff111b8..1d7d22ef 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque -import kotlin.math.max /** * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means @@ -135,16 +134,12 @@ public class ForwardingFlowMultiplexer( clearInputs() } - private var _lastConverge = Long.MAX_VALUE + override fun flushCounters() {} - override fun onConverge(now: Long, delta: Long) { - val listener = listener - if (listener != null) { - val lastConverge = _lastConverge - _lastConverge = now - val duration = max(0, now - lastConverge) - listener.onConverge(now, duration) - } + override fun flushCounters(input: FlowConsumer) {} + + override fun onConverge(now: Long) { + listener?.onConverge(now) } /** @@ -163,9 +158,9 @@ public class ForwardingFlowMultiplexer( forwarder.onStart(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { forwarder.cancel() - forwarder.onStop(conn, now, delta) + forwarder.onStop(conn, now) } override fun toString(): String = "ForwardingFlowMultiplexer.Output" diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 3d26efda..cc831862 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -141,6 +141,14 @@ public class MaxMinFlowMultiplexer( clearInputs() } + override fun flushCounters() { + scheduler.updateCounters(engine.clock.millis()) + } + + override fun flushCounters(input: FlowConsumer) { + (input as Input).doUpdateCounters(engine.clock.millis()) + } + /** * Helper class containing the scheduler state. */ @@ -189,7 +197,6 @@ public class MaxMinFlowMultiplexer( * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false - private var _lastSchedulerCycle = Long.MAX_VALUE /** * The last convergence timestamp and the input. @@ -250,7 +257,7 @@ public class MaxMinFlowMultiplexer( _lastConverge = now _lastConvergeInput = input - parent.onConverge(now, max(0, now - lastConverge)) + parent.onConverge(now) } } @@ -278,13 +285,11 @@ public class MaxMinFlowMultiplexer( * This method is invoked when one of the outputs converges. */ fun convergeOutput(output: Output, now: Long) { - val lastConverge = _lastConverge val parent = parent if (parent != null) { _lastConverge = now - - parent.onConverge(now, max(0, now - lastConverge)) + parent.onConverge(now) } if (!output.isActive) { @@ -324,14 +329,9 @@ public class MaxMinFlowMultiplexer( * Synchronously run the scheduler of the multiplexer. */ fun runScheduler(now: Long): Long { - val lastSchedulerCycle = _lastSchedulerCycle - _lastSchedulerCycle = now - - val delta = max(0, now - lastSchedulerCycle) - return try { _schedulerActive = true - doRunScheduler(now, delta) + doRunScheduler(now) } finally { _schedulerActive = false } @@ -384,14 +384,14 @@ public class MaxMinFlowMultiplexer( * * @return The deadline after which a new scheduling cycle should start. */ - private fun doRunScheduler(now: Long, delta: Long): Long { + private fun doRunScheduler(now: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs var inputArray = _inputArray var inputSize = _inputArray.size // Update the counters of the scheduler - updateCounters(delta) + updateCounters(now) // If there is no work yet, mark the inputs as idle. if (inputSize == 0) { @@ -475,14 +475,19 @@ public class MaxMinFlowMultiplexer( * The previous capacity of the multiplexer. */ private var _previousCapacity = 0.0 + private var _previousUpdate = Long.MIN_VALUE /** * Update the counters of the scheduler. */ - private fun updateCounters(delta: Long) { + fun updateCounters(now: Long) { val previousCapacity = _previousCapacity _previousCapacity = capacity + val previousUpdate = _previousUpdate + _previousUpdate = now + + val delta = now - previousUpdate if (delta <= 0) { return } @@ -640,10 +645,9 @@ public class MaxMinFlowMultiplexer( override fun onPush( ctx: FlowConsumerContext, now: Long, - delta: Long, rate: Double ) { - doUpdateCounters(delta) + doUpdateCounters(now) val allowed = min(rate, capacity) limit = rate @@ -653,8 +657,8 @@ public class MaxMinFlowMultiplexer( scheduler.trigger(now) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - doUpdateCounters(delta) + override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { + doUpdateCounters(now) limit = 0.0 actualRate = 0.0 @@ -665,7 +669,7 @@ public class MaxMinFlowMultiplexer( _ctx = null } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + override fun onConverge(ctx: FlowConsumerContext, now: Long) { scheduler.convergeInput(this, now) } @@ -673,9 +677,18 @@ public class MaxMinFlowMultiplexer( override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) /** + * The timestamp that the counters where last updated. + */ + private var _lastUpdate = Long.MIN_VALUE + + /** * Helper method to update the flow counters of the multiplexer. */ - private fun doUpdateCounters(delta: Long) { + fun doUpdateCounters(now: Long) { + val lastUpdate = _lastUpdate + _lastUpdate = now + + val delta = (now - lastUpdate).coerceAtLeast(0) if (delta <= 0L) { return } @@ -761,7 +774,7 @@ public class MaxMinFlowMultiplexer( scheduler.registerOutput(this) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _conn = null capacity = 0.0 isActive = false @@ -769,7 +782,7 @@ public class MaxMinFlowMultiplexer( scheduler.deregisterOutput(this, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val capacity = capacity if (capacity != conn.capacity) { this.capacity = capacity @@ -792,7 +805,7 @@ public class MaxMinFlowMultiplexer( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { if (_isActivationOutput) { scheduler.convergeOutput(this, now) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt index d9779c6a..6cfcc82c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt @@ -37,8 +37,17 @@ public class FixedFlowSource(private val amount: Double, private val utilization } private var remainingAmount = amount + private var lastPull: Long = 0L + + override fun onStart(conn: FlowConnection, now: Long) { + lastPull = now + } + + override fun onPull(conn: FlowConnection, now: Long): Long { + val lastPull = lastPull + this.lastPull = now + val delta = (now - lastPull).coerceAtLeast(0) - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val consumed = conn.rate * delta / 1000.0 val limit = conn.capacity * utilization diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 6dd60d95..80127fb5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -53,21 +53,21 @@ public class FlowSourceRateAdapter( delegate.onStart(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - delegate.onStop(conn, now, delta) + delegate.onStop(conn, now) } finally { rate = 0.0 } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + override fun onPull(conn: FlowConnection, now: Long): Long { + return delegate.onPull(conn, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - delegate.onConverge(conn, now, delta) + delegate.onConverge(conn, now) } finally { rate = conn.rate } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt index ae537845..c9a52128 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -37,11 +37,11 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource _iterator = trace.iterator() } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _iterator = null } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { // Check whether the trace fragment was fully consumed, otherwise wait until we have done so val nextTarget = _nextTarget if (nextTarget > now) { diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index fe39eb2c..e7b25554 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -36,7 +36,7 @@ class FlowConsumerContextTest { fun testFlushWithoutCommand() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(1.0) 1000 @@ -57,7 +57,7 @@ class FlowConsumerContextTest { fun testDoubleStart() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(0.0) 1000 @@ -82,7 +82,7 @@ class FlowConsumerContextTest { fun testIdempotentCapacityChange() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(1.0) 1000 @@ -99,6 +99,6 @@ class FlowConsumerContextTest { context.start() context.capacity = 4200.0 - verify(exactly = 1) { consumer.onPull(any(), any(), any()) } + verify(exactly = 1) { consumer.onPull(any(), any()) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 12e72b8f..8b090593 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -45,7 +45,7 @@ internal class FlowForwarderTest { launch { source.consume(forwarder) } forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -66,7 +66,7 @@ internal class FlowForwarderTest { forwarder.consume(object : FlowSource { var isFirst = true - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -87,7 +87,7 @@ internal class FlowForwarderTest { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -113,7 +113,7 @@ internal class FlowForwarderTest { val forwarder = FlowForwarder(engine) val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -122,7 +122,7 @@ internal class FlowForwarderTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onStop(any(), any(), any()) } + verify(exactly = 0) { consumer.onStop(any(), any()) } } @Test @@ -140,7 +140,7 @@ internal class FlowForwarderTest { forwarder.cancel() verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any()) } } @Test @@ -158,7 +158,7 @@ internal class FlowForwarderTest { source.cancel() verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any()) } } @Test @@ -168,7 +168,7 @@ internal class FlowForwarderTest { val source = FlowSink(engine, 2000.0) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -198,7 +198,7 @@ internal class FlowForwarderTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { source.onPull(any(), any(), any()) } + verify(exactly = 1) { source.onPull(any(), any()) } } @Test @@ -214,11 +214,13 @@ internal class FlowForwarderTest { yield() - assertEquals(2.0, source.counters.actual) - assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } - assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } - assertEquals(2000, clock.millis()) + assertAll( + { assertEquals(2.0, source.counters.actual) }, + { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } }, + { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } }, + { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } }, + { assertEquals(2000, clock.millis()) } + ) } @Test @@ -246,7 +248,7 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { throw IllegalStateException("Test") } }) @@ -269,7 +271,7 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } @@ -301,11 +303,11 @@ internal class FlowForwarderTest { conn.shouldSourceConverge = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { throw IllegalStateException("Test") } }) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index 70c75864..726ddbf7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -65,7 +65,7 @@ internal class FlowSinkTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 3) { consumer.onPull(any(), any(), any()) } + verify(exactly = 3) { consumer.onPull(any(), any()) } } @Test @@ -95,7 +95,7 @@ internal class FlowSinkTest { val provider = FlowSink(engine, capacity) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -122,7 +122,7 @@ internal class FlowSinkTest { resCtx = conn } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -154,7 +154,7 @@ internal class FlowSinkTest { throw IllegalStateException("Hi") } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } } @@ -173,7 +173,7 @@ internal class FlowSinkTest { val consumer = object : FlowSource { var isFirst = true - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -231,7 +231,7 @@ internal class FlowSinkTest { val provider = FlowSink(engine, capacity) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE } provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt index 187dacd9..ef15f711 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -112,7 +112,7 @@ internal class ForwardingFlowMultiplexerTest { isFirst = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt index 4b0d7bbd..675ac1c3 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt @@ -32,7 +32,7 @@ public class SimNetworkSink( public val capacity: Double ) : SimNetworkPort() { override fun createConsumer(): FlowSource = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE override fun toString(): String = "SimNetworkSink.Consumer" } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt index 14d22162..2e6983c8 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt @@ -124,7 +124,7 @@ class SimNetworkSinkTest { assertFalse(sink.isConnected) assertFalse(source.isConnected) - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } private class Source(engine: FlowEngine) : SimNetworkPort() { diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt index eb823eb1..7cc4b801 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt @@ -85,7 +85,7 @@ internal class SimPduTest { outlet.connect(inlet) outlet.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt index 76142103..4f319e65 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt @@ -85,7 +85,7 @@ internal class SimPowerSourceTest { source.connect(inlet) source.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt index a764a368..e19e72fa 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt @@ -92,7 +92,7 @@ internal class SimUpsTest { ups.connect(inlet) ups.disconnect() - verify { consumer.onStop(any(), any(), any()) } + verify { consumer.onStop(any(), any()) } } class SimpleInlet : SimPowerInlet() { diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index 418dc201..9557f680 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -493,10 +493,10 @@ public class ComputeMetricAggregator { fun reset() { previousUptime = _uptime previousDowntime = _downtime - previousCpuActiveTime = cpuActiveTime - previousCpuIdleTime = cpuIdleTime - previousCpuStealTime = cpuStealTime - previousCpuLostTime = cpuLostTime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime _host = null _cpuLimit = 0.0 diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 69fc79bb..5088b044 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -426,7 +426,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val id = reader.get(idCol) as String val resource = selected[id] ?: continue - val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz + val cpuUsage = reader.getDouble(cpuUsageCol) * resource.cpuCapacity // MHz val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) } val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() val delta = (timestamp - state.time) |
