summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main/java
diff options
context:
space:
mode:
authorNiels Thiele <noleu66@posteo.net>2025-07-15 15:53:37 +0200
committerGitHub <noreply@github.com>2025-07-15 15:53:37 +0200
commita5f3c19200026b9476edc39b951eb1003cff0831 (patch)
tree785201fc58893902c15d691eff252ac91f08f690 /opendc-simulator/opendc-simulator-flow/src/main/java
parentb2dc97dc84f56174ede9f273999ade2ed059d431 (diff)
Add configurable resource distribution at host level (#355)
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java120
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java277
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java29
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java42
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java84
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java145
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java48
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java214
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java77
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java142
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java72
11 files changed, 978 insertions, 272 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index f7fc2728..cae3e8a1 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -27,49 +27,49 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import org.opendc.common.ResourceType;
import org.opendc.simulator.engine.engine.FlowEngine;
-import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy;
-import org.opendc.simulator.engine.graph.distributionPolicies.MaxMinFairnessPolicy;
+import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class);
- private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
- private HashMap<Integer, FlowEdge> supplierEdges =
+/**
+ * A {@link FlowDistributor} is a node that distributes supply from multiple suppliers to multiple consumers.
+ * It can be used to model host-level resource distribution, such as CPU, memory, and GPU distribution.
+ * It is a subclass of {@link FlowNode} and implements both {@link FlowSupplier} and {@link FlowConsumer}.
+ * It maintains a list of consumer edges and supplier edges, and it can handle incoming demands and supplies.
+ * It also provides methods to update outgoing demands and supplies based on the incoming demands and supplies.
+ * This class is abstract and should be extended by specific implementations that define the distribution strategy.
+ * It uses a {@link FlowDistributorFactory.DistributionPolicy} to determine how to distribute the supply among the consumers.
+ * The default distribution policy is {@link MaxMinFairnessPolicy}, which distributes the supply fairly among the consumers.
+ */
+public abstract class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer {
+ protected static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class);
+ protected final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
+ protected HashMap<Integer, FlowEdge> supplierEdges =
new HashMap<>(); // The suppliers that provide supply to this distributor
- private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers
- private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers
+ protected final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers
+ protected final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers
- private double totalIncomingDemand; // The total demand of all the consumers
+ protected double totalIncomingDemand; // The total demand of all the consumers
// AS index is based on the supplierIndex of the FlowEdge, ids of entries need to be stable
- private HashMap<Integer, Double> currentIncomingSupplies =
+ protected HashMap<Integer, Double> currentIncomingSupplies =
new HashMap<>(); // The current supply provided by the suppliers
- private Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers
-
- private boolean outgoingDemandUpdateNeeded = false;
- private Set<Integer> updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle
+ protected Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers
- private ResourceType supplierResourceType;
- private ResourceType consumerResourceType;
+ protected boolean outgoingDemandUpdateNeeded = false;
+ protected Set<Integer> updatedDemands =
+ new HashSet<>(); // Array of consumers that updated their demand in this cycle
- private boolean overloaded = false;
+ protected ResourceType supplierResourceType;
+ protected ResourceType consumerResourceType;
- private double capacity; // What is the max capacity. Can probably be removed
- private DistributionPolicy distributionPolicy;
+ protected double capacity; // What is the max capacity. Can probably be removed
public FlowDistributor(FlowEngine engine) {
super(engine);
- this.distributionPolicy = new MaxMinFairnessPolicy();
- }
-
- public FlowDistributor(FlowEngine engine, DistributionPolicy distributionPolicy) {
- super(engine);
- this.distributionPolicy = distributionPolicy;
}
public double getTotalIncomingDemand() {
@@ -100,64 +100,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return Long.MAX_VALUE;
}
- private void updateOutgoingDemand() {
- // equally distribute the demand to all suppliers
- for (FlowEdge supplierEdge : this.supplierEdges.values()) {
- this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size());
- // alternatively a relative share could be used, based on capacity minus current incoming supply
- // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() -
- // currentIncomingSupplies.get(idx) / supplierEdges.size()));
- }
-
- this.outgoingDemandUpdateNeeded = false;
-
- this.invalidate();
- }
+ protected abstract void updateOutgoingDemand();
// TODO: This should probably be moved to the distribution strategy
- private void updateOutgoingSupplies() {
-
- // If the demand is higher than the current supply, the system is overloaded.
- // The available supply is distributed based on the current distribution function.
- if (this.totalIncomingDemand > this.totalIncomingSupply) {
- this.overloaded = true;
-
- double[] supplies = this.distributionPolicy.distributeSupply(
- this.incomingDemands,
- new ArrayList<>(this.currentIncomingSupplies.values()),
- this.totalIncomingSupply);
+ protected abstract void updateOutgoingSupplies();
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType());
- }
-
- } else {
-
- // If the distributor was overloaded before, but is not anymore:
- // provide all consumers with their demand
- if (this.overloaded) {
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) {
- this.pushOutgoingSupply(
- this.consumerEdges.get(idx),
- this.incomingDemands.get(idx),
- this.getConsumerResourceType());
- }
- }
- this.overloaded = false;
- }
-
- // Update the supplies of the consumers that changed their demand in the current cycle
- else {
- for (int idx : this.updatedDemands) {
- this.pushOutgoingSupply(
- this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType());
- }
- }
- }
-
- this.updatedDemands.clear();
- }
+ public abstract double[] distributeSupply(
+ ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply);
/**
* Add a new consumer.
@@ -322,4 +271,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
public ResourceType getConsumerResourceType() {
return this.consumerResourceType;
}
+
+ public Boolean hasSupplierEdges() {
+ for (FlowEdge edge : this.supplierEdges.values()) {
+ if (edge != null) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java
new file mode 100644
index 00000000..4a13beb2
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine.graph.distributionPolicies;
+
+import java.util.ArrayList;
+import java.util.Objects;
+import org.opendc.simulator.engine.engine.FlowEngine;
+import org.opendc.simulator.engine.graph.FlowDistributor;
+import org.opendc.simulator.engine.graph.FlowEdge;
+
+/**
+ * A Best Effort Flow Distributor that implements a timesliced round-robin approach.
+ *
+ * Key principles:
+ * - Timesliced Round Robin: Resources are allocated to consumers in a round-robin manner
+ * - Non-Guaranteed Shares: No fixed allocation per consumer, distribution based on current demand
+ * - Optimized Utilization: Maximizes resource utilization during idle periods
+ *
+ * This scheduler is suitable for environments with fluctuating workloads where fairness
+ * is less important than maximizing overall resource utilization.
+ * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a>
+ */
+public class BestEffortFlowDistributor extends FlowDistributor {
+
+ private int currentRoundRobinIndex = 0;
+ private boolean overloaded = false;
+ private final long roundRobinInterval;
+ private long lastRoundRobinUpdate;
+
+ public BestEffortFlowDistributor(FlowEngine flowEngine, long roundRobinInterval) {
+ super(flowEngine);
+ this.roundRobinInterval = roundRobinInterval;
+ this.lastRoundRobinUpdate = -roundRobinInterval;
+ }
+
+ /**
+ * Updates the outgoing demand based on the total incoming demand.
+ * Prioritizes already utilized suppliers when potential supply exceeds demand.
+ */
+ @Override
+ protected void updateOutgoingDemand() {
+
+ // If potential supply exceeds demand, prioritize already utilized suppliers
+ if (this.capacity > this.totalIncomingDemand && this.totalIncomingDemand > 0) {
+ // Best-effort: try to satisfy demand using already active suppliers first
+ double remainingDemand = this.totalIncomingDemand;
+
+ // Phase 1: Prioritize suppliers that are currently providing supply
+ for (var entry : this.supplierEdges.entrySet()) {
+ int supplierIdx = entry.getKey();
+ FlowEdge supplierEdge = entry.getValue();
+ double currentSupply = this.currentIncomingSupplies.get(supplierIdx);
+
+ if (currentSupply > 0 && remainingDemand > 0) {
+ // Try to satisfy as much demand as possible from this already active supplier
+ double demandForThisSupplier = Math.min(remainingDemand, supplierEdge.getCapacity());
+ this.pushOutgoingDemand(supplierEdge, demandForThisSupplier);
+ remainingDemand -= demandForThisSupplier;
+ }
+ }
+
+ // Phase 2: If demand still remains, use inactive suppliers
+ if (remainingDemand > 0) {
+ for (var entry : this.supplierEdges.entrySet()) {
+ int supplierIdx = entry.getKey();
+ FlowEdge supplierEdge = entry.getValue();
+ double currentSupply = this.currentIncomingSupplies.get(supplierIdx);
+
+ if (currentSupply == 0 && remainingDemand > 0) {
+ double demandForThisSupplier = Math.min(remainingDemand, supplierEdge.getCapacity());
+ this.pushOutgoingDemand(supplierEdge, demandForThisSupplier);
+ remainingDemand -= demandForThisSupplier;
+ }
+ }
+ }
+ } else {
+ // System is overloaded or no demand: distribute demand equally across all suppliers
+ double demandPerSupplier = this.totalIncomingDemand / this.supplierEdges.size();
+
+ for (FlowEdge supplierEdge : this.supplierEdges.values()) {
+ this.pushOutgoingDemand(supplierEdge, demandPerSupplier);
+ }
+ }
+
+ this.outgoingDemandUpdateNeeded = false;
+ this.invalidate();
+ }
+
+ /**
+ * Updates the outgoing supplies using a best-effort approach.
+ * When overloaded, uses round-robin distribution. Otherwise, satisfies demands optimally.
+ */
+ @Override
+ protected void updateOutgoingSupplies() {
+ // Check if system is overloaded (demand exceeds supply)
+ if (this.totalIncomingDemand > this.totalIncomingSupply) {
+ this.overloaded = true;
+
+ // Use the distribution algorithm for supply allocation
+ double[] supplies = this.distributeSupply(
+ this.incomingDemands,
+ new ArrayList<>(this.currentIncomingSupplies.values()),
+ this.totalIncomingSupply);
+
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType());
+ }
+ } else {
+ // System is not overloaded - satisfy all demands and utilize remaining capacity
+
+ if (this.overloaded) {
+ // Transitioning from overloaded to non-overloaded state
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) {
+ this.pushOutgoingSupply(
+ this.consumerEdges.get(idx),
+ this.incomingDemands.get(idx),
+ this.getConsumerResourceType());
+ }
+ }
+ this.overloaded = false;
+ } else {
+ // Update supplies for consumers that changed their demand
+ for (int idx : this.updatedDemands) {
+ this.pushOutgoingSupply(
+ this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType());
+ }
+ }
+ }
+
+ this.updatedDemands.clear();
+ }
+
+ /**
+ * Distributes available supply using a best-effort, round-robin approach.
+ * Algorithm:
+ * 1. First pass: Satisfy demands up to available capacity in round-robin order
+ * 2. Second pass: Distribute remaining capacity to consumers with unsatisfied demand
+ * 3. Optimize utilization by giving extra capacity to active consumers
+ */
+ @Override
+ public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
+ int numConsumers = this.consumerEdges.size();
+ double[] allocation = new double[numConsumers];
+
+ if (numConsumers == 0 || totalSupply <= 0) {
+ return allocation;
+ }
+
+ double remainingSupply = totalSupply;
+
+ // Phase 1: Round-robin distribution based on demand
+ // Start from the current round-robin index to ensure fairness over time
+ for (int round = 0; round < numConsumers && remainingSupply > 0; round++) {
+ int idx = (currentRoundRobinIndex + round) % numConsumers;
+ double demand = demands.get(idx);
+
+ if (demand > allocation[idx]) {
+ // Calculate how much we can allocate in this round
+ double unmetDemand = demand - allocation[idx];
+ double toAllocate = Math.min(unmetDemand, remainingSupply);
+
+ allocation[idx] += toAllocate;
+ remainingSupply -= toAllocate;
+ }
+ }
+
+ // Phase 2: Distribute any remaining supply to maximize utilization
+ // Give preference to consumers with the highest relative demand
+ if (remainingSupply > 0) {
+ // Create a list of consumers with unsatisfied demand, sorted by relative need
+ ArrayList<Integer> unsatisfiedConsumers = new ArrayList<>();
+ for (int i = 0; i < numConsumers; i++) {
+ if (demands.get(i) > allocation[i]) {
+ unsatisfiedConsumers.add(i);
+ }
+ }
+
+ // If no unsatisfied demand, distribute remaining capacity equally among active consumers
+ if (unsatisfiedConsumers.isEmpty()) {
+ // Find consumers with any demand (active consumers)
+ ArrayList<Integer> activeConsumers = new ArrayList<>();
+ for (int i = 0; i < numConsumers; i++) {
+ if (demands.get(i) > 0) {
+ activeConsumers.add(i);
+ }
+ }
+
+ if (!activeConsumers.isEmpty()) {
+ double extraPerConsumer = remainingSupply / activeConsumers.size();
+ for (int idx : activeConsumers) {
+ allocation[idx] += extraPerConsumer;
+ }
+ }
+ } else {
+ // Distribute remaining supply proportionally to unsatisfied demand
+ double totalUnsatisfiedDemand = 0;
+ for (int idx : unsatisfiedConsumers) {
+ totalUnsatisfiedDemand += demands.get(idx) - allocation[idx];
+ }
+
+ for (int idx : unsatisfiedConsumers) {
+ double unsatisfiedDemand = demands.get(idx) - allocation[idx];
+ double proportion = unsatisfiedDemand / totalUnsatisfiedDemand;
+ allocation[idx] += remainingSupply * proportion;
+ }
+ }
+ }
+
+ // Update round-robin index for next allocation cycle
+ if (numConsumers > 0) {
+ currentRoundRobinIndex = (currentRoundRobinIndex + 1) % numConsumers;
+ }
+
+ return allocation;
+ }
+
+ /**
+ * Enhanced onUpdate method that implements time-sliced round-robin scheduling.
+ * This method ensures the round-robin index advances at regular intervals,
+ * creating true time-sliced behavior for best-effort scheduling.
+ */
+ @Override
+ public long onUpdate(long now) {
+ long nextUpdate = Long.MAX_VALUE;
+
+ boolean updateNeeded = false;
+
+ // Check if it's time for a round-robin advancement
+ if (now >= lastRoundRobinUpdate + this.roundRobinInterval) {
+ updateNeeded = true;
+ lastRoundRobinUpdate = now;
+
+ // Schedule next round-robin update
+ nextUpdate = now + this.roundRobinInterval;
+ } else {
+ // Schedule the next potential round-robin update
+ nextUpdate = lastRoundRobinUpdate + this.roundRobinInterval;
+ }
+
+ // Update demands if needed
+ if (this.outgoingDemandUpdateNeeded || updateNeeded) {
+ this.updateOutgoingDemand();
+ }
+
+ // Update supplies if needed
+ if (!this.outgoingSupplies.isEmpty() || updateNeeded) {
+ this.updateOutgoingSupplies();
+ }
+
+ if (this.consumerEdges.isEmpty() || !this.hasSupplierEdges()) {
+ nextUpdate = Long.MAX_VALUE;
+ }
+
+ return nextUpdate;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java
deleted file mode 100644
index 3a8bebbc..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (c) 2025 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.engine.graph.distributionPolicies;
-
-import java.util.ArrayList;
-
-public interface DistributionPolicy {
- double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply);
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java
deleted file mode 100644
index 53cded87..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicyFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2025 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.engine.graph.distributionPolicies;
-
-public class DistributionPolicyFactory {
-
- public enum DistributionPolicyType {
- MaxMinFairness,
- FixedShare;
- }
-
- public static DistributionPolicy getDistributionStrategy(DistributionPolicyType distributionPolicyType) {
-
- return switch (distributionPolicyType) {
- case MaxMinFairness -> new MaxMinFairnessPolicy();
- case FixedShare -> new FixedShare(1);
- // actively misspelling
- default -> throw new IllegalArgumentException(
- "Unknown distribution strategy type: " + distributionPolicyType);
- };
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java
new file mode 100644
index 00000000..f58164cf
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine.graph.distributionPolicies;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.opendc.simulator.engine.engine.FlowEngine;
+import org.opendc.simulator.engine.graph.FlowDistributor;
+
+/**
+ * A {@link FlowDistributor} that implements the Equal Share distribution policy.
+ * <p>
+ * This distributor allocates resources equally among all suppliers and consumers, ensuring that each supplier and
+ * consumer receives an equal share of the total capacity.
+ * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a>
+ */
+public class EqualShareFlowDistributor extends FlowDistributor {
+
+ public EqualShareFlowDistributor(FlowEngine engine) {
+ super(engine);
+ }
+
+ /**
+ * Updates the outgoing demand for each supplier edge based on the total capacity.
+ * The demand is equally distributed among all suppliers.
+ * This method is called when the outgoing demand needs to be updated.
+ */
+ @Override
+ protected void updateOutgoingDemand() {
+ double equalShare = this.capacity / this.supplierEdges.size();
+
+ for (var supplierEdge : this.supplierEdges.values()) {
+ this.pushOutgoingDemand(supplierEdge, equalShare);
+ }
+
+ this.outgoingDemandUpdateNeeded = false;
+ }
+
+ /**
+ * Updates the outgoing supply for each consumer edge based on the total supply.
+ * The supply is equally distributed among all consumers.
+ * This method is called when the outgoing supply needs to be updated.
+ */
+ @Override
+ protected void updateOutgoingSupplies() {
+ double[] equalShare = distributeSupply(incomingDemands, outgoingSupplies, this.capacity);
+
+ for (var consumerEdge : this.consumerEdges) {
+ this.pushOutgoingSupply(consumerEdge, equalShare[consumerEdge.getConsumerIndex()]);
+ }
+ }
+
+ @Override
+ public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
+ int numConsumers = demands.size();
+ double[] allocation = new double[numConsumers];
+ double equalShare = totalSupply / numConsumers;
+
+ // Equal share regardless of individual demands
+ Arrays.fill(allocation, equalShare);
+
+ return allocation;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java
new file mode 100644
index 00000000..c0a8cd13
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine.graph.distributionPolicies;
+
+import java.util.ArrayList;
+import org.opendc.simulator.engine.engine.FlowEngine;
+import org.opendc.simulator.engine.graph.FlowDistributor;
+import org.opendc.simulator.engine.graph.FlowEdge;
+
+/**
+ * A {@link FlowDistributor} that implements the First Fit policy for distributing flow.
+ *
+ * This distributor allocates resources to consumers based on the first available supply that meets their demand.
+ * It does not attempt to balance loads or optimize resource usage beyond the first fit principle.
+ * It tries to place demands on already existing supplies without creating new ones.
+ * It assumes that resources can be partitioned, if one supplier cannot satisfy the demand, it will try to combine multiple suppliers.
+ */
+public class FirstFitPolicyFlowDistributor extends FlowDistributor {
+
+ public FirstFitPolicyFlowDistributor(FlowEngine engine) {
+ super(engine);
+ }
+
+ /**
+ * Updates the outgoing demand for suppliers in sequential order by their index.
+ * With each supplier being allocated up to its full capacity before moving to the next supplier.
+ */
+ @Override
+ protected void updateOutgoingDemand() {
+ double remainingDemand = this.totalIncomingDemand;
+
+ // Sort supplier edges by their index to ensure consistent first-fit ordering
+ var sortedSuppliers = this.supplierEdges.entrySet().stream()
+ .sorted((e1, e2) -> Integer.compare(e1.getKey(), e2.getKey()))
+ .toList();
+
+ // Apply First Fit strategy: fill suppliers in order until demand is satisfied
+ for (var supplierEntry : sortedSuppliers) {
+ var supplierEdge = supplierEntry.getValue();
+ double supplierCapacity = supplierEdge.getCapacity();
+
+ if (remainingDemand <= 0) {
+ // No more demand to allocate
+ this.pushOutgoingDemand(supplierEdge, 0.0);
+ } else if (remainingDemand <= supplierCapacity) {
+ // This supplier can handle all remaining demand
+ this.pushOutgoingDemand(supplierEdge, remainingDemand);
+ remainingDemand = 0;
+ } else {
+ // This supplier gets filled to capacity, demand continues to next supplier
+ this.pushOutgoingDemand(supplierEdge, supplierCapacity);
+ remainingDemand -= supplierCapacity;
+ }
+ }
+
+ this.outgoingDemandUpdateNeeded = false;
+ }
+
+ /**
+ * Consumers receive their full demanded amount if it can be satisfied by the available supply,
+ * or zero if it cannot.
+ */
+ @Override
+ protected void updateOutgoingSupplies() {
+ ArrayList<Double> currentPossibleSupplies = new ArrayList<>();
+ for (var currentIncomingSupply : currentIncomingSupplies.entrySet()) {
+ currentPossibleSupplies.add(currentIncomingSupply.getValue());
+ }
+
+ double[] shares = distributeSupply(incomingDemands, currentPossibleSupplies, totalIncomingSupply);
+
+ for (FlowEdge consumerEdge : this.consumerEdges) {
+ this.pushOutgoingSupply(consumerEdge, shares[consumerEdge.getConsumerIndex()]);
+ }
+ }
+
+ /**
+ * Distributes supply among consumers using the First Fit allocation principle.
+ * Each consumer demand is allocated by trying suppliers in order, potentially
+ * combining multiple suppliers to satisfy a single demand.
+ *
+ * @param demands List of demand values from consumers
+ * @param currentSupply List of available supply values from suppliers
+ * @param totalSupply Total amount of supply available (unused in this implementation)
+ * @return Array of allocation amounts for each consumer
+ *
+ * @see #updateOutgoingSupplies()
+ */
+ @Override
+ public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
+ int numConsumers = demands.size();
+ double[] allocation = new double[numConsumers];
+
+ // Create a copy of current supply to track remaining capacity as we allocate
+ ArrayList<Double> remainingSupply = new ArrayList<>(currentSupply);
+
+ // For each demand, try to satisfy it using suppliers in order
+ for (int i = 0; i < numConsumers; i++) {
+ double remainingDemand = demands.get(i);
+ double totalAllocated = 0.0;
+
+ if (remainingDemand > 0) {
+ // Try each supplier in order until demand is satisfied
+ for (int j = 0; j < remainingSupply.size() && remainingDemand > 0; j++) {
+ double availableSupply = remainingSupply.get(j);
+
+ if (availableSupply > 0) {
+ // Allocate as much as possible from this supplier
+ double allocatedFromThisSupplier = Math.min(availableSupply, remainingDemand);
+
+ totalAllocated += allocatedFromThisSupplier;
+ remainingDemand -= allocatedFromThisSupplier;
+
+ // Reduce the remaining supply capacity
+ remainingSupply.set(j, availableSupply - allocatedFromThisSupplier);
+ }
+ }
+ }
+
+ allocation[i] = totalAllocated;
+ }
+
+ return allocation;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java
deleted file mode 100644
index baa04975..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (c) 2025 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.engine.graph.distributionPolicies;
-
-import java.util.ArrayList;
-
-/**
- * A distribution policy that distributes supply equally among all nodes.
- * The share can be set to a fixed value, defaulting to 1.
- * This policy not implemented yet and is used as a placeholder.
- */
-public class FixedShare implements DistributionPolicy {
-
- private int share;
-
- public FixedShare() {
- this.share = 1;
- }
-
- public FixedShare(int share) {
- this.share = share;
- }
-
- @Override
- public double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply) {
- return new double[0];
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java
new file mode 100644
index 00000000..4c0a84d1
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine.graph.distributionPolicies;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import org.opendc.simulator.engine.engine.FlowEngine;
+import org.opendc.simulator.engine.graph.FlowDistributor;
+import org.opendc.simulator.engine.graph.FlowEdge;
+
+/**
+ * A {@link FlowDistributor} that implements Fixed Share GPU scheduling.
+ *
+ * This distributor allocates a dedicated, consistent portion of GPU resources to each VM (consumer),
+ * ensuring predictable availability and stable performance. Each active consumer receives a fixed
+ * share of the total GPU capacity, regardless of their individual demand or the demand of other consumers.
+ * This policy is heavily inspired by the fixed share GPU scheduling policy used in NVIDIA's MIG (Multi-Instance GPU) technology.
+ * <a href="https://docs.nvidia.com/vgpu/knowledge-base/latest/vgpu-features.html#vgpu-schedulers">original description</a>
+ * Key characteristics:
+ * - Each consumer gets a fixed percentage of total GPU capacity when active
+ * - Unused shares (from inactive consumers) remain unallocated, not redistributed
+ * - Performance remains consistent and predictable for each consumer
+ * - Share allocation is based on maximum supported consumers, not currently active ones
+ */
+public class FixedShareFlowDistributor extends FlowDistributor {
+
+ private double fixedShare;
+ private final double shareRatio;
+ private int[] notSuppliedConsumers;
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Constructor
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Creates a Fixed Share Flow Distributor.
+ *
+ * @param engine The flow engine
+ * @param shareRatio The fixed share ratio for each consumer (between 0.0 and 1.0)
+ */
+ public FixedShareFlowDistributor(FlowEngine engine, double shareRatio) {
+ super(engine);
+
+ if (shareRatio <= 0 || shareRatio > 1) {
+ throw new IllegalArgumentException("Share ratio must be between 0.0 and 1.0");
+ }
+ this.shareRatio = shareRatio;
+
+ // Each consumer gets an equal fixed share of the total capacity
+ this.fixedShare = this.shareRatio * this.capacity / this.supplierEdges.size();
+
+ // Initialize tracking for round-robin prioritization
+ this.notSuppliedConsumers = new int[0];
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Distribution Logic
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Updates the outgoing demand to suppliers.
+ * In Fixed Share mode, we request the total amount needed to satisfy all active consumers' fixed shares.
+ */
+ @Override
+ protected void updateOutgoingDemand() {
+ // Calculate total demand based on active consumers
+ this.fixedShare = this.shareRatio * this.capacity / this.supplierEdges.size();
+
+ // Distribute demand equally across all suppliers
+ for (FlowEdge supplierEdge : supplierEdges.values()) {
+ this.pushOutgoingDemand(supplierEdge, this.fixedShare);
+ }
+
+ this.outgoingDemandUpdateNeeded = false;
+ this.invalidate();
+ }
+
+ /**
+ * Updates the outgoing supplies to consumers.
+ * Each active consumer receives their fixed share, regardless of their actual demand.
+ */
+ @Override
+ protected void updateOutgoingSupplies() {
+ // Calculate the fixed allocation per consumer
+
+ // Distribute to each consumer
+ int consumerIndex = 0;
+ if (this.consumerEdges.size() == this.supplierEdges.size()) {
+ for (FlowEdge consumerEdge : this.consumerEdges) {
+ this.pushOutgoingSupply(consumerEdge, this.fixedShare);
+ }
+ } else {
+ double[] supplies = distributeSupply(this.incomingDemands, this.outgoingSupplies, this.totalIncomingSupply);
+ for (FlowEdge consumerEdge : this.consumerEdges) {
+ if (supplies[consumerIndex] <= 0.0) {
+ continue;
+ }
+ this.pushOutgoingSupply(consumerEdge, this.fixedShare);
+ }
+ }
+ }
+
+ public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
+ double[] supplies = new double[this.consumerEdges.size()];
+
+ if (this.consumerEdges.size() < this.supplierEdges.size()) {
+ for (FlowEdge consumerEdge : this.consumerEdges) {
+ supplies[consumerEdge.getConsumerIndex()] = this.fixedShare;
+ }
+ } else {
+ // Round-robin approach: prioritize consumers that didn't get resources last time
+ ArrayList<Integer> currentNotSuppliedList = new ArrayList<>();
+
+ // Calculate how many consumers we can supply with available resources
+ int maxConsumersToSupply = (int) Math.floor(totalSupply / this.fixedShare);
+ int consumersSupplied = 0;
+
+ // First pass: try to supply consumers that were not supplied in the previous round
+ for (int index : this.notSuppliedConsumers) {
+ if (index < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply) {
+ supplies[index] = this.fixedShare;
+ consumersSupplied++;
+ }
+ }
+
+ // Second pass: supply remaining consumers if we still have capacity
+ for (int i = 0; i < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply; i++) {
+ if (supplies[i] == 0.0) { // This consumer hasn't been supplied yet
+ supplies[i] = this.fixedShare;
+ consumersSupplied++;
+ }
+ }
+
+ // Build the list of consumers that didn't get resources this round
+ for (int i = 0; i < this.consumerEdges.size(); i++) {
+ if (supplies[i] == 0.0) {
+ currentNotSuppliedList.add(i);
+ }
+ }
+
+ // Update the tracking array for next round
+ this.notSuppliedConsumers =
+ currentNotSuppliedList.stream().mapToInt(Integer::intValue).toArray();
+ }
+
+ return supplies;
+ }
+
+ @Override
+ // index of not supplied consumers also need to be updated
+ public void removeConsumerEdge(FlowEdge consumerEdge) {
+ int idx = consumerEdge.getConsumerIndex();
+
+ if (idx == -1) {
+ return;
+ }
+
+ this.totalIncomingDemand -= consumerEdge.getDemand();
+
+ // Remove idx from consumers that updated their demands
+ this.updatedDemands.remove(idx);
+
+ this.consumerEdges.remove(idx);
+ this.incomingDemands.remove(idx);
+ this.outgoingSupplies.remove(idx);
+
+ // update the consumer index for all consumerEdges higher than this.
+ for (int i = idx; i < this.consumerEdges.size(); i++) {
+ FlowEdge other = this.consumerEdges.get(i);
+
+ other.setConsumerIndex(other.getConsumerIndex() - 1);
+ }
+
+ HashSet<Integer> newUpdatedDemands = new HashSet<>();
+ for (int idx_other : this.updatedDemands) {
+ if (idx_other > idx) {
+ newUpdatedDemands.add(idx_other - 1);
+ } else {
+ newUpdatedDemands.add(idx_other);
+ }
+ }
+ this.updatedDemands = newUpdatedDemands;
+
+ // Decrease the index of not supplied consumers
+ for (int i = 0; i < this.notSuppliedConsumers.length; i++) {
+ if (this.notSuppliedConsumers[i] > idx) {
+ this.notSuppliedConsumers[i]--;
+ }
+ }
+
+ this.outgoingDemandUpdateNeeded = true;
+ this.invalidate();
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java
new file mode 100644
index 00000000..eb5d4ff7
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine.graph.distributionPolicies;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.opendc.simulator.engine.engine.FlowEngine;
+import org.opendc.simulator.engine.graph.FlowDistributor;
+
+public class FlowDistributorFactory {
+
+ public enum DistributionPolicy {
+ BEST_EFFORT,
+ EQUAL_SHARE,
+ FIRST_FIT,
+ FIXED_SHARE,
+ MAX_MIN_FAIRNESS;
+
+ private final Map<String, Object> properties = new HashMap<>();
+
+ public void setProperty(String key, Object value) {
+ properties.put(key, value);
+ }
+
+ public Object getProperty(String key) {
+ return properties.get(key);
+ }
+
+ public <T> T getProperty(String key, Class<T> type) {
+ return type.cast(properties.get(key));
+ }
+
+ public Set<String> getPropertyNames() {
+ return properties.keySet();
+ }
+ }
+
+ public static FlowDistributor getFlowDistributor(FlowEngine flowEngine, DistributionPolicy distributionPolicyType) {
+
+ return switch (distributionPolicyType) {
+ case BEST_EFFORT -> new BestEffortFlowDistributor(
+ flowEngine, distributionPolicyType.getProperty("updateIntervalLength", Long.class));
+ case EQUAL_SHARE -> new EqualShareFlowDistributor(flowEngine);
+ case FIRST_FIT -> new FirstFitPolicyFlowDistributor(flowEngine);
+ case FIXED_SHARE -> {
+ if (!distributionPolicyType.getPropertyNames().contains("shareRatio")) {
+ throw new IllegalArgumentException(
+ "FixedShare distribution policy requires a 'shareRatio' property to be set.");
+ }
+ yield new FixedShareFlowDistributor(
+ flowEngine, distributionPolicyType.getProperty("shareRatio", Double.class));
+ }
+ default -> new MaxMinFairnessFlowDistributor(flowEngine);
+ };
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java
new file mode 100644
index 00000000..9b48f204
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine.graph.distributionPolicies;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import org.opendc.simulator.engine.engine.FlowEngine;
+import org.opendc.simulator.engine.graph.FlowDistributor;
+import org.opendc.simulator.engine.graph.FlowEdge;
+
+/**
+ * A flow distributor that implements the max-min fairness distribution policy.
+ * <p>
+ * This policy distributes the available supply to consumers in a way that maximizes the minimum supply received by any
+ * consumer, ensuring fairness across all consumers.
+ */
+public class MaxMinFairnessFlowDistributor extends FlowDistributor {
+
+ private boolean overloaded = false;
+
+ public MaxMinFairnessFlowDistributor(FlowEngine engine) {
+ super(engine);
+ }
+
+ protected void updateOutgoingDemand() {
+ // equally distribute the demand to all suppliers
+ for (FlowEdge supplierEdge : this.supplierEdges.values()) {
+ this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size());
+ // alternatively a relative share could be used, based on capacity minus current incoming supply
+ // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() -
+ // currentIncomingSupplies.get(idx) / supplierEdges.size()));
+ }
+
+ this.outgoingDemandUpdateNeeded = false;
+
+ this.invalidate();
+ }
+
+ // TODO: This should probably be moved to the distribution strategy
+ protected void updateOutgoingSupplies() {
+
+ // If the demand is higher than the current supply, the system is overloaded.
+ // The available supply is distributed based on the current distribution function.
+ if (this.totalIncomingDemand > this.totalIncomingSupply) {
+ this.overloaded = true;
+
+ double[] supplies = this.distributeSupply(
+ this.incomingDemands,
+ new ArrayList<>(this.currentIncomingSupplies.values()),
+ this.totalIncomingSupply);
+
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType());
+ }
+
+ } else {
+
+ // If the distributor was overloaded before, but is not anymore:
+ // provide all consumers with their demand
+ if (this.overloaded) {
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) {
+ this.pushOutgoingSupply(
+ this.consumerEdges.get(idx),
+ this.incomingDemands.get(idx),
+ this.getConsumerResourceType());
+ }
+ }
+ this.overloaded = false;
+ }
+
+ // Update the supplies of the consumers that changed their demand in the current cycle
+ else {
+ for (int idx : this.updatedDemands) {
+ this.pushOutgoingSupply(
+ this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType());
+ }
+ }
+ }
+
+ this.updatedDemands.clear();
+ }
+
+ private record Demand(int idx, double value) {}
+
+ public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
+ int inputSize = demands.size();
+
+ final double[] supplies = new double[inputSize];
+ final Demand[] tempDemands = new Demand[inputSize];
+
+ for (int i = 0; i < inputSize; i++) {
+ tempDemands[i] = new Demand(i, demands.get(i));
+ }
+
+ Arrays.sort(tempDemands, (o1, o2) -> {
+ Double i1 = o1.value;
+ Double i2 = o2.value;
+ return i1.compareTo(i2);
+ });
+
+ double availableCapacity = totalSupply;
+
+ for (int i = 0; i < inputSize; i++) {
+ double d = tempDemands[i].value;
+
+ if (d == 0.0) {
+ continue;
+ }
+
+ double availableShare = availableCapacity / (inputSize - i);
+ double r = Math.min(d, availableShare);
+
+ int idx = tempDemands[i].idx;
+ supplies[idx] = r; // Update the rates
+ availableCapacity -= r;
+ }
+
+ return supplies;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java
deleted file mode 100644
index 484e7fe4..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2025 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.engine.graph.distributionPolicies;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-/**
- * A distribution policy that implements the Max-Min Fairness algorithm.
- * This policy distributes supply to demands in a way that maximizes the minimum
- * allocation across all demands, ensuring fairness.
- */
-public class MaxMinFairnessPolicy implements DistributionPolicy {
- private record Demand(int idx, double value) {}
-
- @Override
- public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
- int inputSize = demands.size();
-
- final double[] supplies = new double[inputSize];
- final Demand[] tempDemands = new Demand[inputSize];
-
- for (int i = 0; i < inputSize; i++) {
- tempDemands[i] = new Demand(i, demands.get(i));
- }
-
- Arrays.sort(tempDemands, (o1, o2) -> {
- Double i1 = o1.value;
- Double i2 = o2.value;
- return i1.compareTo(i2);
- });
-
- double availableCapacity = totalSupply;
-
- for (int i = 0; i < inputSize; i++) {
- double d = tempDemands[i].value;
-
- if (d == 0.0) {
- continue;
- }
-
- double availableShare = availableCapacity / (inputSize - i);
- double r = Math.min(d, availableShare);
-
- int idx = tempDemands[i].idx;
- supplies[idx] = r; // Update the rates
- availableCapacity -= r;
- }
-
- return supplies;
- }
-}