summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-19 11:32:38 +0100
committerGitHub <noreply@github.com>2022-02-19 11:32:38 +0100
commita413367c039e1cbb640cbd7087068d78140b373f (patch)
tree3737158b657c8cb2f102621f90d7f5dedddaba77
parent0cba027933e19254573f2488086db3e4660f93d2 (diff)
parent5a0821e19eed87e91054289051213cb60b4235b4 (diff)
merge: Backport generic changes from Radice branch
This pull request backports changes from the Radice branch that are not related to Radice itself. ## Implementation Notes :hammer_and_pick: * Adjust CPU capacity to number of vCPUs * Fix reporting of CPU time * Flush results before accessing counters * Move logger field out of class * Remove delta parameter from flow callbacks ## Breaking API Changes :warning: * `delta` parameter from callbacks in `FlowSource` is removed.
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt1
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt10
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt15
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt30
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt43
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt47
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt59
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt11
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt2
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt2
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt8
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt2
37 files changed, 242 insertions, 200 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 43f33f27..4eb6392e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -418,6 +418,7 @@ public class SimHost(
*/
private fun collectCpuTime(result: ObservableLongMeasurement) {
val counters = hypervisor.counters
+ counters.flush()
result.record(counters.cpuActiveTime / 1000L, _activeState)
result.record(counters.cpuIdleTime / 1000L, _idleState)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index f49c2824..bb378ee3 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -272,6 +272,7 @@ internal class Guest(
*/
fun collectCpuTime(result: ObservableLongMeasurement) {
val counters = machine.counters
+ counters.flush()
result.record(counters.cpuActiveTime / 1000, _activeState)
result.record(counters.cpuIdleTime / 1000, _idleState)
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index dd13b60c..f0325023 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -173,7 +173,7 @@ internal class SimHostTest {
assertAll(
{ assertEquals(658, activeTime, "Active time does not match") },
- { assertEquals(1741, idleTime, "Idle time does not match") },
+ { assertEquals(2341, idleTime, "Idle time does not match") },
{ assertEquals(637, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
@@ -278,7 +278,7 @@ internal class SimHostTest {
meterProvider.close()
assertAll(
- { assertEquals(1175, idleTime, "Idle time does not match") },
+ { assertEquals(1775, idleTime, "Idle time does not match") },
{ assertEquals(624, activeTime, "Active time does not match") },
{ assertEquals(900001, uptime, "Uptime does not match") },
{ assertEquals(300000, downtime, "Downtime does not match") },
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index aefd8304..891fc8be 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -117,7 +117,7 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
{ assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
{ assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
@@ -166,7 +166,7 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
@@ -218,10 +218,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(468522, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
@@ -263,7 +263,7 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index c751463d..5245261c 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -120,6 +120,11 @@ public class SimTFDevice(
*/
private var activeWork: Work? = null
+ /**
+ * The timestamp of the last pull.
+ */
+ private var lastPull: Long = 0L
+
override fun onStart(ctx: SimMachineContext) {
for (cpu in ctx.cpus) {
cpu.startConsumer(this)
@@ -131,11 +136,15 @@ public class SimTFDevice(
override fun onStart(conn: FlowConnection, now: Long) {
ctx = conn
capacity = conn.capacity
-
+ lastPull = now
conn.shouldSourceConverge = true
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ val lastPull = lastPull
+ this.lastPull = now
+ val delta = (now - lastPull).coerceAtLeast(0)
+
val consumedWork = conn.rate * delta / 1000.0
capacity = conn.capacity
@@ -164,7 +173,7 @@ public class SimTFDevice(
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
_usage.record(conn.rate)
_power.record(machine.psu.powerDraw)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 6a4c594d..e14ea507 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.simulator.compute.device.SimNetworkAdapter
import org.opendc.simulator.compute.device.SimPeripheral
@@ -87,8 +86,8 @@ public abstract class SimAbstractMachine(
_ctx?.close()
}
- override fun onConverge(now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ override fun onConverge(now: Long) {
+ parent?.onConverge(now)
}
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 5df03d45..68792c72 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -81,7 +81,7 @@ public class SimBareMetalMachine(
private var _lastConverge = Long.MAX_VALUE
- override fun onConverge(now: Long, delta: Long) {
+ override fun onConverge(now: Long) {
// Update the PSU stage
psu.update()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
index 09defbb5..caff4dc3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
@@ -86,17 +86,17 @@ public class SimPsu(
conn.shouldSourceConverge = true
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_ctx = null
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0)
conn.push(powerDraw)
return Long.MAX_VALUE
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
_powerDraw = conn.rate
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index 07465126..8e925bdf 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -134,17 +134,24 @@ public abstract class SimAbstractHypervisor(
private var _cpuCount = 0
private var _cpuCapacity = 0.0
+ private var _lastConverge = engine.clock.millis()
/* FlowConvergenceListener */
- override fun onConverge(now: Long, delta: Long) {
- _counters.record()
+ override fun onConverge(now: Long) {
+ val lastConverge = _lastConverge
+ _lastConverge = now
+ val delta = now - lastConverge
+
+ if (delta > 0) {
+ _counters.record()
+ }
val load = cpuDemand / cpuCapacity
for (governor in governors) {
governor.onLimit(load)
}
- listener?.onConverge(now, delta)
+ listener?.onConverge(now)
}
/**
@@ -274,6 +281,10 @@ public abstract class SimAbstractHypervisor(
fun close() {
switch.removeInput(source)
}
+
+ fun flush() {
+ switch.flushCounters(source)
+ }
}
/**
@@ -337,6 +348,11 @@ public abstract class SimAbstractHypervisor(
cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong()
cpuTime[3] += (interferenceDelta * d).roundToLong()
}
+
+ override fun flush() {
+ hv.mux.flushCounters()
+ record()
+ }
}
/**
@@ -348,10 +364,16 @@ public abstract class SimAbstractHypervisor(
override val cpuActiveTime: Long
get() = (cpus.sumOf { it.counters.actual } * d).roundToLong()
override val cpuIdleTime: Long
- get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong()
+ get() = (cpus.sumOf { it.counters.remaining } * d).roundToLong()
override val cpuStealTime: Long
get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
override val cpuLostTime: Long
get() = (cpus.sumOf { it.counters.interference } * d).roundToLong()
+
+ override fun flush() {
+ for (cpu in cpus) {
+ cpu.flush()
+ }
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt
index 030d9c5f..63fee507 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt
@@ -45,4 +45,9 @@ public interface SimHypervisorCounters {
* The amount of CPU time (in milliseconds) that was lost due to interference between virtual machines.
*/
public val cpuLostTime: Long
+
+ /**
+ * Flush the counter values.
+ */
+ public fun flush()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
index 4cf60605..207e8579 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
@@ -217,7 +217,7 @@ public class SimTrace(
*/
private var _idx = 0
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val size = size
val nowOffset = now - offset
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index 742470a1..46113bb0 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -61,17 +61,17 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
delegate.onStart(conn, now)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return delegate.onPull(conn, now, delta)
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ return delegate.onPull(conn, now)
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
- delegate.onConverge(conn, now, delta)
+ override fun onConverge(conn: FlowConnection, now: Long) {
+ delegate.onConverge(conn, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
try {
- delegate.onStop(conn, now, delta)
+ delegate.onStop(conn, now)
} finally {
complete(this)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
index 4685a755..a49826f4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -92,9 +92,9 @@ public suspend fun FlowConsumer.consume(source: FlowSource) {
}
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
try {
- source.onStop(conn, now, delta)
+ source.onStop(conn, now)
if (!cont.isCompleted) {
cont.resume(Unit)
@@ -105,18 +105,18 @@ public suspend fun FlowConsumer.consume(source: FlowSource) {
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return try {
- source.onPull(conn, now, delta)
+ source.onPull(conn, now)
} catch (cause: Throwable) {
cont.resumeWithException(cause)
throw cause
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- source.onConverge(conn, now, delta)
+ source.onConverge(conn, now)
} catch (cause: Throwable) {
cont.resumeWithException(cause)
throw cause
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
index 50fbc9c7..1d3adb10 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -31,10 +31,9 @@ public interface FlowConsumerLogic {
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
* @param rate The requested processing rate of the source.
*/
- public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {}
+ public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {}
/**
* This method is invoked when the flow graph has converged into a steady-state system.
@@ -44,17 +43,15 @@ public interface FlowConsumerLogic {
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {}
+ public fun onConverge(ctx: FlowConsumerContext, now: Long) {}
/**
* This method is invoked when the [FlowSource] completed or failed.
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the provider finished.
- * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
* @param cause The cause of the failure or `null` if the source completed.
*/
- public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {}
+ public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
index d1afda6f..62cb10d1 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
@@ -30,7 +30,6 @@ public interface FlowConvergenceListener {
* This method is invoked when the system has converged to a steady-state.
*
* @param now The timestamp at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(now: Long, delta: Long) {}
+ public fun onConverge(now: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index a5663293..0ad18f6a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -25,7 +25,11 @@ package org.opendc.simulator.flow
import mu.KotlinLogging
import org.opendc.simulator.flow.internal.D_MS_TO_S
import org.opendc.simulator.flow.internal.MutableFlowCounters
-import kotlin.math.max
+
+/**
+ * The logging instance of this connection.
+ */
+private val logger = KotlinLogging.logger {}
/**
* A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
@@ -40,11 +44,6 @@ public class FlowForwarder(
private val isCoupled: Boolean = false
) : FlowSource, FlowConsumer, AutoCloseable {
/**
- * The logging instance of this connection.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
* The delegate [FlowSource].
*/
private var delegate: FlowSource? = null
@@ -81,8 +80,6 @@ public class FlowForwarder(
_innerCtx?.pull(now)
}
- @JvmField var lastPull = Long.MAX_VALUE
-
override fun push(rate: Double) {
if (delegate == null) {
return
@@ -102,8 +99,7 @@ public class FlowForwarder(
if (hasDelegateStarted) {
val now = engine.clock.millis()
- val delta = max(0, now - lastPull)
- delegate.onStop(this, now, delta)
+ delegate.onStop(this, now)
}
}
}
@@ -163,7 +159,7 @@ public class FlowForwarder(
}
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_innerCtx = null
val delegate = delegate
@@ -171,25 +167,24 @@ public class FlowForwarder(
reset()
try {
- delegate.onStop(this._ctx, now, delta)
+ delegate.onStop(this._ctx, now)
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
}
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val delegate = delegate
if (!hasDelegateStarted) {
start()
}
- _ctx.lastPull = now
- updateCounters(conn, delta)
+ updateCounters(conn, now)
return try {
- delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE
+ delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
@@ -198,10 +193,10 @@ public class FlowForwarder(
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- delegate?.onConverge(this._ctx, now, delta)
- listener?.onConverge(now, delta)
+ delegate?.onConverge(this._ctx, now)
+ listener?.onConverge(now)
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
@@ -217,8 +212,10 @@ public class FlowForwarder(
val delegate = delegate ?: return
try {
- delegate.onStart(_ctx, engine.clock.millis())
+ val now = engine.clock.millis()
+ delegate.onStart(_ctx, now)
hasDelegateStarted = true
+ _lastUpdate = now
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
reset()
@@ -242,11 +239,15 @@ public class FlowForwarder(
* The requested flow rate.
*/
private var _demand: Double = 0.0
+ private var _lastUpdate = 0L
/**
* Update the flow counters for the transformer.
*/
- private fun updateCounters(ctx: FlowConnection, delta: Long) {
+ private fun updateCounters(ctx: FlowConnection, now: Long) {
+ val lastUpdate = _lastUpdate
+ _lastUpdate = now
+ val delta = now - lastUpdate
if (delta <= 0) {
return
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
index 6867bcef..af702701 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
@@ -45,20 +45,20 @@ public class FlowMapper(
source.onStart(delegate, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
val delegate = checkNotNull(_conn) { "Invariant violation" }
_conn = null
- source.onStop(delegate, now, delta)
+ source.onStop(delegate, now)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val delegate = checkNotNull(_conn) { "Invariant violation" }
- return source.onPull(delegate, now, delta)
+ return source.onPull(delegate, now)
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
val delegate = _conn ?: return
- source.onConverge(delegate, now, delta)
+ source.onConverge(delegate, now)
}
/**
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index e9094443..d0324ce8 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -104,34 +104,39 @@ public class FlowSink(
* [FlowConsumerLogic] of a sink.
*/
private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
+
override fun onPush(
ctx: FlowConsumerContext,
now: Long,
- delta: Long,
rate: Double
) {
- updateCounters(ctx, delta, rate, ctx.capacity)
+ updateCounters(ctx, now, rate, ctx.capacity)
}
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- updateCounters(ctx, delta, 0.0, 0.0)
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
+ updateCounters(ctx, now, 0.0, 0.0)
_ctx = null
}
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ override fun onConverge(ctx: FlowConsumerContext, now: Long) {
+ parent?.onConverge(now)
}
/**
* The previous demand and capacity for the consumer.
*/
private val _previous = DoubleArray(2)
+ private var _previousUpdate = Long.MAX_VALUE
/**
* Update the counters of the flow consumer.
*/
- private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) {
+ private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) {
+ val previousUpdate = _previousUpdate
+ _previousUpdate = now
+ val delta = now - previousUpdate
+
val counters = counters
val previous = _previous
val demand = previous[0]
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
index 3a7e52aa..a48ac18e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -42,19 +42,17 @@ public interface FlowSource {
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the source finished.
- * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
*/
- public fun onStop(conn: FlowConnection, now: Long, delta: Long) {}
+ public fun onStop(conn: FlowConnection, now: Long) {}
/**
* This method is invoked when the source is pulled.
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the pull is occurring.
- * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
* @return The duration after which the resource consumer should be pulled again.
*/
- public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long
+ public fun onPull(conn: FlowConnection, now: Long): Long
/**
* This method is invoked when the flow graph has converged into a steady-state system.
@@ -64,7 +62,6 @@ public interface FlowSource {
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {}
+ public fun onConverge(conn: FlowConnection, now: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 58ca918b..bc6bae71 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -25,10 +25,14 @@ package org.opendc.simulator.flow.internal
import mu.KotlinLogging
import org.opendc.simulator.flow.*
import java.util.*
-import kotlin.math.max
import kotlin.math.min
/**
+ * The logging instance of this connection.
+ */
+private val logger = KotlinLogging.logger {}
+
+/**
* Implementation of a [FlowConnection] managing the communication between flow sources and consumers.
*/
internal class FlowConsumerContextImpl(
@@ -37,11 +41,6 @@ internal class FlowConsumerContextImpl(
private val logic: FlowConsumerLogic
) : FlowConsumerContext {
/**
- * The logging instance of this connection.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
* The capacity of the connection.
*/
override var capacity: Double
@@ -124,14 +123,6 @@ internal class FlowConsumerContextImpl(
private var _flags: Int = 0
/**
- * The timestamp of calls to the callbacks.
- */
- private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull`
- private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush`
- private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence`
- private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence`
-
- /**
* The timers at which the context is scheduled to be interrupted.
*/
private var _timer: Long = Long.MAX_VALUE
@@ -238,15 +229,11 @@ internal class FlowConsumerContextImpl(
try {
// Pull the source if (1) `pull` is called or (2) the timer of the source has expired
newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) {
- val lastPull = _lastPull
- val delta = max(0, now - lastPull)
-
// Update state before calling into the outside world, so it observes a consistent state
- _lastPull = now
_flags = (flags and ConnPulled.inv()) or ConnUpdateActive
hasUpdated = true
- val duration = source.onPull(this, now, delta)
+ val duration = source.onPull(this, now)
// IMPORTANT: Re-fetch the flags after the callback might have changed those
flags = _flags
@@ -266,15 +253,11 @@ internal class FlowConsumerContextImpl(
// Push to the consumer if the rate of the source has changed (after a call to `push`)
if (flags and ConnPushed != 0) {
- val lastPush = _lastPush
- val delta = max(0, now - lastPush)
-
// Update state before calling into the outside world, so it observes a consistent state
- _lastPush = now
_flags = (flags and ConnPushed.inv()) or ConnUpdateActive
hasUpdated = true
- logic.onPush(this, now, delta, _demand)
+ logic.onPush(this, now, _demand)
// IMPORTANT: Re-fetch the flags after the callback might have changed those
flags = _flags
@@ -372,18 +355,12 @@ internal class FlowConsumerContextImpl(
// Call the source converge callback if it has enabled convergence
if (flags and ConnConvergeSource != 0) {
- val delta = max(0, now - _lastSourceConvergence)
- _lastSourceConvergence = now
-
- source.onConverge(this, now, delta)
+ source.onConverge(this, now)
}
// Call the consumer callback if it has enabled convergence
if (flags and ConnConvergeConsumer != 0) {
- val delta = max(0, now - _lastConsumerConvergence)
- _lastConsumerConvergence = now
-
- logic.onConverge(this, now, delta)
+ logic.onConverge(this, now)
}
} catch (cause: Throwable) {
// Invoke the finish callbacks
@@ -403,7 +380,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doStopSource(now: Long) {
try {
- source.onStop(this, now, max(0, now - _lastPull))
+ source.onStop(this, now)
doFinishConsumer(now, null)
} catch (cause: Throwable) {
doFinishConsumer(now, cause)
@@ -415,7 +392,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doFailSource(now: Long, cause: Throwable) {
try {
- source.onStop(this, now, max(0, now - _lastPull))
+ source.onStop(this, now)
} catch (e: Throwable) {
e.addSuppressed(cause)
doFinishConsumer(now, e)
@@ -427,7 +404,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doFinishConsumer(now: Long, cause: Throwable?) {
try {
- logic.onFinish(this, now, max(0, now - _lastPush), cause)
+ logic.onFinish(this, now, cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
logger.error(e) { "Uncaught exception" }
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index a7877546..5f198944 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -105,4 +105,14 @@ public interface FlowMultiplexer {
* Clear the outputs of the multiplexer.
*/
public fun clearOutputs()
+
+ /**
+ * Flush the counters of the multiplexer.
+ */
+ public fun flushCounters()
+
+ /**
+ * Flush the counters of the specified [input].
+ */
+ public fun flushCounters(input: FlowConsumer)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index eff111b8..1d7d22ef 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceKey
import java.util.ArrayDeque
-import kotlin.math.max
/**
* A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
@@ -135,16 +134,12 @@ public class ForwardingFlowMultiplexer(
clearInputs()
}
- private var _lastConverge = Long.MAX_VALUE
+ override fun flushCounters() {}
- override fun onConverge(now: Long, delta: Long) {
- val listener = listener
- if (listener != null) {
- val lastConverge = _lastConverge
- _lastConverge = now
- val duration = max(0, now - lastConverge)
- listener.onConverge(now, duration)
- }
+ override fun flushCounters(input: FlowConsumer) {}
+
+ override fun onConverge(now: Long) {
+ listener?.onConverge(now)
}
/**
@@ -163,9 +158,9 @@ public class ForwardingFlowMultiplexer(
forwarder.onStart(conn, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
forwarder.cancel()
- forwarder.onStop(conn, now, delta)
+ forwarder.onStop(conn, now)
}
override fun toString(): String = "ForwardingFlowMultiplexer.Output"
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 3d26efda..cc831862 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -141,6 +141,14 @@ public class MaxMinFlowMultiplexer(
clearInputs()
}
+ override fun flushCounters() {
+ scheduler.updateCounters(engine.clock.millis())
+ }
+
+ override fun flushCounters(input: FlowConsumer) {
+ (input as Input).doUpdateCounters(engine.clock.millis())
+ }
+
/**
* Helper class containing the scheduler state.
*/
@@ -189,7 +197,6 @@ public class MaxMinFlowMultiplexer(
* Flag to indicate that the scheduler is active.
*/
private var _schedulerActive = false
- private var _lastSchedulerCycle = Long.MAX_VALUE
/**
* The last convergence timestamp and the input.
@@ -250,7 +257,7 @@ public class MaxMinFlowMultiplexer(
_lastConverge = now
_lastConvergeInput = input
- parent.onConverge(now, max(0, now - lastConverge))
+ parent.onConverge(now)
}
}
@@ -278,13 +285,11 @@ public class MaxMinFlowMultiplexer(
* This method is invoked when one of the outputs converges.
*/
fun convergeOutput(output: Output, now: Long) {
- val lastConverge = _lastConverge
val parent = parent
if (parent != null) {
_lastConverge = now
-
- parent.onConverge(now, max(0, now - lastConverge))
+ parent.onConverge(now)
}
if (!output.isActive) {
@@ -324,14 +329,9 @@ public class MaxMinFlowMultiplexer(
* Synchronously run the scheduler of the multiplexer.
*/
fun runScheduler(now: Long): Long {
- val lastSchedulerCycle = _lastSchedulerCycle
- _lastSchedulerCycle = now
-
- val delta = max(0, now - lastSchedulerCycle)
-
return try {
_schedulerActive = true
- doRunScheduler(now, delta)
+ doRunScheduler(now)
} finally {
_schedulerActive = false
}
@@ -384,14 +384,14 @@ public class MaxMinFlowMultiplexer(
*
* @return The deadline after which a new scheduling cycle should start.
*/
- private fun doRunScheduler(now: Long, delta: Long): Long {
+ private fun doRunScheduler(now: Long): Long {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
var inputArray = _inputArray
var inputSize = _inputArray.size
// Update the counters of the scheduler
- updateCounters(delta)
+ updateCounters(now)
// If there is no work yet, mark the inputs as idle.
if (inputSize == 0) {
@@ -475,14 +475,19 @@ public class MaxMinFlowMultiplexer(
* The previous capacity of the multiplexer.
*/
private var _previousCapacity = 0.0
+ private var _previousUpdate = Long.MIN_VALUE
/**
* Update the counters of the scheduler.
*/
- private fun updateCounters(delta: Long) {
+ fun updateCounters(now: Long) {
val previousCapacity = _previousCapacity
_previousCapacity = capacity
+ val previousUpdate = _previousUpdate
+ _previousUpdate = now
+
+ val delta = now - previousUpdate
if (delta <= 0) {
return
}
@@ -640,10 +645,9 @@ public class MaxMinFlowMultiplexer(
override fun onPush(
ctx: FlowConsumerContext,
now: Long,
- delta: Long,
rate: Double
) {
- doUpdateCounters(delta)
+ doUpdateCounters(now)
val allowed = min(rate, capacity)
limit = rate
@@ -653,8 +657,8 @@ public class MaxMinFlowMultiplexer(
scheduler.trigger(now)
}
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- doUpdateCounters(delta)
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
+ doUpdateCounters(now)
limit = 0.0
actualRate = 0.0
@@ -665,7 +669,7 @@ public class MaxMinFlowMultiplexer(
_ctx = null
}
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ override fun onConverge(ctx: FlowConsumerContext, now: Long) {
scheduler.convergeInput(this, now)
}
@@ -673,9 +677,18 @@ public class MaxMinFlowMultiplexer(
override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
/**
+ * The timestamp that the counters where last updated.
+ */
+ private var _lastUpdate = Long.MIN_VALUE
+
+ /**
* Helper method to update the flow counters of the multiplexer.
*/
- private fun doUpdateCounters(delta: Long) {
+ fun doUpdateCounters(now: Long) {
+ val lastUpdate = _lastUpdate
+ _lastUpdate = now
+
+ val delta = (now - lastUpdate).coerceAtLeast(0)
if (delta <= 0L) {
return
}
@@ -761,7 +774,7 @@ public class MaxMinFlowMultiplexer(
scheduler.registerOutput(this)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_conn = null
capacity = 0.0
isActive = false
@@ -769,7 +782,7 @@ public class MaxMinFlowMultiplexer(
scheduler.deregisterOutput(this, now)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val capacity = capacity
if (capacity != conn.capacity) {
this.capacity = capacity
@@ -792,7 +805,7 @@ public class MaxMinFlowMultiplexer(
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
if (_isActivationOutput) {
scheduler.convergeOutput(this, now)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
index d9779c6a..6cfcc82c 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
@@ -37,8 +37,17 @@ public class FixedFlowSource(private val amount: Double, private val utilization
}
private var remainingAmount = amount
+ private var lastPull: Long = 0L
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ lastPull = now
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ val lastPull = lastPull
+ this.lastPull = now
+ val delta = (now - lastPull).coerceAtLeast(0)
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val consumed = conn.rate * delta / 1000.0
val limit = conn.capacity * utilization
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index 6dd60d95..80127fb5 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -53,21 +53,21 @@ public class FlowSourceRateAdapter(
delegate.onStart(conn, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
try {
- delegate.onStop(conn, now, delta)
+ delegate.onStop(conn, now)
} finally {
rate = 0.0
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return delegate.onPull(conn, now, delta)
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ return delegate.onPull(conn, now)
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- delegate.onConverge(conn, now, delta)
+ delegate.onConverge(conn, now)
} finally {
rate = conn.rate
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
index ae537845..c9a52128 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
@@ -37,11 +37,11 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource
_iterator = trace.iterator()
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_iterator = null
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
// Check whether the trace fragment was fully consumed, otherwise wait until we have done so
val nextTarget = _nextTarget
if (nextTarget > now) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
index fe39eb2c..e7b25554 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -36,7 +36,7 @@ class FlowConsumerContextTest {
fun testFlushWithoutCommand() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(1.0)
1000
@@ -57,7 +57,7 @@ class FlowConsumerContextTest {
fun testDoubleStart() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(0.0)
1000
@@ -82,7 +82,7 @@ class FlowConsumerContextTest {
fun testIdempotentCapacityChange() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(1.0)
1000
@@ -99,6 +99,6 @@ class FlowConsumerContextTest {
context.start()
context.capacity = 4200.0
- verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onPull(any(), any()) }
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index 12e72b8f..8b090593 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -45,7 +45,7 @@ internal class FlowForwarderTest {
launch { source.consume(forwarder) }
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -66,7 +66,7 @@ internal class FlowForwarderTest {
forwarder.consume(object : FlowSource {
var isFirst = true
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -87,7 +87,7 @@ internal class FlowForwarderTest {
val engine = FlowEngineImpl(coroutineContext, clock)
val forwarder = FlowForwarder(engine)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -113,7 +113,7 @@ internal class FlowForwarderTest {
val forwarder = FlowForwarder(engine)
val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -122,7 +122,7 @@ internal class FlowForwarderTest {
forwarder.startConsumer(consumer)
forwarder.cancel()
- verify(exactly = 0) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 0) { consumer.onStop(any(), any()) }
}
@Test
@@ -140,7 +140,7 @@ internal class FlowForwarderTest {
forwarder.cancel()
verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any()) }
}
@Test
@@ -158,7 +158,7 @@ internal class FlowForwarderTest {
source.cancel()
verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any()) }
}
@Test
@@ -168,7 +168,7 @@ internal class FlowForwarderTest {
val source = FlowSink(engine, 2000.0)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -198,7 +198,7 @@ internal class FlowForwarderTest {
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { source.onPull(any(), any(), any()) }
+ verify(exactly = 1) { source.onPull(any(), any()) }
}
@Test
@@ -214,11 +214,13 @@ internal class FlowForwarderTest {
yield()
- assertEquals(2.0, source.counters.actual)
- assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
- assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
- assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
- assertEquals(2000, clock.millis())
+ assertAll(
+ { assertEquals(2.0, source.counters.actual) },
+ { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } },
+ { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } },
+ { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } },
+ { assertEquals(2000, clock.millis()) }
+ )
}
@Test
@@ -246,7 +248,7 @@ internal class FlowForwarderTest {
try {
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
throw IllegalStateException("Test")
}
})
@@ -269,7 +271,7 @@ internal class FlowForwarderTest {
try {
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
@@ -301,11 +303,11 @@ internal class FlowForwarderTest {
conn.shouldSourceConverge = true
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
throw IllegalStateException("Test")
}
})
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
index 70c75864..726ddbf7 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -65,7 +65,7 @@ internal class FlowSinkTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 3) { consumer.onPull(any(), any(), any()) }
+ verify(exactly = 3) { consumer.onPull(any(), any()) }
}
@Test
@@ -95,7 +95,7 @@ internal class FlowSinkTest {
val provider = FlowSink(engine, capacity)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -122,7 +122,7 @@ internal class FlowSinkTest {
resCtx = conn
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -154,7 +154,7 @@ internal class FlowSinkTest {
throw IllegalStateException("Hi")
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
}
@@ -173,7 +173,7 @@ internal class FlowSinkTest {
val consumer = object : FlowSource {
var isFirst = true
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -231,7 +231,7 @@ internal class FlowSinkTest {
val provider = FlowSink(engine, capacity)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
+ override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
}
provider.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
index 187dacd9..ef15f711 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -112,7 +112,7 @@ internal class ForwardingFlowMultiplexerTest {
isFirst = true
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
index 4b0d7bbd..675ac1c3 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
@@ -32,7 +32,7 @@ public class SimNetworkSink(
public val capacity: Double
) : SimNetworkPort() {
override fun createConsumer(): FlowSource = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
+ override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
override fun toString(): String = "SimNetworkSink.Consumer"
}
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
index 14d22162..2e6983c8 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
@@ -124,7 +124,7 @@ class SimNetworkSinkTest {
assertFalse(sink.isConnected)
assertFalse(source.isConnected)
- verify { consumer.onStop(any(), any(), any()) }
+ verify { consumer.onStop(any(), any()) }
}
private class Source(engine: FlowEngine) : SimNetworkPort() {
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
index eb823eb1..7cc4b801 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
@@ -85,7 +85,7 @@ internal class SimPduTest {
outlet.connect(inlet)
outlet.disconnect()
- verify { consumer.onStop(any(), any(), any()) }
+ verify { consumer.onStop(any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
index 76142103..4f319e65 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
@@ -85,7 +85,7 @@ internal class SimPowerSourceTest {
source.connect(inlet)
source.disconnect()
- verify { consumer.onStop(any(), any(), any()) }
+ verify { consumer.onStop(any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
index a764a368..e19e72fa 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
@@ -92,7 +92,7 @@ internal class SimUpsTest {
ups.connect(inlet)
ups.disconnect()
- verify { consumer.onStop(any(), any(), any()) }
+ verify { consumer.onStop(any(), any()) }
}
class SimpleInlet : SimPowerInlet() {
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
index 418dc201..9557f680 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -493,10 +493,10 @@ public class ComputeMetricAggregator {
fun reset() {
previousUptime = _uptime
previousDowntime = _downtime
- previousCpuActiveTime = cpuActiveTime
- previousCpuIdleTime = cpuIdleTime
- previousCpuStealTime = cpuStealTime
- previousCpuLostTime = cpuLostTime
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
_host = null
_cpuLimit = 0.0
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index 69fc79bb..5088b044 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -426,7 +426,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val id = reader.get(idCol) as String
val resource = selected[id] ?: continue
- val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz
+ val cpuUsage = reader.getDouble(cpuUsageCol) * resource.cpuCapacity // MHz
val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) }
val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
val delta = (timestamp - state.time)