From a49a3878758438fe8d04bf4c4d3e3ffc5873aace Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 29 Nov 2024 13:54:31 +0100 Subject: Multiplexer update (#278) * Fixed the Multiplexer.java to properly divide the supply over the different consumers. Fixed a bug where fragments were being loaded in reversed order. * Optimized the Multiplexer.java, by only updating the supply of the consumer that updated its demand when possible. --- .../compute/simulator/telemetry/GuestCpuStats.java | 1 + .../org/opendc/compute/simulator/internal/Guest.kt | 1 + .../telemetry/parquet/DfltTaskExportColumns.kt | 10 ++ .../simulator/telemetry/table/TaskTableReader.kt | 10 ++ .../telemetry/table/TaskTableReaderImpl.kt | 26 +++-- .../experiments/base/ScenarioIntegrationTest.kt | 14 +-- .../org/opendc/simulator/compute/cpu/SimCpu.java | 12 ++- .../java/org/opendc/simulator/Multiplexer.java | 114 ++++++++++++--------- 8 files changed, 123 insertions(+), 65 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java index 97202104..61c4145c 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/telemetry/GuestCpuStats.java @@ -40,4 +40,5 @@ public record GuestCpuStats( long lostTime, double capacity, double usage, + double demand, double utilization) {} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 3a923222..3a2416a8 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -233,6 +233,7 @@ public class Guest( counters.cpuLostTime / 1000L, counters.cpuCapacity, counters.cpuSupply, + counters.cpuDemand, counters.cpuSupply / cpuLimit, ) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt index 6658e444..cf315947 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt @@ -97,6 +97,16 @@ public object DfltTaskExportColumns { field = Types.required(FLOAT).named("cpu_limit"), ) { it.cpuLimit } + public val CPU_USAGE: ExportColumn = + ExportColumn( + field = Types.required(FLOAT).named("cpu_usage"), + ) { it.cpuUsage } + + public val CPU_DEMAND: ExportColumn = + ExportColumn( + field = Types.required(FLOAT).named("cpu_demand"), + ) { it.cpuDemand } + public val CPU_TIME_ACTIVE: ExportColumn = ExportColumn( field = Types.required(INT64).named("cpu_time_active"), diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt index 50ffa5fc..0a752e75 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReader.kt @@ -99,6 +99,16 @@ public interface TaskTableReader : Exportable { */ public val cpuLimit: Double + /** + * The CPU given to this task (in MHz). + */ + public val cpuUsage: Double + + /** + * The CPU demanded by this task (in MHz). + */ + public val cpuDemand: Double + /** * The duration (in seconds) that a CPU was active in the task. */ diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt index 5a0897f7..edcc8b20 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/TaskTableReaderImpl.kt @@ -55,6 +55,8 @@ public class TaskTableReaderImpl( _timestampAbsolute = table.timestampAbsolute _cpuLimit = table.cpuLimit + _cpuDemand = table.cpuDemand + _cpuUsage = table.cpuUsage _cpuActiveTime = table.cpuActiveTime _cpuIdleTime = table.cpuIdleTime _cpuStealTime = table.cpuStealTime @@ -128,6 +130,14 @@ public class TaskTableReaderImpl( get() = _cpuLimit private var _cpuLimit = 0.0 + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + override val cpuActiveTime: Long get() = _cpuActiveTime - previousCpuActiveTime private var _cpuActiveTime = 0L @@ -181,14 +191,16 @@ public class TaskTableReaderImpl( _timestampAbsolute = now + startTime _cpuLimit = cpuStats?.capacity ?: 0.0 - _cpuActiveTime = cpuStats?.activeTime ?: 0 - _cpuIdleTime = cpuStats?.idleTime ?: 0 - _cpuStealTime = cpuStats?.stealTime ?: 0 - _cpuLostTime = cpuStats?.lostTime ?: 0 - _uptime = sysStats?.uptime?.toMillis() ?: 0 - _downtime = sysStats?.downtime?.toMillis() ?: 0 + _cpuDemand = cpuStats?.demand ?: 0.0 + _cpuUsage = cpuStats?.usage ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: _cpuActiveTime + _cpuIdleTime = cpuStats?.idleTime ?: _cpuIdleTime + _cpuStealTime = cpuStats?.stealTime ?: _cpuStealTime + _cpuLostTime = cpuStats?.lostTime ?: _cpuLostTime + _uptime = sysStats?.uptime?.toMillis() ?: _uptime + _downtime = sysStats?.downtime?.toMillis() ?: _downtime _provisionTime = task.launchedAt - _bootTime = sysStats?.bootTime + _bootTime = sysStats?.bootTime ?: _bootTime _creationTime = task.createdAt _finishTime = task.finishedAt diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt index 9ce25c99..10478174 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt @@ -253,9 +253,9 @@ class ScenarioIntegrationTest { assertAll( { assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } }, { assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } }, - { assertEquals(4297000, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(5003000, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(14824, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, { assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, ) @@ -294,8 +294,8 @@ class ScenarioIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(1803918431, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(787181569, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, { assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, @@ -341,8 +341,8 @@ class ScenarioIntegrationTest { { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, - { assertEquals(43101787498, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(3489412502, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } }, { assertEquals(0, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, { assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } }, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index d3edc957..63331a6c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -40,8 +40,9 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer private double currentCpuDemand = 0.0f; // cpu capacity demanded by the mux private double currentCpuUtilization = 0.0f; - private double currentPowerDemand = 0.0f; // power demanded of the psu private double currentCpuSupplied = 0.0f; // cpu capacity supplied to the mux + + private double currentPowerDemand = 0.0f; // power demanded of the psu private double currentPowerSupplied = 0.0f; // cpu capacity supplied by the psu private double maxCapacity; @@ -122,7 +123,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer public long onUpdate(long now) { updateCounters(now); - this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity; + this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0); // Calculate Power Demand and send to PSU double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); @@ -132,7 +133,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer } // Calculate the amount of cpu this can provide - double cpuSupply = this.currentCpuDemand; + double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity); if (cpuSupply != this.currentCpuSupplied) { this.pushSupply(this.muxEdge, cpuSupply); @@ -205,6 +206,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer this.currentCpuDemand = newCpuDemand; this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity; + this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0); + // Calculate Power Demand and send to PSU double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); @@ -223,7 +226,8 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer this.currentPowerSupplied = newPowerSupply; // Calculate the amount of cpu this can provide - double cpuSupply = this.currentCpuDemand; + double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity); + ; if (cpuSupply != this.currentCpuSupplied) { this.pushSupply(this.muxEdge, cpuSupply); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java index 8cd2fa6f..ece90c20 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java @@ -39,6 +39,10 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer private double totalDemand; // The total demand of all the consumers private double totalSupply; // The total supply from the supplier + + private boolean overProvisioned = false; + private int currentConsumerIdx = -1; + private double capacity; // What is the max capacity public Multiplexer(FlowGraph graph) { @@ -59,52 +63,63 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer public long onUpdate(long now) { - if (this.totalDemand > this.capacity) { - redistributeSupply(this.consumerEdges, this.supplies, this.capacity); - } else { - for (int i = 0; i < this.demands.size(); i++) { - this.supplies.set(i, this.demands.get(i)); + return Long.MAX_VALUE; + } + + private void distributeSupply() { + // if supply >= demand -> push supplies to all tasks + // TODO: possible optimization -> Only has to be done for the specific consumer that changed demand + if (this.totalSupply >= this.totalDemand) { + + // If this came from a state of over provisioning, provide all consumers with their demand + if (this.overProvisioned) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); + } } - } - double totalSupply = 0; - for (int i = 0; i < this.consumerEdges.size(); i++) { - this.pushSupply(this.consumerEdges.get(i), this.supplies.get(i)); - totalSupply += this.supplies.get(i); + if (this.currentConsumerIdx != -1) { + this.pushSupply( + this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); + this.currentConsumerIdx = -1; + } + + this.overProvisioned = false; } - // Only update supplier if supply has changed - if (this.totalSupply != totalSupply) { - this.totalSupply = totalSupply; + // if supply < demand -> distribute the supply over all consumers + else { + this.overProvisioned = true; + double[] supplies = redistributeSupply(this.demands, this.totalSupply); - pushDemand(this.supplierEdge, this.totalSupply); + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); + } } - - return Long.MAX_VALUE; } - private static double redistributeSupply( - ArrayList consumerEdges, ArrayList supplies, double capacity) { - final long[] consumers = new long[consumerEdges.size()]; + private record Demand(int idx, double value) {} - for (int i = 0; i < consumers.length; i++) { - FlowEdge consumer = consumerEdges.get(i); + private static double[] redistributeSupply(ArrayList demands, double totalSupply) { + int inputSize = demands.size(); - if (consumer == null) { - break; - } + final double[] supplies = new double[inputSize]; + final Demand[] tempDemands = new Demand[inputSize]; - consumers[i] = (Double.doubleToRawLongBits(consumer.getDemand()) << 32) | (i & 0xFFFFFFFFL); + for (int i = 0; i < inputSize; i++) { + tempDemands[i] = new Demand(i, demands.get(i)); } - Arrays.sort(consumers); - double availableCapacity = capacity; - int inputSize = consumers.length; + Arrays.sort(tempDemands, (o1, o2) -> { + Double i1 = o1.value; + Double i2 = o2.value; + return i1.compareTo(i2); + }); + + double availableCapacity = totalSupply; // totalSupply for (int i = 0; i < inputSize; i++) { - long v = consumers[i]; - int slot = (int) v; - double d = Double.longBitsToDouble((int) (v >> 32)); + double d = tempDemands[i].value; if (d == 0.0) { continue; @@ -113,12 +128,13 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer double availableShare = availableCapacity / (inputSize - i); double r = Math.min(d, availableShare); - supplies.set(slot, r); // Update the rates + int idx = tempDemands[i].idx; + supplies[idx] = r; // Update the rates availableCapacity -= r; } // Return the used capacity - return capacity - availableCapacity; + return supplies; } /** @@ -132,8 +148,6 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer this.consumerEdges.add(consumerEdge); this.demands.add(0.0); this.supplies.add(0.0); - - this.invalidate(); } @Override @@ -141,8 +155,6 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer this.supplierEdge = supplierEdge; this.capacity = supplierEdge.getCapacity(); this.totalSupply = 0; - - this.invalidate(); } @Override @@ -164,7 +176,9 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer this.consumerEdges.get(i).setConsumerIndex(i); } - this.invalidate(); + this.currentConsumerIdx = -1; + + this.pushDemand(this.supplierEdge, this.totalDemand); } @Override @@ -178,28 +192,34 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer public void handleDemand(FlowEdge consumerEdge, double newDemand) { int idx = consumerEdge.getConsumerIndex(); + this.currentConsumerIdx = idx; + if (idx == -1) { System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer"); return; } + // Update the total demand (This is cheaper than summing over all demands) double prevDemand = demands.get(idx); - demands.set(idx, newDemand); + demands.set(idx, newDemand); this.totalDemand += (newDemand - prevDemand); - if (this.totalDemand <= this.capacity) { - - this.totalSupply = this.totalDemand; - this.pushDemand(this.supplierEdge, this.totalSupply); - - this.pushSupply(consumerEdge, newDemand); + if (overProvisioned) { + distributeSupply(); } - // TODO: add behaviour if capacity is reached + + // Send new totalDemand to CPU + // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) + this.pushDemand(this.supplierEdge, this.totalDemand); } @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) {} + public void handleSupply(FlowEdge supplierEdge, double newSupply) { + this.totalSupply = newSupply; // Currently this is from a single supply, might turn into multiple suppliers + + this.distributeSupply(); + } @Override public void pushDemand(FlowEdge supplierEdge, double newDemand) { -- cgit v1.2.3