diff options
| author | Niels Thiele <noleu66@posteo.net> | 2025-07-15 15:53:37 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-15 15:53:37 +0200 |
| commit | a5f3c19200026b9476edc39b951eb1003cff0831 (patch) | |
| tree | 785201fc58893902c15d691eff252ac91f08f690 /opendc-simulator/opendc-simulator-flow | |
| parent | b2dc97dc84f56174ede9f273999ade2ed059d431 (diff) | |
Add configurable resource distribution at host level (#355)
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
11 files changed, 978 insertions, 272 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index f7fc2728..cae3e8a1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -27,49 +27,49 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import org.opendc.common.ResourceType; import org.opendc.simulator.engine.engine.FlowEngine; -import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy; -import org.opendc.simulator.engine.graph.distributionPolicies.MaxMinFairnessPolicy; +import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { - private static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class); - private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); - private HashMap<Integer, FlowEdge> supplierEdges = +/** + * A {@link FlowDistributor} is a node that distributes supply from multiple suppliers to multiple consumers. + * It can be used to model host-level resource distribution, such as CPU, memory, and GPU distribution. + * It is a subclass of {@link FlowNode} and implements both {@link FlowSupplier} and {@link FlowConsumer}. + * It maintains a list of consumer edges and supplier edges, and it can handle incoming demands and supplies. + * It also provides methods to update outgoing demands and supplies based on the incoming demands and supplies. + * This class is abstract and should be extended by specific implementations that define the distribution strategy. + * It uses a {@link FlowDistributorFactory.DistributionPolicy} to determine how to distribute the supply among the consumers. + * The default distribution policy is {@link MaxMinFairnessPolicy}, which distributes the supply fairly among the consumers. + */ +public abstract class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { + protected static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class); + protected final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); + protected HashMap<Integer, FlowEdge> supplierEdges = new HashMap<>(); // The suppliers that provide supply to this distributor - private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers - private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers + protected final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers + protected final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers - private double totalIncomingDemand; // The total demand of all the consumers + protected double totalIncomingDemand; // The total demand of all the consumers // AS index is based on the supplierIndex of the FlowEdge, ids of entries need to be stable - private HashMap<Integer, Double> currentIncomingSupplies = + protected HashMap<Integer, Double> currentIncomingSupplies = new HashMap<>(); // The current supply provided by the suppliers - private Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers - - private boolean outgoingDemandUpdateNeeded = false; - private Set<Integer> updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle + protected Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers - private ResourceType supplierResourceType; - private ResourceType consumerResourceType; + protected boolean outgoingDemandUpdateNeeded = false; + protected Set<Integer> updatedDemands = + new HashSet<>(); // Array of consumers that updated their demand in this cycle - private boolean overloaded = false; + protected ResourceType supplierResourceType; + protected ResourceType consumerResourceType; - private double capacity; // What is the max capacity. Can probably be removed - private DistributionPolicy distributionPolicy; + protected double capacity; // What is the max capacity. Can probably be removed public FlowDistributor(FlowEngine engine) { super(engine); - this.distributionPolicy = new MaxMinFairnessPolicy(); - } - - public FlowDistributor(FlowEngine engine, DistributionPolicy distributionPolicy) { - super(engine); - this.distributionPolicy = distributionPolicy; } public double getTotalIncomingDemand() { @@ -100,64 +100,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return Long.MAX_VALUE; } - private void updateOutgoingDemand() { - // equally distribute the demand to all suppliers - for (FlowEdge supplierEdge : this.supplierEdges.values()) { - this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size()); - // alternatively a relative share could be used, based on capacity minus current incoming supply - // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() - - // currentIncomingSupplies.get(idx) / supplierEdges.size())); - } - - this.outgoingDemandUpdateNeeded = false; - - this.invalidate(); - } + protected abstract void updateOutgoingDemand(); // TODO: This should probably be moved to the distribution strategy - private void updateOutgoingSupplies() { - - // If the demand is higher than the current supply, the system is overloaded. - // The available supply is distributed based on the current distribution function. - if (this.totalIncomingDemand > this.totalIncomingSupply) { - this.overloaded = true; - - double[] supplies = this.distributionPolicy.distributeSupply( - this.incomingDemands, - new ArrayList<>(this.currentIncomingSupplies.values()), - this.totalIncomingSupply); + protected abstract void updateOutgoingSupplies(); - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); - } - - } else { - - // If the distributor was overloaded before, but is not anymore: - // provide all consumers with their demand - if (this.overloaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { - this.pushOutgoingSupply( - this.consumerEdges.get(idx), - this.incomingDemands.get(idx), - this.getConsumerResourceType()); - } - } - this.overloaded = false; - } - - // Update the supplies of the consumers that changed their demand in the current cycle - else { - for (int idx : this.updatedDemands) { - this.pushOutgoingSupply( - this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType()); - } - } - } - - this.updatedDemands.clear(); - } + public abstract double[] distributeSupply( + ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply); /** * Add a new consumer. @@ -322,4 +271,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu public ResourceType getConsumerResourceType() { return this.consumerResourceType; } + + public Boolean hasSupplierEdges() { + for (FlowEdge edge : this.supplierEdges.values()) { + if (edge != null) { + return true; + } + } + return false; + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java new file mode 100644 index 00000000..4a13beb2 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java @@ -0,0 +1,277 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import java.util.Objects; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowEdge; + +/** + * A Best Effort Flow Distributor that implements a timesliced round-robin approach. + * + * Key principles: + * - Timesliced Round Robin: Resources are allocated to consumers in a round-robin manner + * - Non-Guaranteed Shares: No fixed allocation per consumer, distribution based on current demand + * - Optimized Utilization: Maximizes resource utilization during idle periods + * + * This scheduler is suitable for environments with fluctuating workloads where fairness + * is less important than maximizing overall resource utilization. + * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a> + */ +public class BestEffortFlowDistributor extends FlowDistributor { + + private int currentRoundRobinIndex = 0; + private boolean overloaded = false; + private final long roundRobinInterval; + private long lastRoundRobinUpdate; + + public BestEffortFlowDistributor(FlowEngine flowEngine, long roundRobinInterval) { + super(flowEngine); + this.roundRobinInterval = roundRobinInterval; + this.lastRoundRobinUpdate = -roundRobinInterval; + } + + /** + * Updates the outgoing demand based on the total incoming demand. + * Prioritizes already utilized suppliers when potential supply exceeds demand. + */ + @Override + protected void updateOutgoingDemand() { + + // If potential supply exceeds demand, prioritize already utilized suppliers + if (this.capacity > this.totalIncomingDemand && this.totalIncomingDemand > 0) { + // Best-effort: try to satisfy demand using already active suppliers first + double remainingDemand = this.totalIncomingDemand; + + // Phase 1: Prioritize suppliers that are currently providing supply + for (var entry : this.supplierEdges.entrySet()) { + int supplierIdx = entry.getKey(); + FlowEdge supplierEdge = entry.getValue(); + double currentSupply = this.currentIncomingSupplies.get(supplierIdx); + + if (currentSupply > 0 && remainingDemand > 0) { + // Try to satisfy as much demand as possible from this already active supplier + double demandForThisSupplier = Math.min(remainingDemand, supplierEdge.getCapacity()); + this.pushOutgoingDemand(supplierEdge, demandForThisSupplier); + remainingDemand -= demandForThisSupplier; + } + } + + // Phase 2: If demand still remains, use inactive suppliers + if (remainingDemand > 0) { + for (var entry : this.supplierEdges.entrySet()) { + int supplierIdx = entry.getKey(); + FlowEdge supplierEdge = entry.getValue(); + double currentSupply = this.currentIncomingSupplies.get(supplierIdx); + + if (currentSupply == 0 && remainingDemand > 0) { + double demandForThisSupplier = Math.min(remainingDemand, supplierEdge.getCapacity()); + this.pushOutgoingDemand(supplierEdge, demandForThisSupplier); + remainingDemand -= demandForThisSupplier; + } + } + } + } else { + // System is overloaded or no demand: distribute demand equally across all suppliers + double demandPerSupplier = this.totalIncomingDemand / this.supplierEdges.size(); + + for (FlowEdge supplierEdge : this.supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, demandPerSupplier); + } + } + + this.outgoingDemandUpdateNeeded = false; + this.invalidate(); + } + + /** + * Updates the outgoing supplies using a best-effort approach. + * When overloaded, uses round-robin distribution. Otherwise, satisfies demands optimally. + */ + @Override + protected void updateOutgoingSupplies() { + // Check if system is overloaded (demand exceeds supply) + if (this.totalIncomingDemand > this.totalIncomingSupply) { + this.overloaded = true; + + // Use the distribution algorithm for supply allocation + double[] supplies = this.distributeSupply( + this.incomingDemands, + new ArrayList<>(this.currentIncomingSupplies.values()), + this.totalIncomingSupply); + + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); + } + } else { + // System is not overloaded - satisfy all demands and utilize remaining capacity + + if (this.overloaded) { + // Transitioning from overloaded to non-overloaded state + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { + this.pushOutgoingSupply( + this.consumerEdges.get(idx), + this.incomingDemands.get(idx), + this.getConsumerResourceType()); + } + } + this.overloaded = false; + } else { + // Update supplies for consumers that changed their demand + for (int idx : this.updatedDemands) { + this.pushOutgoingSupply( + this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType()); + } + } + } + + this.updatedDemands.clear(); + } + + /** + * Distributes available supply using a best-effort, round-robin approach. + * Algorithm: + * 1. First pass: Satisfy demands up to available capacity in round-robin order + * 2. Second pass: Distribute remaining capacity to consumers with unsatisfied demand + * 3. Optimize utilization by giving extra capacity to active consumers + */ + @Override + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + int numConsumers = this.consumerEdges.size(); + double[] allocation = new double[numConsumers]; + + if (numConsumers == 0 || totalSupply <= 0) { + return allocation; + } + + double remainingSupply = totalSupply; + + // Phase 1: Round-robin distribution based on demand + // Start from the current round-robin index to ensure fairness over time + for (int round = 0; round < numConsumers && remainingSupply > 0; round++) { + int idx = (currentRoundRobinIndex + round) % numConsumers; + double demand = demands.get(idx); + + if (demand > allocation[idx]) { + // Calculate how much we can allocate in this round + double unmetDemand = demand - allocation[idx]; + double toAllocate = Math.min(unmetDemand, remainingSupply); + + allocation[idx] += toAllocate; + remainingSupply -= toAllocate; + } + } + + // Phase 2: Distribute any remaining supply to maximize utilization + // Give preference to consumers with the highest relative demand + if (remainingSupply > 0) { + // Create a list of consumers with unsatisfied demand, sorted by relative need + ArrayList<Integer> unsatisfiedConsumers = new ArrayList<>(); + for (int i = 0; i < numConsumers; i++) { + if (demands.get(i) > allocation[i]) { + unsatisfiedConsumers.add(i); + } + } + + // If no unsatisfied demand, distribute remaining capacity equally among active consumers + if (unsatisfiedConsumers.isEmpty()) { + // Find consumers with any demand (active consumers) + ArrayList<Integer> activeConsumers = new ArrayList<>(); + for (int i = 0; i < numConsumers; i++) { + if (demands.get(i) > 0) { + activeConsumers.add(i); + } + } + + if (!activeConsumers.isEmpty()) { + double extraPerConsumer = remainingSupply / activeConsumers.size(); + for (int idx : activeConsumers) { + allocation[idx] += extraPerConsumer; + } + } + } else { + // Distribute remaining supply proportionally to unsatisfied demand + double totalUnsatisfiedDemand = 0; + for (int idx : unsatisfiedConsumers) { + totalUnsatisfiedDemand += demands.get(idx) - allocation[idx]; + } + + for (int idx : unsatisfiedConsumers) { + double unsatisfiedDemand = demands.get(idx) - allocation[idx]; + double proportion = unsatisfiedDemand / totalUnsatisfiedDemand; + allocation[idx] += remainingSupply * proportion; + } + } + } + + // Update round-robin index for next allocation cycle + if (numConsumers > 0) { + currentRoundRobinIndex = (currentRoundRobinIndex + 1) % numConsumers; + } + + return allocation; + } + + /** + * Enhanced onUpdate method that implements time-sliced round-robin scheduling. + * This method ensures the round-robin index advances at regular intervals, + * creating true time-sliced behavior for best-effort scheduling. + */ + @Override + public long onUpdate(long now) { + long nextUpdate = Long.MAX_VALUE; + + boolean updateNeeded = false; + + // Check if it's time for a round-robin advancement + if (now >= lastRoundRobinUpdate + this.roundRobinInterval) { + updateNeeded = true; + lastRoundRobinUpdate = now; + + // Schedule next round-robin update + nextUpdate = now + this.roundRobinInterval; + } else { + // Schedule the next potential round-robin update + nextUpdate = lastRoundRobinUpdate + this.roundRobinInterval; + } + + // Update demands if needed + if (this.outgoingDemandUpdateNeeded || updateNeeded) { + this.updateOutgoingDemand(); + } + + // Update supplies if needed + if (!this.outgoingSupplies.isEmpty() || updateNeeded) { + this.updateOutgoingSupplies(); + } + + if (this.consumerEdges.isEmpty() || !this.hasSupplierEdges()) { + nextUpdate = Long.MAX_VALUE; + } + + return nextUpdate; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java deleted file mode 100644 index 3a8bebbc..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2025 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph.distributionPolicies; - -import java.util.ArrayList; - -public interface DistributionPolicy { - double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java deleted file mode 100644 index 53cded87..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2025 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph.distributionPolicies; - -public class DistributionPolicyFactory { - - public enum DistributionPolicyType { - MaxMinFairness, - FixedShare; - } - - public static DistributionPolicy getDistributionStrategy(DistributionPolicyType distributionPolicyType) { - - return switch (distributionPolicyType) { - case MaxMinFairness -> new MaxMinFairnessPolicy(); - case FixedShare -> new FixedShare(1); - // actively misspelling - default -> throw new IllegalArgumentException( - "Unknown distribution strategy type: " + distributionPolicyType); - }; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java new file mode 100644 index 00000000..f58164cf --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import java.util.Arrays; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; + +/** + * A {@link FlowDistributor} that implements the Equal Share distribution policy. + * <p> + * This distributor allocates resources equally among all suppliers and consumers, ensuring that each supplier and + * consumer receives an equal share of the total capacity. + * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a> + */ +public class EqualShareFlowDistributor extends FlowDistributor { + + public EqualShareFlowDistributor(FlowEngine engine) { + super(engine); + } + + /** + * Updates the outgoing demand for each supplier edge based on the total capacity. + * The demand is equally distributed among all suppliers. + * This method is called when the outgoing demand needs to be updated. + */ + @Override + protected void updateOutgoingDemand() { + double equalShare = this.capacity / this.supplierEdges.size(); + + for (var supplierEdge : this.supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, equalShare); + } + + this.outgoingDemandUpdateNeeded = false; + } + + /** + * Updates the outgoing supply for each consumer edge based on the total supply. + * The supply is equally distributed among all consumers. + * This method is called when the outgoing supply needs to be updated. + */ + @Override + protected void updateOutgoingSupplies() { + double[] equalShare = distributeSupply(incomingDemands, outgoingSupplies, this.capacity); + + for (var consumerEdge : this.consumerEdges) { + this.pushOutgoingSupply(consumerEdge, equalShare[consumerEdge.getConsumerIndex()]); + } + } + + @Override + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + int numConsumers = demands.size(); + double[] allocation = new double[numConsumers]; + double equalShare = totalSupply / numConsumers; + + // Equal share regardless of individual demands + Arrays.fill(allocation, equalShare); + + return allocation; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java new file mode 100644 index 00000000..c0a8cd13 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowEdge; + +/** + * A {@link FlowDistributor} that implements the First Fit policy for distributing flow. + * + * This distributor allocates resources to consumers based on the first available supply that meets their demand. + * It does not attempt to balance loads or optimize resource usage beyond the first fit principle. + * It tries to place demands on already existing supplies without creating new ones. + * It assumes that resources can be partitioned, if one supplier cannot satisfy the demand, it will try to combine multiple suppliers. + */ +public class FirstFitPolicyFlowDistributor extends FlowDistributor { + + public FirstFitPolicyFlowDistributor(FlowEngine engine) { + super(engine); + } + + /** + * Updates the outgoing demand for suppliers in sequential order by their index. + * With each supplier being allocated up to its full capacity before moving to the next supplier. + */ + @Override + protected void updateOutgoingDemand() { + double remainingDemand = this.totalIncomingDemand; + + // Sort supplier edges by their index to ensure consistent first-fit ordering + var sortedSuppliers = this.supplierEdges.entrySet().stream() + .sorted((e1, e2) -> Integer.compare(e1.getKey(), e2.getKey())) + .toList(); + + // Apply First Fit strategy: fill suppliers in order until demand is satisfied + for (var supplierEntry : sortedSuppliers) { + var supplierEdge = supplierEntry.getValue(); + double supplierCapacity = supplierEdge.getCapacity(); + + if (remainingDemand <= 0) { + // No more demand to allocate + this.pushOutgoingDemand(supplierEdge, 0.0); + } else if (remainingDemand <= supplierCapacity) { + // This supplier can handle all remaining demand + this.pushOutgoingDemand(supplierEdge, remainingDemand); + remainingDemand = 0; + } else { + // This supplier gets filled to capacity, demand continues to next supplier + this.pushOutgoingDemand(supplierEdge, supplierCapacity); + remainingDemand -= supplierCapacity; + } + } + + this.outgoingDemandUpdateNeeded = false; + } + + /** + * Consumers receive their full demanded amount if it can be satisfied by the available supply, + * or zero if it cannot. + */ + @Override + protected void updateOutgoingSupplies() { + ArrayList<Double> currentPossibleSupplies = new ArrayList<>(); + for (var currentIncomingSupply : currentIncomingSupplies.entrySet()) { + currentPossibleSupplies.add(currentIncomingSupply.getValue()); + } + + double[] shares = distributeSupply(incomingDemands, currentPossibleSupplies, totalIncomingSupply); + + for (FlowEdge consumerEdge : this.consumerEdges) { + this.pushOutgoingSupply(consumerEdge, shares[consumerEdge.getConsumerIndex()]); + } + } + + /** + * Distributes supply among consumers using the First Fit allocation principle. + * Each consumer demand is allocated by trying suppliers in order, potentially + * combining multiple suppliers to satisfy a single demand. + * + * @param demands List of demand values from consumers + * @param currentSupply List of available supply values from suppliers + * @param totalSupply Total amount of supply available (unused in this implementation) + * @return Array of allocation amounts for each consumer + * + * @see #updateOutgoingSupplies() + */ + @Override + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + int numConsumers = demands.size(); + double[] allocation = new double[numConsumers]; + + // Create a copy of current supply to track remaining capacity as we allocate + ArrayList<Double> remainingSupply = new ArrayList<>(currentSupply); + + // For each demand, try to satisfy it using suppliers in order + for (int i = 0; i < numConsumers; i++) { + double remainingDemand = demands.get(i); + double totalAllocated = 0.0; + + if (remainingDemand > 0) { + // Try each supplier in order until demand is satisfied + for (int j = 0; j < remainingSupply.size() && remainingDemand > 0; j++) { + double availableSupply = remainingSupply.get(j); + + if (availableSupply > 0) { + // Allocate as much as possible from this supplier + double allocatedFromThisSupplier = Math.min(availableSupply, remainingDemand); + + totalAllocated += allocatedFromThisSupplier; + remainingDemand -= allocatedFromThisSupplier; + + // Reduce the remaining supply capacity + remainingSupply.set(j, availableSupply - allocatedFromThisSupplier); + } + } + } + + allocation[i] = totalAllocated; + } + + return allocation; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java deleted file mode 100644 index baa04975..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2025 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph.distributionPolicies; - -import java.util.ArrayList; - -/** - * A distribution policy that distributes supply equally among all nodes. - * The share can be set to a fixed value, defaulting to 1. - * This policy not implemented yet and is used as a placeholder. - */ -public class FixedShare implements DistributionPolicy { - - private int share; - - public FixedShare() { - this.share = 1; - } - - public FixedShare(int share) { - this.share = share; - } - - @Override - public double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply) { - return new double[0]; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java new file mode 100644 index 00000000..4c0a84d1 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import java.util.HashSet; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowEdge; + +/** + * A {@link FlowDistributor} that implements Fixed Share GPU scheduling. + * + * This distributor allocates a dedicated, consistent portion of GPU resources to each VM (consumer), + * ensuring predictable availability and stable performance. Each active consumer receives a fixed + * share of the total GPU capacity, regardless of their individual demand or the demand of other consumers. + * This policy is heavily inspired by the fixed share GPU scheduling policy used in NVIDIA's MIG (Multi-Instance GPU) technology. + * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a> + * Key characteristics: + * - Each consumer gets a fixed percentage of total GPU capacity when active + * - Unused shares (from inactive consumers) remain unallocated, not redistributed + * - Performance remains consistent and predictable for each consumer + * - Share allocation is based on maximum supported consumers, not currently active ones + */ +public class FixedShareFlowDistributor extends FlowDistributor { + + private double fixedShare; + private final double shareRatio; + private int[] notSuppliedConsumers; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Creates a Fixed Share Flow Distributor. + * + * @param engine The flow engine + * @param shareRatio The fixed share ratio for each consumer (between 0.0 and 1.0) + */ + public FixedShareFlowDistributor(FlowEngine engine, double shareRatio) { + super(engine); + + if (shareRatio <= 0 || shareRatio > 1) { + throw new IllegalArgumentException("Share ratio must be between 0.0 and 1.0"); + } + this.shareRatio = shareRatio; + + // Each consumer gets an equal fixed share of the total capacity + this.fixedShare = this.shareRatio * this.capacity / this.supplierEdges.size(); + + // Initialize tracking for round-robin prioritization + this.notSuppliedConsumers = new int[0]; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Distribution Logic + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Updates the outgoing demand to suppliers. + * In Fixed Share mode, we request the total amount needed to satisfy all active consumers' fixed shares. + */ + @Override + protected void updateOutgoingDemand() { + // Calculate total demand based on active consumers + this.fixedShare = this.shareRatio * this.capacity / this.supplierEdges.size(); + + // Distribute demand equally across all suppliers + for (FlowEdge supplierEdge : supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, this.fixedShare); + } + + this.outgoingDemandUpdateNeeded = false; + this.invalidate(); + } + + /** + * Updates the outgoing supplies to consumers. + * Each active consumer receives their fixed share, regardless of their actual demand. + */ + @Override + protected void updateOutgoingSupplies() { + // Calculate the fixed allocation per consumer + + // Distribute to each consumer + int consumerIndex = 0; + if (this.consumerEdges.size() == this.supplierEdges.size()) { + for (FlowEdge consumerEdge : this.consumerEdges) { + this.pushOutgoingSupply(consumerEdge, this.fixedShare); + } + } else { + double[] supplies = distributeSupply(this.incomingDemands, this.outgoingSupplies, this.totalIncomingSupply); + for (FlowEdge consumerEdge : this.consumerEdges) { + if (supplies[consumerIndex] <= 0.0) { + continue; + } + this.pushOutgoingSupply(consumerEdge, this.fixedShare); + } + } + } + + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + double[] supplies = new double[this.consumerEdges.size()]; + + if (this.consumerEdges.size() < this.supplierEdges.size()) { + for (FlowEdge consumerEdge : this.consumerEdges) { + supplies[consumerEdge.getConsumerIndex()] = this.fixedShare; + } + } else { + // Round-robin approach: prioritize consumers that didn't get resources last time + ArrayList<Integer> currentNotSuppliedList = new ArrayList<>(); + + // Calculate how many consumers we can supply with available resources + int maxConsumersToSupply = (int) Math.floor(totalSupply / this.fixedShare); + int consumersSupplied = 0; + + // First pass: try to supply consumers that were not supplied in the previous round + for (int index : this.notSuppliedConsumers) { + if (index < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply) { + supplies[index] = this.fixedShare; + consumersSupplied++; + } + } + + // Second pass: supply remaining consumers if we still have capacity + for (int i = 0; i < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply; i++) { + if (supplies[i] == 0.0) { // This consumer hasn't been supplied yet + supplies[i] = this.fixedShare; + consumersSupplied++; + } + } + + // Build the list of consumers that didn't get resources this round + for (int i = 0; i < this.consumerEdges.size(); i++) { + if (supplies[i] == 0.0) { + currentNotSuppliedList.add(i); + } + } + + // Update the tracking array for next round + this.notSuppliedConsumers = + currentNotSuppliedList.stream().mapToInt(Integer::intValue).toArray(); + } + + return supplies; + } + + @Override + // index of not supplied consumers also need to be updated + public void removeConsumerEdge(FlowEdge consumerEdge) { + int idx = consumerEdge.getConsumerIndex(); + + if (idx == -1) { + return; + } + + this.totalIncomingDemand -= consumerEdge.getDemand(); + + // Remove idx from consumers that updated their demands + this.updatedDemands.remove(idx); + + this.consumerEdges.remove(idx); + this.incomingDemands.remove(idx); + this.outgoingSupplies.remove(idx); + + // update the consumer index for all consumerEdges higher than this. + for (int i = idx; i < this.consumerEdges.size(); i++) { + FlowEdge other = this.consumerEdges.get(i); + + other.setConsumerIndex(other.getConsumerIndex() - 1); + } + + HashSet<Integer> newUpdatedDemands = new HashSet<>(); + for (int idx_other : this.updatedDemands) { + if (idx_other > idx) { + newUpdatedDemands.add(idx_other - 1); + } else { + newUpdatedDemands.add(idx_other); + } + } + this.updatedDemands = newUpdatedDemands; + + // Decrease the index of not supplied consumers + for (int i = 0; i < this.notSuppliedConsumers.length; i++) { + if (this.notSuppliedConsumers[i] > idx) { + this.notSuppliedConsumers[i]--; + } + } + + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java new file mode 100644 index 00000000..eb5d4ff7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; + +public class FlowDistributorFactory { + + public enum DistributionPolicy { + BEST_EFFORT, + EQUAL_SHARE, + FIRST_FIT, + FIXED_SHARE, + MAX_MIN_FAIRNESS; + + private final Map<String, Object> properties = new HashMap<>(); + + public void setProperty(String key, Object value) { + properties.put(key, value); + } + + public Object getProperty(String key) { + return properties.get(key); + } + + public <T> T getProperty(String key, Class<T> type) { + return type.cast(properties.get(key)); + } + + public Set<String> getPropertyNames() { + return properties.keySet(); + } + } + + public static FlowDistributor getFlowDistributor(FlowEngine flowEngine, DistributionPolicy distributionPolicyType) { + + return switch (distributionPolicyType) { + case BEST_EFFORT -> new BestEffortFlowDistributor( + flowEngine, distributionPolicyType.getProperty("updateIntervalLength", Long.class)); + case EQUAL_SHARE -> new EqualShareFlowDistributor(flowEngine); + case FIRST_FIT -> new FirstFitPolicyFlowDistributor(flowEngine); + case FIXED_SHARE -> { + if (!distributionPolicyType.getPropertyNames().contains("shareRatio")) { + throw new IllegalArgumentException( + "FixedShare distribution policy requires a 'shareRatio' property to be set."); + } + yield new FixedShareFlowDistributor( + flowEngine, distributionPolicyType.getProperty("shareRatio", Double.class)); + } + default -> new MaxMinFairnessFlowDistributor(flowEngine); + }; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java new file mode 100644 index 00000000..9b48f204 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph.distributionPolicies; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.graph.FlowDistributor; +import org.opendc.simulator.engine.graph.FlowEdge; + +/** + * A flow distributor that implements the max-min fairness distribution policy. + * <p> + * This policy distributes the available supply to consumers in a way that maximizes the minimum supply received by any + * consumer, ensuring fairness across all consumers. + */ +public class MaxMinFairnessFlowDistributor extends FlowDistributor { + + private boolean overloaded = false; + + public MaxMinFairnessFlowDistributor(FlowEngine engine) { + super(engine); + } + + protected void updateOutgoingDemand() { + // equally distribute the demand to all suppliers + for (FlowEdge supplierEdge : this.supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size()); + // alternatively a relative share could be used, based on capacity minus current incoming supply + // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() - + // currentIncomingSupplies.get(idx) / supplierEdges.size())); + } + + this.outgoingDemandUpdateNeeded = false; + + this.invalidate(); + } + + // TODO: This should probably be moved to the distribution strategy + protected void updateOutgoingSupplies() { + + // If the demand is higher than the current supply, the system is overloaded. + // The available supply is distributed based on the current distribution function. + if (this.totalIncomingDemand > this.totalIncomingSupply) { + this.overloaded = true; + + double[] supplies = this.distributeSupply( + this.incomingDemands, + new ArrayList<>(this.currentIncomingSupplies.values()), + this.totalIncomingSupply); + + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); + } + + } else { + + // If the distributor was overloaded before, but is not anymore: + // provide all consumers with their demand + if (this.overloaded) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { + this.pushOutgoingSupply( + this.consumerEdges.get(idx), + this.incomingDemands.get(idx), + this.getConsumerResourceType()); + } + } + this.overloaded = false; + } + + // Update the supplies of the consumers that changed their demand in the current cycle + else { + for (int idx : this.updatedDemands) { + this.pushOutgoingSupply( + this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType()); + } + } + } + + this.updatedDemands.clear(); + } + + private record Demand(int idx, double value) {} + + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { + int inputSize = demands.size(); + + final double[] supplies = new double[inputSize]; + final Demand[] tempDemands = new Demand[inputSize]; + + for (int i = 0; i < inputSize; i++) { + tempDemands[i] = new Demand(i, demands.get(i)); + } + + Arrays.sort(tempDemands, (o1, o2) -> { + Double i1 = o1.value; + Double i2 = o2.value; + return i1.compareTo(i2); + }); + + double availableCapacity = totalSupply; + + for (int i = 0; i < inputSize; i++) { + double d = tempDemands[i].value; + + if (d == 0.0) { + continue; + } + + double availableShare = availableCapacity / (inputSize - i); + double r = Math.min(d, availableShare); + + int idx = tempDemands[i].idx; + supplies[idx] = r; // Update the rates + availableCapacity -= r; + } + + return supplies; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java deleted file mode 100644 index 484e7fe4..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2025 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.graph.distributionPolicies; - -import java.util.ArrayList; -import java.util.Arrays; - -/** - * A distribution policy that implements the Max-Min Fairness algorithm. - * This policy distributes supply to demands in a way that maximizes the minimum - * allocation across all demands, ensuring fairness. - */ -public class MaxMinFairnessPolicy implements DistributionPolicy { - private record Demand(int idx, double value) {} - - @Override - public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { - int inputSize = demands.size(); - - final double[] supplies = new double[inputSize]; - final Demand[] tempDemands = new Demand[inputSize]; - - for (int i = 0; i < inputSize; i++) { - tempDemands[i] = new Demand(i, demands.get(i)); - } - - Arrays.sort(tempDemands, (o1, o2) -> { - Double i1 = o1.value; - Double i2 = o2.value; - return i1.compareTo(i2); - }); - - double availableCapacity = totalSupply; - - for (int i = 0; i < inputSize; i++) { - double d = tempDemands[i].value; - - if (d == 0.0) { - continue; - } - - double availableShare = availableCapacity / (inputSize - i); - double r = Math.min(d, availableShare); - - int idx = tempDemands[i].idx; - supplies[idx] = r; // Update the rates - availableCapacity -= r; - } - - return supplies; - } -} |
