summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-09-26 16:17:39 +0200
committerGitHub <noreply@github.com>2025-09-26 16:17:39 +0200
commit2ba57fd06560f096def01a31f8e47827f0f01da0 (patch)
tree27ac3c625b1e406a6f574b30c6c0eab0e6eb862d /opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc
parente88adbc3fc15a2de717f9478454c5a5229ece10e (diff)
Converted maps in the FlowDistributor to Arrays for performance (#373)
* Updated the flowDistributor to use arrays instead of maps to improve performance. * Small cleanup
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java121
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java48
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java113
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java13
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java31
7 files changed, 164 insertions, 197 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index cb276183..cb2a3ba6 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -23,11 +23,10 @@
package org.opendc.simulator.engine.graph;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.opendc.common.ResourceType;
import org.opendc.simulator.engine.engine.FlowEngine;
import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory;
@@ -46,31 +45,52 @@ import org.slf4j.LoggerFactory;
*/
public abstract class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer {
protected static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class);
- protected final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
- protected HashMap<Integer, FlowEdge> supplierEdges =
- new HashMap<>(); // The suppliers that provide supply to this distributor
- protected final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers
- protected final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers
+ protected int numConsumers = 0;
+ protected final int maxConsumers;
+ protected final ArrayList<Integer> availableConsumerIndices;
+ protected final ArrayList<Integer> usedConsumerIndices;
+
+ protected final FlowEdge[] consumerEdges;
+ protected final double[] incomingDemands; // What is demanded by the consumers
+ protected final double[] outgoingSupplies; // What is supplied to the consumers
+ protected ArrayList<Integer> updatedDemands = new ArrayList<>();
+
+ protected double previousTotalDemand = 0.0;
protected double totalIncomingDemand; // The total demand of all the consumers
- // AS index is based on the supplierIndex of the FlowEdge, ids of entries need to be stable
+
+ protected HashMap<Integer, FlowEdge> supplierEdges = new HashMap<>();
protected HashMap<Integer, Double> currentIncomingSupplies =
new HashMap<>(); // The current supply provided by the suppliers
protected Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers
protected boolean outgoingDemandUpdateNeeded = false;
protected boolean outgoingSupplyUpdateNeeded = false;
- protected Set<Integer> updatedDemands =
- new HashSet<>(); // Array of consumers that updated their demand in this cycle
protected ResourceType supplierResourceType;
protected ResourceType consumerResourceType;
protected double capacity; // What is the max capacity. Can probably be removed
- public FlowDistributor(FlowEngine engine) {
+ public FlowDistributor(FlowEngine engine, int maxConsumers) {
super(engine);
+
+ this.maxConsumers = maxConsumers;
+
+ this.availableConsumerIndices = new ArrayList<>(this.maxConsumers);
+ this.usedConsumerIndices = new ArrayList<>(this.maxConsumers);
+
+ for (int i = 0; i < this.maxConsumers; i++) {
+ this.availableConsumerIndices.add(i);
+ }
+
+ this.consumerEdges = new FlowEdge[this.maxConsumers];
+ // this.supplierEdges = new FlowEdge[this.maxSuppliers];
+ // this.incomingSupplies = new double[this.maxSuppliers];
+
+ this.incomingDemands = new double[this.maxConsumers];
+ this.outgoingSupplies = new double[this.maxConsumers];
}
public double getTotalIncomingDemand() {
@@ -95,7 +115,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
}
// TODO: look into whether this is always needed
- if (!this.outgoingSupplies.isEmpty()) {
+ if (this.numConsumers > 0) {
this.updateOutgoingSupplies();
}
@@ -106,8 +126,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
protected abstract void updateOutgoingSupplies();
- public abstract double[] distributeSupply(
- ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply);
+ public abstract double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply);
/**
* Add a new consumer.
@@ -115,31 +134,32 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
*/
@Override
public void addConsumerEdge(FlowEdge consumerEdge) {
- consumerEdge.setConsumerIndex(this.consumerEdges.size());
+ int consumerIndex = this.availableConsumerIndices.removeFirst();
+ this.usedConsumerIndices.add(consumerIndex);
- this.consumerEdges.add(consumerEdge);
- this.incomingDemands.add(0.0);
- this.outgoingSupplies.add(0.0);
- this.consumerResourceType = consumerEdge.getConsumerResourceType();
+ consumerEdge.setConsumerIndex(consumerIndex);
+
+ this.numConsumers++;
+ this.consumerEdges[consumerIndex] = consumerEdge;
this.outgoingDemandUpdateNeeded = true;
}
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
// supplierIndex not always set, so we use 0 as default to avoid index out of bounds
- int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex();
+ int supplierIndex = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex();
- this.supplierEdges.put(idx, supplierEdge);
+ this.supplierEdges.put(supplierIndex, supplierEdge);
this.capacity += supplierEdge.getCapacity();
- this.currentIncomingSupplies.put(idx, 0.0);
+ this.currentIncomingSupplies.put(supplierIndex, 0.0);
this.supplierResourceType = supplierEdge.getSupplierResourceType();
}
@Override
public void removeConsumerEdge(FlowEdge consumerEdge) {
- int idx = consumerEdge.getConsumerIndex();
+ int consumerIndex = consumerEdge.getConsumerIndex();
- if (idx == -1) {
+ if (consumerIndex == -1) {
return;
}
@@ -149,30 +169,18 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
}
// Remove idx from consumers that updated their demands
- this.updatedDemands.remove(idx);
-
- this.consumerEdges.remove(idx);
- this.incomingDemands.remove(idx);
- this.outgoingSupplies.remove(idx);
-
- // update the consumer index for all consumerEdges higher than this.
- for (int i = idx; i < this.consumerEdges.size(); i++) {
- FlowEdge other = this.consumerEdges.get(i);
-
- other.setConsumerIndex(other.getConsumerIndex() - 1);
+ if (this.updatedDemands.contains(consumerIndex)) {
+ this.updatedDemands.remove(Integer.valueOf(consumerIndex));
}
- HashSet<Integer> newUpdatedDemands = new HashSet<>();
+ this.consumerEdges[consumerIndex] = null;
+ this.incomingDemands[consumerIndex] = 0.0;
+ this.outgoingSupplies[consumerIndex] = 0.0;
- for (int idx_other : this.updatedDemands) {
- if (idx_other > idx) {
- newUpdatedDemands.add(idx_other - 1);
- } else {
- newUpdatedDemands.add(idx_other);
- }
- }
+ this.usedConsumerIndices.remove(Integer.valueOf(consumerIndex));
+ this.availableConsumerIndices.add(consumerIndex);
- this.updatedDemands = newUpdatedDemands;
+ this.numConsumers--;
this.outgoingDemandUpdateNeeded = true;
this.invalidate();
@@ -182,8 +190,6 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
public void removeSupplierEdge(FlowEdge supplierEdge) {
// supplierIndex not always set, so we use 0 as default to avoid index out of bounds
int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex();
- // to keep index consistent, entries are neutralized instead of removed
- // this.supplierEdges.put(idx, null);
this.supplierEdges.remove(idx);
this.capacity -= supplierEdge.getCapacity();
@@ -196,24 +202,25 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
@Override
public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) {
- int idx = consumerEdge.getConsumerIndex();
+ int consumerIndex = consumerEdge.getConsumerIndex();
- if (idx == -1) {
+ if (consumerIndex == -1) {
LOGGER.warn("Demand {} pushed by an unknown consumer", newDemand);
return;
}
// Update the total demand (This is cheaper than summing over all demands)
- double prevDemand = incomingDemands.get(idx);
+ double prevDemand = incomingDemands[consumerIndex];
- incomingDemands.set(idx, newDemand);
+ incomingDemands[consumerIndex] = newDemand;
// only update the total supply if the new supply is different from the previous one
this.totalIncomingDemand += (newDemand - prevDemand);
if (totalIncomingDemand < 0) {
this.totalIncomingDemand = 0.0;
}
- this.updatedDemands.add(idx);
+ // TODO: can be optimized by using a boolean array
+ this.updatedDemands.add(consumerIndex);
this.outgoingDemandUpdateNeeded = true;
this.invalidate();
@@ -244,22 +251,22 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
@Override
public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
- supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType(), this.consumerEdges.size());
+ supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType(), this.numConsumers);
}
@Override
public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
- int idx = consumerEdge.getConsumerIndex();
+ int consumerIndex = consumerEdge.getConsumerIndex();
- if (idx == -1) {
+ if (consumerIndex == -1) {
System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer");
}
- if (outgoingSupplies.get(idx) == newSupply) {
+ if (outgoingSupplies[consumerIndex] == newSupply) {
return;
}
- outgoingSupplies.set(idx, newSupply);
+ outgoingSupplies[consumerIndex] = newSupply;
consumerEdge.pushSupply(newSupply, false, this.getSupplierResourceType());
}
@@ -267,7 +274,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() {
return Map.of(
FlowEdge.NodeType.CONSUMING,
- this.consumerEdges,
+ Arrays.asList(this.consumerEdges),
FlowEdge.NodeType.SUPPLYING,
new ArrayList<>(this.supplierEdges.values()));
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java
index 703aca35..04317d6a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java
@@ -23,7 +23,6 @@
package org.opendc.simulator.engine.graph.distributionPolicies;
import java.util.ArrayList;
-import java.util.Objects;
import org.opendc.simulator.engine.engine.FlowEngine;
import org.opendc.simulator.engine.graph.FlowDistributor;
import org.opendc.simulator.engine.graph.FlowEdge;
@@ -47,8 +46,8 @@ public class BestEffortFlowDistributor extends FlowDistributor {
private final long roundRobinInterval;
private long lastRoundRobinUpdate;
- public BestEffortFlowDistributor(FlowEngine flowEngine, long roundRobinInterval) {
- super(flowEngine);
+ public BestEffortFlowDistributor(FlowEngine flowEngine, long roundRobinInterval, int maxConsumers) {
+ super(flowEngine, maxConsumers);
this.roundRobinInterval = roundRobinInterval;
this.lastRoundRobinUpdate = -roundRobinInterval;
}
@@ -122,28 +121,33 @@ public class BestEffortFlowDistributor extends FlowDistributor {
new ArrayList<>(this.currentIncomingSupplies.values()),
this.totalIncomingSupply);
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType());
+ for (int consumerIndex : this.usedConsumerIndices) {
+ this.pushOutgoingSupply(
+ this.consumerEdges[consumerIndex], supplies[consumerIndex], this.getConsumerResourceType());
}
} else {
// System is not overloaded - satisfy all demands and utilize remaining capacity
if (this.overloaded) {
- // Transitioning from overloaded to non-overloaded state
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) {
+ for (int consumerIndex : this.usedConsumerIndices) {
+ // TODO: I think we can remove this check
+ if (this.outgoingSupplies[consumerIndex] == this.incomingDemands[consumerIndex]) {
this.pushOutgoingSupply(
- this.consumerEdges.get(idx),
- this.incomingDemands.get(idx),
+ this.consumerEdges[consumerIndex],
+ this.incomingDemands[consumerIndex],
this.getConsumerResourceType());
}
}
this.overloaded = false;
- } else {
- // Update supplies for consumers that changed their demand
- for (int idx : this.updatedDemands) {
+ }
+
+ // Update the supplies of the consumers that changed their demand in the current cycle
+ else {
+ for (int consumerIndex : this.updatedDemands) {
this.pushOutgoingSupply(
- this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType());
+ this.consumerEdges[consumerIndex],
+ this.incomingDemands[consumerIndex],
+ this.getConsumerResourceType());
}
}
}
@@ -160,8 +164,8 @@ public class BestEffortFlowDistributor extends FlowDistributor {
* 3. Optimize utilization by giving extra capacity to active consumers
*/
@Override
- public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
- int numConsumers = this.consumerEdges.size();
+ public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) {
+ int numConsumers = this.consumerEdges.length;
double[] allocation = new double[numConsumers];
if (numConsumers == 0 || totalSupply <= 0) {
@@ -174,7 +178,7 @@ public class BestEffortFlowDistributor extends FlowDistributor {
// Start from the current round-robin index to ensure fairness over time
for (int round = 0; round < numConsumers && remainingSupply > 0; round++) {
int idx = (currentRoundRobinIndex + round) % numConsumers;
- double demand = demands.get(idx);
+ double demand = demands[idx];
if (demand > allocation[idx]) {
// Calculate how much we can allocate in this round
@@ -192,7 +196,7 @@ public class BestEffortFlowDistributor extends FlowDistributor {
// Create a list of consumers with unsatisfied demand, sorted by relative need
ArrayList<Integer> unsatisfiedConsumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
- if (demands.get(i) > allocation[i]) {
+ if (demands[i] > allocation[i]) {
unsatisfiedConsumers.add(i);
}
}
@@ -202,7 +206,7 @@ public class BestEffortFlowDistributor extends FlowDistributor {
// Find consumers with any demand (active consumers)
ArrayList<Integer> activeConsumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
- if (demands.get(i) > 0) {
+ if (demands[i] > 0) {
activeConsumers.add(i);
}
}
@@ -217,11 +221,11 @@ public class BestEffortFlowDistributor extends FlowDistributor {
// Distribute remaining supply proportionally to unsatisfied demand
double totalUnsatisfiedDemand = 0;
for (int idx : unsatisfiedConsumers) {
- totalUnsatisfiedDemand += demands.get(idx) - allocation[idx];
+ totalUnsatisfiedDemand += demands[idx] - allocation[idx];
}
for (int idx : unsatisfiedConsumers) {
- double unsatisfiedDemand = demands.get(idx) - allocation[idx];
+ double unsatisfiedDemand = demands[idx] - allocation[idx];
double proportion = unsatisfiedDemand / totalUnsatisfiedDemand;
allocation[idx] += remainingSupply * proportion;
}
@@ -269,7 +273,7 @@ public class BestEffortFlowDistributor extends FlowDistributor {
this.updateOutgoingSupplies();
}
- if (this.consumerEdges.isEmpty() || !this.hasSupplierEdges()) {
+ if (this.numConsumers == 0 || !this.hasSupplierEdges()) {
nextUpdate = Long.MAX_VALUE;
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java
index 87ed7ca2..89b1b314 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java
@@ -36,8 +36,8 @@ import org.opendc.simulator.engine.graph.FlowDistributor;
*/
public class EqualShareFlowDistributor extends FlowDistributor {
- public EqualShareFlowDistributor(FlowEngine engine) {
- super(engine);
+ public EqualShareFlowDistributor(FlowEngine engine, int maxConsumers) {
+ super(engine, maxConsumers);
}
/**
@@ -65,18 +65,19 @@ public class EqualShareFlowDistributor extends FlowDistributor {
*/
@Override
protected void updateOutgoingSupplies() {
- double[] equalShare = distributeSupply(incomingDemands, outgoingSupplies, this.capacity);
+ double[] equalShare = distributeSupply(
+ incomingDemands, new ArrayList<>(this.currentIncomingSupplies.values()), this.capacity);
- for (var consumerEdge : this.consumerEdges) {
- this.pushOutgoingSupply(consumerEdge, equalShare[consumerEdge.getConsumerIndex()]);
+ for (int consumerIndex : this.usedConsumerIndices) {
+ this.pushOutgoingSupply(
+ this.consumerEdges[consumerIndex], equalShare[consumerIndex], this.getConsumerResourceType());
}
}
@Override
- public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
- int numConsumers = demands.size();
- double[] allocation = new double[numConsumers];
- double equalShare = totalSupply / numConsumers;
+ public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) {
+ double[] allocation = new double[this.numConsumers];
+ double equalShare = totalSupply / this.numConsumers;
// Equal share regardless of individual demands
Arrays.fill(allocation, equalShare);
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java
index 9ab24d9f..67458ad6 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java
@@ -25,7 +25,6 @@ package org.opendc.simulator.engine.graph.distributionPolicies;
import java.util.ArrayList;
import org.opendc.simulator.engine.engine.FlowEngine;
import org.opendc.simulator.engine.graph.FlowDistributor;
-import org.opendc.simulator.engine.graph.FlowEdge;
/**
* A {@link FlowDistributor} that implements the First Fit policy for distributing flow.
@@ -37,8 +36,8 @@ import org.opendc.simulator.engine.graph.FlowEdge;
*/
public class FirstFitPolicyFlowDistributor extends FlowDistributor {
- public FirstFitPolicyFlowDistributor(FlowEngine engine) {
- super(engine);
+ public FirstFitPolicyFlowDistributor(FlowEngine engine, int maxConsumers) {
+ super(engine, maxConsumers);
}
/**
@@ -90,8 +89,9 @@ public class FirstFitPolicyFlowDistributor extends FlowDistributor {
double[] shares = distributeSupply(incomingDemands, currentPossibleSupplies, totalIncomingSupply);
- for (FlowEdge consumerEdge : this.consumerEdges) {
- this.pushOutgoingSupply(consumerEdge, shares[consumerEdge.getConsumerIndex()]);
+ for (int consumerIndex : this.usedConsumerIndices) {
+ this.pushOutgoingSupply(
+ this.consumerEdges[consumerIndex], shares[consumerIndex], this.getConsumerResourceType());
}
}
@@ -108,8 +108,8 @@ public class FirstFitPolicyFlowDistributor extends FlowDistributor {
* @see #updateOutgoingSupplies()
*/
@Override
- public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
- int numConsumers = demands.size();
+ public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) {
+ int numConsumers = demands.length;
double[] allocation = new double[numConsumers];
// Create a copy of current supply to track remaining capacity as we allocate
@@ -117,7 +117,7 @@ public class FirstFitPolicyFlowDistributor extends FlowDistributor {
// For each demand, try to satisfy it using suppliers in order
for (int i = 0; i < numConsumers; i++) {
- double remainingDemand = demands.get(i);
+ double remainingDemand = demands[i];
double totalAllocated = 0.0;
if (remainingDemand > 0) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java
index f22ea9fb..7a586b72 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java
@@ -23,7 +23,6 @@
package org.opendc.simulator.engine.graph.distributionPolicies;
import java.util.ArrayList;
-import java.util.HashSet;
import org.opendc.simulator.engine.engine.FlowEngine;
import org.opendc.simulator.engine.graph.FlowDistributor;
import org.opendc.simulator.engine.graph.FlowEdge;
@@ -46,7 +45,7 @@ public class FixedShareFlowDistributor extends FlowDistributor {
private double fixedShare;
private final double shareRatio;
- private int[] notSuppliedConsumers;
+ private final ArrayList<Integer> notSuppliedConsumers = new ArrayList<>();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@@ -58,8 +57,8 @@ public class FixedShareFlowDistributor extends FlowDistributor {
* @param engine The flow engine
* @param shareRatio The fixed share ratio for each consumer (between 0.0 and 1.0)
*/
- public FixedShareFlowDistributor(FlowEngine engine, double shareRatio) {
- super(engine);
+ public FixedShareFlowDistributor(FlowEngine engine, double shareRatio, int maxConsumers) {
+ super(engine, maxConsumers);
if (shareRatio <= 0 || shareRatio > 1) {
throw new IllegalArgumentException("Share ratio must be between 0.0 and 1.0");
@@ -68,9 +67,6 @@ public class FixedShareFlowDistributor extends FlowDistributor {
// Each consumer gets an equal fixed share of the total capacity
this.fixedShare = this.shareRatio * this.capacity / this.supplierEdges.size();
-
- // Initialize tracking for round-robin prioritization
- this.notSuppliedConsumers = new int[0];
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -103,62 +99,47 @@ public class FixedShareFlowDistributor extends FlowDistributor {
protected void updateOutgoingSupplies() {
// Calculate the fixed allocation per consumer
- // Distribute to each consumer
- int consumerIndex = 0;
- if (this.consumerEdges.size() == this.supplierEdges.size()) {
- for (FlowEdge consumerEdge : this.consumerEdges) {
- this.pushOutgoingSupply(consumerEdge, this.fixedShare);
- }
- } else {
- double[] supplies = distributeSupply(this.incomingDemands, this.outgoingSupplies, this.totalIncomingSupply);
- for (FlowEdge consumerEdge : this.consumerEdges) {
- this.pushOutgoingSupply(consumerEdge, this.fixedShare);
- }
+ double[] supplies = distributeSupply(
+ this.incomingDemands, new ArrayList<>(this.currentIncomingSupplies.values()), this.totalIncomingSupply);
+
+ for (int consumerIndex : this.usedConsumerIndices) {
+ this.pushOutgoingSupply(
+ this.consumerEdges[consumerIndex], supplies[consumerIndex], this.getConsumerResourceType());
}
}
- public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
- double[] supplies = new double[this.consumerEdges.size()];
+ public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) {
+ double[] supplies = new double[this.maxConsumers];
+
+ if (this.numConsumers < this.supplierEdges.size() && this.fixedShare * this.numConsumers <= totalSupply) {
- if (this.consumerEdges.size() < this.supplierEdges.size()
- && this.fixedShare * this.consumerEdges.size() <= totalSupply) {
- for (FlowEdge consumerEdge : this.consumerEdges) {
- supplies[consumerEdge.getConsumerIndex()] = this.fixedShare;
+ for (int consumerIndex : this.usedConsumerIndices) {
+ supplies[consumerIndex] = this.fixedShare;
}
} else {
- // Round-robin approach: prioritize consumers that didn't get resources last time
- ArrayList<Integer> currentNotSuppliedList = new ArrayList<>();
-
// Calculate how many consumers we can supply with available resources
int maxConsumersToSupply = (int) Math.floor(totalSupply / this.fixedShare);
int consumersSupplied = 0;
// First pass: try to supply consumers that were not supplied in the previous round
- for (int index : this.notSuppliedConsumers) {
- if (index < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply) {
- supplies[index] = this.fixedShare;
- consumersSupplied++;
+ for (int consumerIndex : this.notSuppliedConsumers) {
+ if (consumersSupplied >= maxConsumersToSupply) {
+ break;
}
+ supplies[consumerIndex] = this.fixedShare;
+ consumersSupplied++;
}
- // Second pass: supply remaining consumers if we still have capacity
- for (int i = 0; i < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply; i++) {
- if (supplies[i] == 0.0) { // This consumer hasn't been supplied yet
- supplies[i] = this.fixedShare;
+ this.notSuppliedConsumers.clear();
+ for (int consumerIndex : this.usedConsumerIndices) {
+ if (supplies[consumerIndex] == 0.0) {
+ if (consumersSupplied >= maxConsumersToSupply) {
+ this.notSuppliedConsumers.add(consumerIndex);
+ }
+ supplies[consumerIndex] = this.fixedShare;
consumersSupplied++;
}
}
-
- // Build the list of consumers that didn't get resources this round
- for (int i = 0; i < this.consumerEdges.size(); i++) {
- if (supplies[i] == 0.0) {
- currentNotSuppliedList.add(i);
- }
- }
-
- // Update the tracking array for next round
- this.notSuppliedConsumers =
- currentNotSuppliedList.stream().mapToInt(Integer::intValue).toArray();
}
return supplies;
@@ -167,46 +148,16 @@ public class FixedShareFlowDistributor extends FlowDistributor {
@Override
// index of not supplied consumers also need to be updated
public void removeConsumerEdge(FlowEdge consumerEdge) {
- int idx = consumerEdge.getConsumerIndex();
+ int consumerIndex = consumerEdge.getConsumerIndex();
- if (idx == -1) {
+ if (consumerIndex == -1) {
return;
}
- this.totalIncomingDemand -= consumerEdge.getDemand();
-
- // Remove idx from consumers that updated their demands
- this.updatedDemands.remove(idx);
-
- this.consumerEdges.remove(idx);
- this.incomingDemands.remove(idx);
- this.outgoingSupplies.remove(idx);
-
- // update the consumer index for all consumerEdges higher than this.
- for (int i = idx; i < this.consumerEdges.size(); i++) {
- FlowEdge other = this.consumerEdges.get(i);
-
- other.setConsumerIndex(other.getConsumerIndex() - 1);
+ if (this.notSuppliedConsumers.contains(consumerIndex)) {
+ this.notSuppliedConsumers.remove(Integer.valueOf(consumerIndex));
}
- HashSet<Integer> newUpdatedDemands = new HashSet<>();
- for (int idx_other : this.updatedDemands) {
- if (idx_other > idx) {
- newUpdatedDemands.add(idx_other - 1);
- } else {
- newUpdatedDemands.add(idx_other);
- }
- }
- this.updatedDemands = newUpdatedDemands;
-
- // Decrease the index of not supplied consumers
- for (int i = 0; i < this.notSuppliedConsumers.length; i++) {
- if (this.notSuppliedConsumers[i] > idx) {
- this.notSuppliedConsumers[i]--;
- }
- }
-
- this.outgoingDemandUpdateNeeded = true;
- this.invalidate();
+ super.removeConsumerEdge(consumerEdge);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java
index eb5d4ff7..96e42d4c 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java
@@ -56,22 +56,23 @@ public class FlowDistributorFactory {
}
}
- public static FlowDistributor getFlowDistributor(FlowEngine flowEngine, DistributionPolicy distributionPolicyType) {
+ public static FlowDistributor getFlowDistributor(
+ FlowEngine flowEngine, DistributionPolicy distributionPolicyType, int maxConsumers) {
return switch (distributionPolicyType) {
case BEST_EFFORT -> new BestEffortFlowDistributor(
- flowEngine, distributionPolicyType.getProperty("updateIntervalLength", Long.class));
- case EQUAL_SHARE -> new EqualShareFlowDistributor(flowEngine);
- case FIRST_FIT -> new FirstFitPolicyFlowDistributor(flowEngine);
+ flowEngine, distributionPolicyType.getProperty("updateIntervalLength", Long.class), maxConsumers);
+ case EQUAL_SHARE -> new EqualShareFlowDistributor(flowEngine, maxConsumers);
+ case FIRST_FIT -> new FirstFitPolicyFlowDistributor(flowEngine, maxConsumers);
case FIXED_SHARE -> {
if (!distributionPolicyType.getPropertyNames().contains("shareRatio")) {
throw new IllegalArgumentException(
"FixedShare distribution policy requires a 'shareRatio' property to be set.");
}
yield new FixedShareFlowDistributor(
- flowEngine, distributionPolicyType.getProperty("shareRatio", Double.class));
+ flowEngine, distributionPolicyType.getProperty("shareRatio", Double.class), maxConsumers);
}
- default -> new MaxMinFairnessFlowDistributor(flowEngine);
+ default -> new MaxMinFairnessFlowDistributor(flowEngine, maxConsumers);
};
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java
index 371015a4..875412c6 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java
@@ -24,7 +24,6 @@ package org.opendc.simulator.engine.graph.distributionPolicies;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Objects;
import org.opendc.simulator.engine.engine.FlowEngine;
import org.opendc.simulator.engine.graph.FlowDistributor;
import org.opendc.simulator.engine.graph.FlowEdge;
@@ -39,8 +38,8 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor {
private boolean overloaded = false;
- public MaxMinFairnessFlowDistributor(FlowEngine engine) {
- super(engine);
+ public MaxMinFairnessFlowDistributor(FlowEngine engine, int maxConsumers) {
+ super(engine, maxConsumers);
}
protected void updateOutgoingDemand() {
@@ -66,8 +65,9 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor {
new ArrayList<>(this.currentIncomingSupplies.values()),
this.totalIncomingSupply);
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType());
+ for (int consumerIndex : this.usedConsumerIndices) {
+ this.pushOutgoingSupply(
+ this.consumerEdges[consumerIndex], supplies[consumerIndex], this.getConsumerResourceType());
}
} else {
@@ -75,11 +75,12 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor {
// If the distributor was overloaded before, but is not anymore:
// provide all consumers with their demand
if (this.overloaded) {
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) {
+ for (int consumerIndex : this.usedConsumerIndices) {
+ // TODO: I think we can remove this check
+ if (this.outgoingSupplies[consumerIndex] != this.incomingDemands[consumerIndex]) {
this.pushOutgoingSupply(
- this.consumerEdges.get(idx),
- this.incomingDemands.get(idx),
+ this.consumerEdges[consumerIndex],
+ this.incomingDemands[consumerIndex],
this.getConsumerResourceType());
}
}
@@ -88,9 +89,11 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor {
// Update the supplies of the consumers that changed their demand in the current cycle
else {
- for (int idx : this.updatedDemands) {
+ for (int consumerIndex : this.updatedDemands) {
this.pushOutgoingSupply(
- this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType());
+ this.consumerEdges[consumerIndex],
+ this.incomingDemands[consumerIndex],
+ this.getConsumerResourceType());
}
}
}
@@ -100,14 +103,14 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor {
private record Demand(int idx, double value) {}
- public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) {
- int inputSize = demands.size();
+ public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) {
+ int inputSize = demands.length;
final double[] supplies = new double[inputSize];
final Demand[] tempDemands = new Demand[inputSize];
for (int i = 0; i < inputSize; i++) {
- tempDemands[i] = new Demand(i, demands.get(i));
+ tempDemands[i] = new Demand(i, demands[i]);
}
Arrays.sort(tempDemands, (o1, o2) -> {