From f71e07f55a5176c5bd5447cdb3bcfebf2f5f47ee Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 7 Jan 2025 11:25:48 +0100 Subject: Updated the FlowDistributor (#285) * Updated the FlowDistributor to work in more cases and be more performant. * Removed old FlowDistributor --- .../org/opendc/simulator/compute/cpu/SimCpu.java | 50 ++--- .../simulator/compute/machine/SimMachine.java | 18 +- .../simulator/compute/machine/VirtualMachine.java | 14 +- .../simulator/compute/power/SimPowerSource.java | 14 +- .../org/opendc/simulator/compute/power/SimPsu.java | 14 +- .../compute/workload/SimChainWorkload.java | 12 +- .../compute/workload/SimTraceWorkload.java | 10 +- .../opendc/simulator/engine/engine/FlowEngine.java | 8 +- .../simulator/engine/engine/FlowEventQueue.java | 206 +++++++++++++++++++++ .../simulator/engine/engine/FlowTimerQueue.java | 206 --------------------- .../simulator/engine/graph/FlowConsumer.java | 4 +- .../simulator/engine/graph/FlowDistributor.java | 160 +++++++++------- .../opendc/simulator/engine/graph/FlowEdge.java | 26 ++- .../opendc/simulator/engine/graph/FlowNode.java | 4 +- .../simulator/engine/graph/FlowSupplier.java | 4 +- 15 files changed, 388 insertions(+), 362 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java index c5b8a9ea..a9edaa97 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/cpu/SimCpu.java @@ -51,7 +51,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer private long lastCounterUpdate; private final double cpuFrequencyInv; - private FlowEdge muxEdge; + private FlowEdge distributorEdge; private FlowEdge psuEdge; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -123,21 +123,16 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer public long onUpdate(long now) { updateCounters(now); - this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0); - - // Calculate Power Demand and send to PSU - double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); + // Check if supply == demand + if (this.currentPowerDemand != this.currentPowerSupplied) { + this.pushOutgoingDemand(this.psuEdge, this.currentPowerDemand); - if (powerDemand != this.currentPowerDemand) { - this.pushDemand(this.psuEdge, powerDemand); + return Long.MAX_VALUE; } - // Calculate the amount of cpu this can provide - double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity); + this.currentCpuSupplied = Math.min(this.currentCpuDemand, this.maxCapacity); - if (cpuSupply != this.currentCpuSupplied) { - this.pushSupply(this.muxEdge, cpuSupply); - } + this.pushOutgoingSupply(this.distributorEdge, this.currentCpuSupplied); return Long.MAX_VALUE; } @@ -181,7 +176,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer * Push new demand to the psu */ @Override - public void pushDemand(FlowEdge supplierEdge, double newPowerDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newPowerDemand) { updateCounters(); this.currentPowerDemand = newPowerDemand; this.psuEdge.pushDemand(newPowerDemand); @@ -191,47 +186,38 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer * Push updated supply to the mux */ @Override - public void pushSupply(FlowEdge consumerEdge, double newCpuSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newCpuSupply) { updateCounters(); this.currentCpuSupplied = newCpuSupply; - this.muxEdge.pushSupply(newCpuSupply); + + this.distributorEdge.pushSupply(newCpuSupply, true); } /** * Handle new demand coming in from the mux */ @Override - public void handleDemand(FlowEdge consumerEdge, double newCpuDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newCpuDemand) { updateCounters(); this.currentCpuDemand = newCpuDemand; - this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity; this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0); // Calculate Power Demand and send to PSU - double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); + this.currentPowerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization); - if (powerDemand != this.currentPowerDemand) { - this.pushDemand(this.psuEdge, powerDemand); - } + this.invalidate(); } /** * Handle updated supply from the psu */ @Override - public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) { - // TODO: Implement this + public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) { updateCounters(); this.currentPowerSupplied = newPowerSupply; - // Calculate the amount of cpu this can provide - double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity); - ; - - if (cpuSupply != this.currentCpuSupplied) { - this.pushSupply(this.muxEdge, cpuSupply); - } + this.invalidate(); } /** @@ -239,7 +225,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer */ @Override public void addConsumerEdge(FlowEdge consumerEdge) { - this.muxEdge = consumerEdge; + this.distributorEdge = consumerEdge; } /** @@ -257,7 +243,7 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer */ @Override public void removeConsumerEdge(FlowEdge consumerEdge) { - this.muxEdge = null; + this.distributorEdge = null; this.invalidate(); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index 074f0ed8..dab0c421 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -44,7 +44,7 @@ public class SimMachine { private final InstantSource clock; private SimCpu cpu; - private FlowDistributor cpuMux; + private FlowDistributor cpuDistributor; private SimPsu psu; private Memory memory; @@ -74,8 +74,8 @@ public class SimMachine { return cpu; } - public FlowDistributor getCpuMux() { - return cpuMux; + public FlowDistributor getCpuDistributor() { + return cpuDistributor; } public Memory getMemory() { @@ -114,7 +114,7 @@ public class SimMachine { public SimMachine( FlowGraph graph, MachineModel machineModel, - FlowDistributor powerMux, + FlowDistributor powerDistributor, CpuPowerModel cpuPowerModel, Consumer completion) { this.graph = graph; @@ -124,7 +124,7 @@ public class SimMachine { // Create the psu and cpu and connect them this.psu = new SimPsu(graph); - graph.addEdge(this.psu, powerMux); + graph.addEdge(this.psu, powerDistributor); this.cpu = new SimCpu(graph, this.machineModel.getCpuModel(), cpuPowerModel, 0); @@ -133,8 +133,8 @@ public class SimMachine { this.memory = new Memory(graph, this.machineModel.getMemory()); // Create a FlowDistributor and add the cpu as supplier - this.cpuMux = new FlowDistributor(this.graph); - graph.addEdge(this.cpuMux, this.cpu); + this.cpuDistributor = new FlowDistributor(this.graph); + graph.addEdge(this.cpuDistributor, this.cpu); this.completion = completion; } @@ -153,8 +153,8 @@ public class SimMachine { this.graph.removeNode(this.cpu); this.cpu = null; - this.graph.removeNode(this.cpuMux); - this.cpuMux = null; + this.graph.removeNode(this.cpuDistributor); + this.cpuDistributor = null; this.memory = null; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java index b8a9c738..1946eecb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/VirtualMachine.java @@ -100,7 +100,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli this.clock = this.machine.getClock(); this.parentGraph = machine.getGraph(); - this.parentGraph.addEdge(this, this.machine.getCpuMux()); + this.parentGraph.addEdge(this, this.machine.getCpuDistributor()); this.lastUpdate = clock.millis(); this.lastUpdate = clock.millis(); @@ -185,7 +185,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli * Push demand to the cpuMux if the demand has changed **/ @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.cpuEdge.pushDemand(newDemand); } @@ -193,7 +193,7 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli * Push supply to the workload if the supply has changed **/ @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { this.workloadEdge.pushSupply(newSupply); } @@ -201,24 +201,24 @@ public class VirtualMachine extends FlowNode implements FlowConsumer, FlowSuppli * Handle new demand from the workload by sending it through to the cpuMux **/ @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { updateCounters(this.clock.millis()); this.cpuDemand = newDemand; - pushDemand(this.cpuEdge, newDemand); + pushOutgoingDemand(this.cpuEdge, newDemand); } /** * Handle a new supply pushed by the cpuMux by sending it through to the workload **/ @Override - public void handleSupply(FlowEdge supplierEdge, double newCpuSupply) { + public void handleIncomingSupply(FlowEdge supplierEdge, double newCpuSupply) { updateCounters(this.clock.millis()); this.cpuSupply = newCpuSupply; - pushSupply(this.workloadEdge, newCpuSupply); + pushOutgoingSupply(this.workloadEdge, newCpuSupply); } @Override diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java index e8626e40..d2270888 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPowerSource.java @@ -43,7 +43,7 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier { private double totalCarbonEmission = 0.0f; private CarbonModel carbonModel = null; - private FlowEdge muxEdge; + private FlowEdge distributorEdge; private double capacity = Long.MAX_VALUE; @@ -57,7 +57,7 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier { * @return true if the InPort is connected to an OutPort, false otherwise. */ public boolean isConnected() { - return muxEdge != null; + return distributorEdge != null; } /** @@ -156,30 +156,30 @@ public final class SimPowerSource extends FlowNode implements FlowSupplier { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @Override - public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) { this.powerDemand = newPowerDemand; double powerSupply = this.powerDemand; if (powerSupply != this.powerSupplied) { - this.pushSupply(this.muxEdge, powerSupply); + this.pushOutgoingSupply(this.distributorEdge, powerSupply); } } @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { this.powerSupplied = newSupply; consumerEdge.pushSupply(newSupply); } @Override public void addConsumerEdge(FlowEdge consumerEdge) { - this.muxEdge = consumerEdge; + this.distributorEdge = consumerEdge; } @Override public void removeConsumerEdge(FlowEdge consumerEdge) { - this.muxEdge = null; + this.distributorEdge = null; } // Update the carbon intensity of the power source diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java index c1e8a1b9..dc5129d6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java @@ -106,7 +106,7 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer double powerSupply = this.powerDemand; if (powerSupply != this.powerSupplied) { - this.pushSupply(this.cpuEdge, powerSupply); + this.pushOutgoingSupply(this.cpuEdge, powerSupply); } return Long.MAX_VALUE; @@ -135,33 +135,33 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.powerDemand = newDemand; powerSupplyEdge.pushDemand(newDemand); } @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { this.powerSupplied = newSupply; cpuEdge.pushSupply(newSupply); } @Override - public void handleDemand(FlowEdge consumerEdge, double newPowerDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) { updateCounters(); this.powerDemand = newPowerDemand; - pushDemand(this.powerSupplyEdge, newPowerDemand); + pushOutgoingDemand(this.powerSupplyEdge, newPowerDemand); } @Override - public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) { + public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) { updateCounters(); this.powerSupplied = newPowerSupply; - pushSupply(this.cpuEdge, newPowerSupply); + pushOutgoingSupply(this.cpuEdge, newPowerSupply); } @Override diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index b612de2c..da6b8334 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -195,7 +195,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * @param newDemand new demand to sent to the cpu */ @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.demand = newDemand; this.machineEdge.pushDemand(newDemand); @@ -208,7 +208,7 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * @param newSupply new supply to sent to the workload */ @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { this.supply = newSupply; this.workloadEdge.pushSupply(newSupply); @@ -221,8 +221,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * @param newDemand new demand coming from the workload */ @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { - this.pushDemand(this.machineEdge, newDemand); + public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { + this.pushOutgoingDemand(this.machineEdge, newDemand); } /** @@ -232,8 +232,8 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { * @param newSupply The new supply that is sent to the workload */ @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.pushSupply(this.machineEdge, newSupply); + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + this.pushOutgoingSupply(this.machineEdge, newSupply); } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java index 8487fbc2..0735d8ae 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java @@ -94,7 +94,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { graph.addEdge(this, supplier); this.currentFragment = this.getNextFragment(); - pushDemand(machineEdge, this.currentFragment.cpuUsage()); + pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); this.startOfFragment = now; } @@ -131,7 +131,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.startOfFragment = now - passedTime; // Change the cpu Usage to the new Fragment - pushDemand(machineEdge, this.currentFragment.cpuUsage()); + pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); // Return the time when the current fragment will complete return this.startOfFragment + duration; @@ -190,7 +190,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.fragmentIndex = -1; this.currentFragment = getNextFragment(); - pushDemand(this.machineEdge, this.currentFragment.cpuUsage()); + pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage()); this.startOfFragment = now; this.invalidate(); @@ -207,7 +207,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { * @param newSupply The new demand that needs to be sent to the VM */ @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { if (newSupply == this.currentSupply) { return; } @@ -222,7 +222,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { * @param newDemand The new demand that needs to be sent to the VM */ @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { if (newDemand == this.currentDemand) { return; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 24476048..67540f4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -44,7 +44,7 @@ public final class FlowEngine implements Runnable { /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. */ - private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + private final FlowEventQueue eventQueue = new FlowEventQueue(256); /** * The stack of engine invocations to occur in the future. @@ -127,7 +127,7 @@ public final class FlowEngine implements Runnable { return; } - long deadline = timerQueue.peekDeadline(); + long deadline = eventQueue.peekDeadline(); if (deadline != Long.MAX_VALUE) { trySchedule(futureInvocations, clock.millis(), deadline); } @@ -139,7 +139,7 @@ public final class FlowEngine implements Runnable { * This method should only be invoked while inside an engine cycle. */ public void scheduleDelayedInContext(FlowNode ctx) { - FlowTimerQueue timerQueue = this.timerQueue; + FlowEventQueue timerQueue = this.eventQueue; timerQueue.enqueue(ctx); } @@ -148,7 +148,7 @@ public final class FlowEngine implements Runnable { */ private void doRunEngine(long now) { final FlowCycleQueue queue = this.cycleQueue; - final FlowTimerQueue timerQueue = this.timerQueue; + final FlowEventQueue timerQueue = this.eventQueue; try { // Mark the engine as active to prevent concurrent calls to this method diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java new file mode 100644 index 00000000..53649d29 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A specialized priority queue for future event of {@link FlowNode}s sorted on time. + *

+ * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation + * being generic. + */ +public final class FlowEventQueue { + /** + * Array representation of binary heap of {@link FlowNode} instances. + */ + private FlowNode[] queue; + + /** + * The number of elements in the priority queue. + */ + private int size = 0; + + /** + * Construct a {@link FlowEventQueue} with the specified initial capacity. + * + * @param initialCapacity The initial capacity of the queue. + */ + public FlowEventQueue(int initialCapacity) { + this.queue = new FlowNode[initialCapacity]; + } + + /** + * Enqueue a timer for the specified context or update the existing timer. + */ + public void enqueue(FlowNode node) { + FlowNode[] es = queue; + int k = node.getTimerIndex(); + + if (node.getDeadline() != Long.MAX_VALUE) { + if (k >= 0) { + update(es, node, k); + } else { + add(es, node); + } + } else if (k >= 0) { + delete(es, k); + } + } + + /** + * Retrieve the head of the queue if its deadline does not exceed now. + * + * @param now The timestamp that the deadline of the head of the queue should not exceed. + * @return The head of the queue if its deadline does not exceed now, otherwise null. + */ + public FlowNode poll(long now) { + if (this.size == 0) { + return null; + } + + final FlowNode[] es = queue; + final FlowNode head = es[0]; + + if (now < head.getDeadline()) { + return null; + } + + int n = size - 1; + this.size = n; + final FlowNode next = es[n]; + es[n] = null; // Clear the last element of the queue + + if (n > 0) { + siftDown(0, next, es, n); + } + + head.setTimerIndex(-1); + return head; + } + + /** + * Find the earliest deadline in the queue. + */ + public long peekDeadline() { + if (this.size > 0) { + return this.queue[0].getDeadline(); + } + + return Long.MAX_VALUE; + } + + /** + * Add a new entry to the queue. + */ + private void add(FlowNode[] es, FlowNode node) { + if (this.size >= es.length) { + // Re-fetch the resized array + es = grow(); + } + + siftUp(this.size, node, es); + + this.size++; + } + + /** + * Update the deadline of an existing entry in the queue. + */ + private void update(FlowNode[] es, FlowNode node, int k) { + if (k > 0) { + int parent = (k - 1) >>> 1; + if (es[parent].getDeadline() > node.getDeadline()) { + siftUp(k, node, es); + return; + } + } + + siftDown(k, node, es, this.size); + } + + /** + * Deadline an entry from the queue. + */ + private void delete(FlowNode[] es, int k) { + int s = --this.size; + if (s == k) { + es[k] = null; // Element is last in the queue + } else { + FlowNode moved = es[s]; + es[s] = null; + + siftDown(k, moved, es, s); + + if (es[k] == moved) { + siftUp(k, moved, es); + } + } + } + + /** + * Increases the capacity of the array. + */ + private FlowNode[] grow() { + FlowNode[] queue = this.queue; + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + + queue = Arrays.copyOf(queue, newCapacity); + this.queue = queue; + return queue; + } + + private static void siftUp(int k, FlowNode key, FlowNode[] es) { + while (k > 0) { + int parent = (k - 1) >>> 1; + FlowNode e = es[parent]; + if (key.getDeadline() >= e.getDeadline()) break; + es[k] = e; + e.setTimerIndex(k); + k = parent; + } + es[k] = key; + key.setTimerIndex(k); + } + + private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { + int half = n >>> 1; // loop while a non-leaf + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + FlowNode c = es[child]; + int right = child + 1; + if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right]; + + if (key.getDeadline() <= c.getDeadline()) break; + + es[k] = c; + c.setTimerIndex(k); + k = child; + } + + es[k] = key; + key.setTimerIndex(k); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java deleted file mode 100644 index 049eb40d..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.engine; - -import java.util.Arrays; -import org.opendc.simulator.engine.graph.FlowNode; - -/** - * A specialized priority queue for timers of {@link FlowNode}s. - *

- * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation - * being generic. - */ -public final class FlowTimerQueue { - /** - * Array representation of binary heap of {@link FlowNode} instances. - */ - private FlowNode[] queue; - - /** - * The number of elements in the priority queue. - */ - private int size = 0; - - /** - * Construct a {@link FlowTimerQueue} with the specified initial capacity. - * - * @param initialCapacity The initial capacity of the queue. - */ - public FlowTimerQueue(int initialCapacity) { - this.queue = new FlowNode[initialCapacity]; - } - - /** - * Enqueue a timer for the specified context or update the existing timer. - */ - public void enqueue(FlowNode node) { - FlowNode[] es = queue; - int k = node.getTimerIndex(); - - if (node.getDeadline() != Long.MAX_VALUE) { - if (k >= 0) { - update(es, node, k); - } else { - add(es, node); - } - } else if (k >= 0) { - delete(es, k); - } - } - - /** - * Retrieve the head of the queue if its deadline does not exceed now. - * - * @param now The timestamp that the deadline of the head of the queue should not exceed. - * @return The head of the queue if its deadline does not exceed now, otherwise null. - */ - public FlowNode poll(long now) { - if (this.size == 0) { - return null; - } - - final FlowNode[] es = queue; - final FlowNode head = es[0]; - - if (now < head.getDeadline()) { - return null; - } - - int n = size - 1; - this.size = n; - final FlowNode next = es[n]; - es[n] = null; // Clear the last element of the queue - - if (n > 0) { - siftDown(0, next, es, n); - } - - head.setTimerIndex(-1); - return head; - } - - /** - * Find the earliest deadline in the queue. - */ - public long peekDeadline() { - if (this.size > 0) { - return this.queue[0].getDeadline(); - } - - return Long.MAX_VALUE; - } - - /** - * Add a new entry to the queue. - */ - private void add(FlowNode[] es, FlowNode node) { - if (this.size >= es.length) { - // Re-fetch the resized array - es = grow(); - } - - siftUp(this.size, node, es); - - this.size++; - } - - /** - * Update the deadline of an existing entry in the queue. - */ - private void update(FlowNode[] es, FlowNode node, int k) { - if (k > 0) { - int parent = (k - 1) >>> 1; - if (es[parent].getDeadline() > node.getDeadline()) { - siftUp(k, node, es); - return; - } - } - - siftDown(k, node, es, this.size); - } - - /** - * Deadline an entry from the queue. - */ - private void delete(FlowNode[] es, int k) { - int s = --this.size; - if (s == k) { - es[k] = null; // Element is last in the queue - } else { - FlowNode moved = es[s]; - es[s] = null; - - siftDown(k, moved, es, s); - - if (es[k] == moved) { - siftUp(k, moved, es); - } - } - } - - /** - * Increases the capacity of the array. - */ - private FlowNode[] grow() { - FlowNode[] queue = this.queue; - int oldCapacity = queue.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - - queue = Arrays.copyOf(queue, newCapacity); - this.queue = queue; - return queue; - } - - private static void siftUp(int k, FlowNode key, FlowNode[] es) { - while (k > 0) { - int parent = (k - 1) >>> 1; - FlowNode e = es[parent]; - if (key.getDeadline() >= e.getDeadline()) break; - es[k] = e; - e.setTimerIndex(k); - k = parent; - } - es[k] = key; - key.setTimerIndex(k); - } - - private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { - int half = n >>> 1; // loop while a non-leaf - while (k < half) { - int child = (k << 1) + 1; // assume left child is least - FlowNode c = es[child]; - int right = child + 1; - if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right]; - - if (key.getDeadline() <= c.getDeadline()) break; - - es[k] = c; - c.setTimerIndex(k); - k = child; - } - - es[k] = key; - key.setTimerIndex(k); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java index 2130d376..a9da6f5d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java @@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph; public interface FlowConsumer { - void handleSupply(FlowEdge supplierEdge, double newSupply); + void handleIncomingSupply(FlowEdge supplierEdge, double newSupply); - void pushDemand(FlowEdge supplierEdge, double newDemand); + void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand); void addSupplierEdge(FlowEdge supplierEdge); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 7ef091f8..16bb161f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -29,27 +29,30 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private final ArrayList consumerEdges = new ArrayList<>(); private FlowEdge supplierEdge; - private final ArrayList demands = new ArrayList<>(); // What is demanded by the consumers - private final ArrayList supplies = new ArrayList<>(); // What is supplied to the consumers + private final ArrayList incomingDemands = new ArrayList<>(); // What is demanded by the consumers + private final ArrayList outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers - private double totalDemand; // The total demand of all the consumers - private double totalSupply; // The total supply from the supplier + private double totalIncomingDemand; // The total demand of all the consumers + private double currentIncomingSupply; // The current supply provided by the supplier - private boolean overLoaded = false; - private int currentConsumerIdx = -1; + private boolean outgoingDemandUpdateNeeded = false; - private double capacity; // What is the max capacity + private boolean overloaded = false; + + private double capacity; // What is the max capacity. Can probably be removed + + private final ArrayList updatedDemands = new ArrayList<>(); public FlowDistributor(FlowGraph graph) { super(graph); } - public double getTotalDemand() { - return totalDemand; + public double getTotalIncomingDemand() { + return totalIncomingDemand; } - public double getTotalSupply() { - return totalSupply; + public double getCurrentIncomingSupply() { + return currentIncomingSupply; } public double getCapacity() { @@ -58,38 +61,61 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu public long onUpdate(long now) { + // Check if current supply is different from total demand + if (this.outgoingDemandUpdateNeeded) { + this.updateOutgoingDemand(); + + return Long.MAX_VALUE; + } + + this.updateOutgoingSupplies(); + return Long.MAX_VALUE; } - private void distributeSupply() { - // if supply >= demand -> push supplies to all tasks - if (this.totalSupply > this.totalDemand) { + private void updateOutgoingDemand() { + this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand); - // If this came from a state of overload, provide all consumers with their demand - if (this.overLoaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); - } - } + this.outgoingDemandUpdateNeeded = false; - if (this.currentConsumerIdx != -1) { - this.pushSupply( - this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); - this.currentConsumerIdx = -1; - } + this.invalidate(); + } - this.overLoaded = false; - } + private void updateOutgoingSupplies() { + + // If the demand is higher than the current supply, the system is overloaded. + // The available supply is distributed based on the current distribution function. + if (this.totalIncomingDemand > this.currentIncomingSupply) { + this.overloaded = true; - // if supply < demand -> distribute the supply over all consumers - else { - this.overLoaded = true; - double[] supplies = redistributeSupply(this.demands, this.totalSupply); + double[] supplies = distributeSupply(this.incomingDemands, this.currentIncomingSupply); for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); + this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx]); + } + + } else { + + // If the distributor was overloaded before, but is not anymore: + // provide all consumers with their demand + if (this.overloaded) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); + } + } + this.overloaded = false; + } + + // Update the supplies of the consumers that changed their demand in the current cycle + else { + for (int idx : this.updatedDemands) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); + } } } + + this.updatedDemands.clear(); } private record Demand(int idx, double value) {} @@ -97,10 +123,8 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu /** * Distributed the available supply over the different demands. * The supply is distributed using MaxMin Fairness. - * - * TODO: Move this outside of the Distributor so we can easily add different redistribution methods */ - private static double[] redistributeSupply(ArrayList demands, double totalSupply) { + private static double[] distributeSupply(ArrayList demands, double currentSupply) { int inputSize = demands.size(); final double[] supplies = new double[inputSize]; @@ -116,7 +140,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return i1.compareTo(i2); }); - double availableCapacity = totalSupply; // totalSupply + double availableCapacity = currentSupply; // totalSupply for (int i = 0; i < inputSize; i++) { double d = tempDemands[i].value; @@ -133,7 +157,6 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu availableCapacity -= r; } - // Return the used capacity return supplies; } @@ -146,15 +169,15 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu consumerEdge.setConsumerIndex(this.consumerEdges.size()); this.consumerEdges.add(consumerEdge); - this.demands.add(0.0); - this.supplies.add(0.0); + this.incomingDemands.add(0.0); + this.outgoingSupplies.add(0.0); } @Override public void addSupplierEdge(FlowEdge supplierEdge) { this.supplierEdge = supplierEdge; this.capacity = supplierEdge.getCapacity(); - this.totalSupply = 0; + this.currentIncomingSupply = 0; } @Override @@ -165,84 +188,87 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return; } - this.totalDemand -= consumerEdge.getDemand(); + this.totalIncomingDemand -= consumerEdge.getDemand(); this.consumerEdges.remove(idx); - this.demands.remove(idx); - this.supplies.remove(idx); + this.incomingDemands.remove(idx); + this.outgoingSupplies.remove(idx); // update the consumer index for all consumerEdges higher than this. for (int i = idx; i < this.consumerEdges.size(); i++) { this.consumerEdges.get(i).setConsumerIndex(i); } - this.currentConsumerIdx = -1; + for (int i = 0; i < this.updatedDemands.size(); i++) { + int j = this.updatedDemands.get(i); - if (this.overLoaded) { - this.distributeSupply(); + if (j == idx) { + this.updatedDemands.remove(idx); + } + if (j > idx) { + this.updatedDemands.set(i, j - 1); + } } - this.pushDemand(this.supplierEdge, this.totalDemand); + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @Override public void removeSupplierEdge(FlowEdge supplierEdge) { this.supplierEdge = null; this.capacity = 0; - this.totalSupply = 0; + this.currentIncomingSupply = 0; + + this.invalidate(); } @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { int idx = consumerEdge.getConsumerIndex(); - this.currentConsumerIdx = idx; - if (idx == -1) { System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); return; } // Update the total demand (This is cheaper than summing over all demands) - double prevDemand = demands.get(idx); + double prevDemand = incomingDemands.get(idx); - demands.set(idx, newDemand); - this.totalDemand += (newDemand - prevDemand); + incomingDemands.set(idx, newDemand); + this.totalIncomingDemand += (newDemand - prevDemand); - if (overLoaded) { - distributeSupply(); - } + this.updatedDemands.add(idx); - // Send new totalDemand to CPU - // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) - this.pushDemand(this.supplierEdge, this.totalDemand); + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.totalSupply = newSupply; + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + this.currentIncomingSupply = newSupply; - this.distributeSupply(); + this.invalidate(); } @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.supplierEdge.pushDemand(newDemand); } @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { int idx = consumerEdge.getConsumerIndex(); if (idx == -1) { System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); } - if (supplies.get(idx) == newSupply) { + if (outgoingSupplies.get(idx) == newSupply) { return; } - supplies.set(idx, newSupply); + outgoingSupplies.set(idx, newSupply); consumerEdge.pushSupply(newSupply); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index b7162508..9521f2ce 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -108,24 +108,38 @@ public class FlowEdge { /** * Push new demand from the Consumer to the Supplier */ - public void pushDemand(double newDemand) { - if (newDemand == this.demand) { + public void pushDemand(double newDemand, boolean forceThrough) { + if ((newDemand == this.demand) && !forceThrough) { return; } this.demand = newDemand; - this.supplier.handleDemand(this, newDemand); + this.supplier.handleIncomingDemand(this, newDemand); + } + + /** + * Push new demand from the Consumer to the Supplier + */ + public void pushDemand(double newDemand) { + this.pushDemand(newDemand, false); } /** * Push new supply from the Supplier to the Consumer */ - public void pushSupply(double newSupply) { - if (newSupply == this.supply) { + public void pushSupply(double newSupply, boolean forceThrough) { + if ((newSupply == this.supply) && !forceThrough) { return; } this.supply = newSupply; - this.consumer.handleSupply(this, newSupply); + this.consumer.handleIncomingSupply(this, newSupply); + } + + /** + * Push new supply from the Supplier to the Consumer + */ + public void pushSupply(double newSupply) { + this.pushSupply(newSupply, false); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index 64cd0d8c..e24e9f93 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -24,7 +24,7 @@ package org.opendc.simulator.engine.graph; import java.time.InstantSource; import org.opendc.simulator.engine.engine.FlowEngine; -import org.opendc.simulator.engine.engine.FlowTimerQueue; +import org.opendc.simulator.engine.engine.FlowEventQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +109,7 @@ public abstract class FlowNode { private long deadline = Long.MAX_VALUE; /** - * The index of the timer in the {@link FlowTimerQueue}. + * The index of the timer in the {@link FlowEventQueue}. */ private int timerIndex = -1; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java index 84602ee0..da65392b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java @@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph; public interface FlowSupplier { - void handleDemand(FlowEdge consumerEdge, double newDemand); + void handleIncomingDemand(FlowEdge consumerEdge, double newDemand); - void pushSupply(FlowEdge consumerEdge, double newSupply); + void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply); void addConsumerEdge(FlowEdge consumerEdge); -- cgit v1.2.3