diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-09-26 16:17:39 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-09-26 16:17:39 +0200 |
| commit | 2ba57fd06560f096def01a31f8e47827f0f01da0 (patch) | |
| tree | 27ac3c625b1e406a6f574b30c6c0eab0e6eb862d /opendc-simulator | |
| parent | e88adbc3fc15a2de717f9478454c5a5229ece10e (diff) | |
Converted maps in the FlowDistributor to Arrays for performance (#373)
* Updated the flowDistributor to use arrays instead of maps to improve performance.
* Small cleanup
Diffstat (limited to 'opendc-simulator')
10 files changed, 196 insertions, 235 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java index 8c14943f..b746156a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/machine/SimMachine.java @@ -206,10 +206,15 @@ public class SimMachine { this.availableResourceTypes = this.machineModel.getUsedResources(); + int maxTasks = Math.max( + this.machineModel.getCpuModel().getCoreCount(), + this.machineModel.getGpuModels().size() * 100); + // Create the psu and cpu and connect them this.psu = new SimPsu(engine); new FlowEdge(this.psu, powerDistributor); - this.distributors[ResourceType.POWER.ordinal()] = new MaxMinFairnessFlowDistributor(engine); // Maybe First fit + this.distributors[ResourceType.POWER.ordinal()] = new MaxMinFairnessFlowDistributor( + engine, 1 + this.machineModel.getGpuModels().size()); // Maybe First fit new FlowEdge(this.distributors[ResourceType.POWER.ordinal()], this.psu); this.computeResources.put( @@ -225,8 +230,8 @@ public class SimMachine { -1); // Create a FlowDistributor and add the cpu as supplier - this.distributors[ResourceType.CPU.ordinal()] = - FlowDistributorFactory.getFlowDistributor(engine, this.machineModel.getCpuDistributionStrategy()); + this.distributors[ResourceType.CPU.ordinal()] = FlowDistributorFactory.getFlowDistributor( + engine, this.machineModel.getCpuDistributionStrategy(), maxTasks); new FlowEdge( this.distributors[ResourceType.CPU.ordinal()], (FlowSupplier) this.computeResources.get(ResourceType.CPU).getFirst(), @@ -238,8 +243,8 @@ public class SimMachine { this.memory = new Memory(engine, this.machineModel.getMemory()); if (this.availableResourceTypes.contains(ResourceType.GPU)) { - this.distributors[ResourceType.GPU.ordinal()] = - FlowDistributorFactory.getFlowDistributor(engine, this.machineModel.getGpuDistributionStrategy()); + this.distributors[ResourceType.GPU.ordinal()] = FlowDistributorFactory.getFlowDistributor( + engine, this.machineModel.getGpuDistributionStrategy(), maxTasks); ArrayList<ComputeResource> gpus = new ArrayList<>(); for (GpuModel gpuModel : machineModel.getGpuModels()) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java index ec4089cd..782fab05 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/power/SimPsu.java @@ -38,11 +38,13 @@ import org.opendc.simulator.engine.graph.FlowSupplier; public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer { private long lastUpdate; - private double powerDemand = 0.0; - private double powerSupplied = 0.0; + private double incomingPowerDemand = 0.0; + private double outgoingPowerDemand = 0.0; + private double incomingPowerSupply = 0.0; + private double outgoingPowerSupply = 0.0; private double totalEnergyUsage = 0.0; - private FlowEdge cpuEdge; + private FlowEdge componentEdge; private FlowEdge powerSupplyEdge; private double capacity = Long.MAX_VALUE; @@ -57,7 +59,7 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer * @return <code>true</code> if the InPort is connected to an OutPort, <code>false</code> otherwise. */ public boolean isConnected() { - return cpuEdge != null; + return componentEdge != null; } /** @@ -65,15 +67,15 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer * <p> * This method provides access to the power consumption of the machine before PSU losses are applied. */ - public double getPowerDemand() { - return this.powerDemand; + public double getIncomingPowerDemand() { + return this.incomingPowerDemand; } /** * Return the instantaneous power usage of the machine (in W) measured at the InPort of the power supply. */ public double getPowerDraw() { - return this.powerSupplied; + return this.incomingPowerSupply; } /** @@ -106,10 +108,10 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer @Override public long onUpdate(long now) { updateCounters(); - double powerSupply = this.powerDemand; + double powerSupply = this.incomingPowerSupply; - if (powerSupply != this.powerSupplied) { - this.pushOutgoingSupply(this.cpuEdge, powerSupply); + if (powerSupply != this.incomingPowerDemand) { + this.pushOutgoingSupply(this.componentEdge, powerSupply); } return Long.MAX_VALUE; @@ -129,7 +131,7 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer long duration = now - lastUpdate; if (duration > 0) { // Compute the energy usage of the psu - this.totalEnergyUsage += (this.powerSupplied * duration * 0.001); + this.totalEnergyUsage += (this.incomingPowerSupply * duration * 0.001); } } @@ -139,37 +141,35 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - this.powerDemand = newDemand; + this.outgoingPowerDemand = newDemand; powerSupplyEdge.pushDemand(newDemand); } @Override public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { - this.powerSupplied = newSupply; - cpuEdge.pushSupply(newSupply); + this.outgoingPowerSupply = newSupply; + componentEdge.pushSupply(newSupply); } @Override public void handleIncomingDemand(FlowEdge consumerEdge, double newPowerDemand) { - updateCounters(); - this.powerDemand = newPowerDemand; + this.incomingPowerDemand = newPowerDemand; pushOutgoingDemand(this.powerSupplyEdge, newPowerDemand); } @Override public void handleIncomingSupply(FlowEdge supplierEdge, double newPowerSupply) { - updateCounters(); - this.powerSupplied = newPowerSupply; + this.incomingPowerSupply = newPowerSupply; - pushOutgoingSupply(this.cpuEdge, newPowerSupply); + pushOutgoingSupply(this.componentEdge, newPowerSupply); } @Override public void addConsumerEdge(FlowEdge consumerEdge) { - this.cpuEdge = consumerEdge; + this.componentEdge = consumerEdge; } @Override @@ -179,7 +179,7 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer @Override public void removeConsumerEdge(FlowEdge consumerEdge) { - this.cpuEdge = null; + this.componentEdge = null; } @Override @@ -189,7 +189,7 @@ public final class SimPsu extends FlowNode implements FlowSupplier, FlowConsumer @Override public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { - List<FlowEdge> supplyingEdges = cpuEdge != null ? List.of(cpuEdge) : List.of(); + List<FlowEdge> supplyingEdges = componentEdge != null ? List.of(componentEdge) : List.of(); List<FlowEdge> consumingEdges = powerSupplyEdge != null ? List.of(powerSupplyEdge) : List.of(); return Map.of( diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index c91f94d4..70fe7e96 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -118,12 +118,6 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { // instead iterate over the resources in the fragment as required resources not provided by the VM for (ResourceType resourceType : workload.getResourceTypes()) { this.usedResourceTypes.add(resourceType); - - // this.resourcesSupplied.put(resourceType, 0.0); - // this.newResourcesSupply.put(resourceType, 0.0); - // this.resourcesDemand.put(resourceType, 0.0); - // this.remainingWork.put(resourceType, 0.0); - // this.workloadFinished.put(resourceType, false); } } } @@ -146,11 +140,6 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { if (supplier.getSupplierResourceType() != ResourceType.AUXILIARY) { new FlowEdge(this, supplier); this.usedResourceTypes.add(supplier.getSupplierResourceType()); - // this.resourcesSupplied.put(supplier.getSupplierResourceType(), 0.0); - // this.newResourcesSupply.put(supplier.getSupplierResourceType(), 0.0); - // this.resourcesDemand.put(supplier.getSupplierResourceType(), 0.0); - // this.remainingWork.put(supplier.getSupplierResourceType(), 0.0); - // this.workloadFinished.put(supplier.getSupplierResourceType(), false); } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index cb276183..cb2a3ba6 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -23,11 +23,10 @@ package org.opendc.simulator.engine.graph; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.opendc.common.ResourceType; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.distributionPolicies.FlowDistributorFactory; @@ -46,31 +45,52 @@ import org.slf4j.LoggerFactory; */ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { protected static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class); - protected final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); - protected HashMap<Integer, FlowEdge> supplierEdges = - new HashMap<>(); // The suppliers that provide supply to this distributor - protected final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers - protected final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers + protected int numConsumers = 0; + protected final int maxConsumers; + protected final ArrayList<Integer> availableConsumerIndices; + protected final ArrayList<Integer> usedConsumerIndices; + + protected final FlowEdge[] consumerEdges; + protected final double[] incomingDemands; // What is demanded by the consumers + protected final double[] outgoingSupplies; // What is supplied to the consumers + protected ArrayList<Integer> updatedDemands = new ArrayList<>(); + + protected double previousTotalDemand = 0.0; protected double totalIncomingDemand; // The total demand of all the consumers - // AS index is based on the supplierIndex of the FlowEdge, ids of entries need to be stable + + protected HashMap<Integer, FlowEdge> supplierEdges = new HashMap<>(); protected HashMap<Integer, Double> currentIncomingSupplies = new HashMap<>(); // The current supply provided by the suppliers protected Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers protected boolean outgoingDemandUpdateNeeded = false; protected boolean outgoingSupplyUpdateNeeded = false; - protected Set<Integer> updatedDemands = - new HashSet<>(); // Array of consumers that updated their demand in this cycle protected ResourceType supplierResourceType; protected ResourceType consumerResourceType; protected double capacity; // What is the max capacity. Can probably be removed - public FlowDistributor(FlowEngine engine) { + public FlowDistributor(FlowEngine engine, int maxConsumers) { super(engine); + + this.maxConsumers = maxConsumers; + + this.availableConsumerIndices = new ArrayList<>(this.maxConsumers); + this.usedConsumerIndices = new ArrayList<>(this.maxConsumers); + + for (int i = 0; i < this.maxConsumers; i++) { + this.availableConsumerIndices.add(i); + } + + this.consumerEdges = new FlowEdge[this.maxConsumers]; + // this.supplierEdges = new FlowEdge[this.maxSuppliers]; + // this.incomingSupplies = new double[this.maxSuppliers]; + + this.incomingDemands = new double[this.maxConsumers]; + this.outgoingSupplies = new double[this.maxConsumers]; } public double getTotalIncomingDemand() { @@ -95,7 +115,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, } // TODO: look into whether this is always needed - if (!this.outgoingSupplies.isEmpty()) { + if (this.numConsumers > 0) { this.updateOutgoingSupplies(); } @@ -106,8 +126,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, protected abstract void updateOutgoingSupplies(); - public abstract double[] distributeSupply( - ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply); + public abstract double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply); /** * Add a new consumer. @@ -115,31 +134,32 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, */ @Override public void addConsumerEdge(FlowEdge consumerEdge) { - consumerEdge.setConsumerIndex(this.consumerEdges.size()); + int consumerIndex = this.availableConsumerIndices.removeFirst(); + this.usedConsumerIndices.add(consumerIndex); - this.consumerEdges.add(consumerEdge); - this.incomingDemands.add(0.0); - this.outgoingSupplies.add(0.0); - this.consumerResourceType = consumerEdge.getConsumerResourceType(); + consumerEdge.setConsumerIndex(consumerIndex); + + this.numConsumers++; + this.consumerEdges[consumerIndex] = consumerEdge; this.outgoingDemandUpdateNeeded = true; } @Override public void addSupplierEdge(FlowEdge supplierEdge) { // supplierIndex not always set, so we use 0 as default to avoid index out of bounds - int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex(); + int supplierIndex = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex(); - this.supplierEdges.put(idx, supplierEdge); + this.supplierEdges.put(supplierIndex, supplierEdge); this.capacity += supplierEdge.getCapacity(); - this.currentIncomingSupplies.put(idx, 0.0); + this.currentIncomingSupplies.put(supplierIndex, 0.0); this.supplierResourceType = supplierEdge.getSupplierResourceType(); } @Override public void removeConsumerEdge(FlowEdge consumerEdge) { - int idx = consumerEdge.getConsumerIndex(); + int consumerIndex = consumerEdge.getConsumerIndex(); - if (idx == -1) { + if (consumerIndex == -1) { return; } @@ -149,30 +169,18 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, } // Remove idx from consumers that updated their demands - this.updatedDemands.remove(idx); - - this.consumerEdges.remove(idx); - this.incomingDemands.remove(idx); - this.outgoingSupplies.remove(idx); - - // update the consumer index for all consumerEdges higher than this. - for (int i = idx; i < this.consumerEdges.size(); i++) { - FlowEdge other = this.consumerEdges.get(i); - - other.setConsumerIndex(other.getConsumerIndex() - 1); + if (this.updatedDemands.contains(consumerIndex)) { + this.updatedDemands.remove(Integer.valueOf(consumerIndex)); } - HashSet<Integer> newUpdatedDemands = new HashSet<>(); + this.consumerEdges[consumerIndex] = null; + this.incomingDemands[consumerIndex] = 0.0; + this.outgoingSupplies[consumerIndex] = 0.0; - for (int idx_other : this.updatedDemands) { - if (idx_other > idx) { - newUpdatedDemands.add(idx_other - 1); - } else { - newUpdatedDemands.add(idx_other); - } - } + this.usedConsumerIndices.remove(Integer.valueOf(consumerIndex)); + this.availableConsumerIndices.add(consumerIndex); - this.updatedDemands = newUpdatedDemands; + this.numConsumers--; this.outgoingDemandUpdateNeeded = true; this.invalidate(); @@ -182,8 +190,6 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, public void removeSupplierEdge(FlowEdge supplierEdge) { // supplierIndex not always set, so we use 0 as default to avoid index out of bounds int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex(); - // to keep index consistent, entries are neutralized instead of removed - // this.supplierEdges.put(idx, null); this.supplierEdges.remove(idx); this.capacity -= supplierEdge.getCapacity(); @@ -196,24 +202,25 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, @Override public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { - int idx = consumerEdge.getConsumerIndex(); + int consumerIndex = consumerEdge.getConsumerIndex(); - if (idx == -1) { + if (consumerIndex == -1) { LOGGER.warn("Demand {} pushed by an unknown consumer", newDemand); return; } // Update the total demand (This is cheaper than summing over all demands) - double prevDemand = incomingDemands.get(idx); + double prevDemand = incomingDemands[consumerIndex]; - incomingDemands.set(idx, newDemand); + incomingDemands[consumerIndex] = newDemand; // only update the total supply if the new supply is different from the previous one this.totalIncomingDemand += (newDemand - prevDemand); if (totalIncomingDemand < 0) { this.totalIncomingDemand = 0.0; } - this.updatedDemands.add(idx); + // TODO: can be optimized by using a boolean array + this.updatedDemands.add(consumerIndex); this.outgoingDemandUpdateNeeded = true; this.invalidate(); @@ -244,22 +251,22 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType(), this.consumerEdges.size()); + supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType(), this.numConsumers); } @Override public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { - int idx = consumerEdge.getConsumerIndex(); + int consumerIndex = consumerEdge.getConsumerIndex(); - if (idx == -1) { + if (consumerIndex == -1) { System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); } - if (outgoingSupplies.get(idx) == newSupply) { + if (outgoingSupplies[consumerIndex] == newSupply) { return; } - outgoingSupplies.set(idx, newSupply); + outgoingSupplies[consumerIndex] = newSupply; consumerEdge.pushSupply(newSupply, false, this.getSupplierResourceType()); } @@ -267,7 +274,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { return Map.of( FlowEdge.NodeType.CONSUMING, - this.consumerEdges, + Arrays.asList(this.consumerEdges), FlowEdge.NodeType.SUPPLYING, new ArrayList<>(this.supplierEdges.values())); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java index 703aca35..04317d6a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java @@ -23,7 +23,6 @@ package org.opendc.simulator.engine.graph.distributionPolicies; import java.util.ArrayList; -import java.util.Objects; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowDistributor; import org.opendc.simulator.engine.graph.FlowEdge; @@ -47,8 +46,8 @@ public class BestEffortFlowDistributor extends FlowDistributor { private final long roundRobinInterval; private long lastRoundRobinUpdate; - public BestEffortFlowDistributor(FlowEngine flowEngine, long roundRobinInterval) { - super(flowEngine); + public BestEffortFlowDistributor(FlowEngine flowEngine, long roundRobinInterval, int maxConsumers) { + super(flowEngine, maxConsumers); this.roundRobinInterval = roundRobinInterval; this.lastRoundRobinUpdate = -roundRobinInterval; } @@ -122,28 +121,33 @@ public class BestEffortFlowDistributor extends FlowDistributor { new ArrayList<>(this.currentIncomingSupplies.values()), this.totalIncomingSupply); - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); + for (int consumerIndex : this.usedConsumerIndices) { + this.pushOutgoingSupply( + this.consumerEdges[consumerIndex], supplies[consumerIndex], this.getConsumerResourceType()); } } else { // System is not overloaded - satisfy all demands and utilize remaining capacity if (this.overloaded) { - // Transitioning from overloaded to non-overloaded state - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { + for (int consumerIndex : this.usedConsumerIndices) { + // TODO: I think we can remove this check + if (this.outgoingSupplies[consumerIndex] == this.incomingDemands[consumerIndex]) { this.pushOutgoingSupply( - this.consumerEdges.get(idx), - this.incomingDemands.get(idx), + this.consumerEdges[consumerIndex], + this.incomingDemands[consumerIndex], this.getConsumerResourceType()); } } this.overloaded = false; - } else { - // Update supplies for consumers that changed their demand - for (int idx : this.updatedDemands) { + } + + // Update the supplies of the consumers that changed their demand in the current cycle + else { + for (int consumerIndex : this.updatedDemands) { this.pushOutgoingSupply( - this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType()); + this.consumerEdges[consumerIndex], + this.incomingDemands[consumerIndex], + this.getConsumerResourceType()); } } } @@ -160,8 +164,8 @@ public class BestEffortFlowDistributor extends FlowDistributor { * 3. Optimize utilization by giving extra capacity to active consumers */ @Override - public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { - int numConsumers = this.consumerEdges.size(); + public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) { + int numConsumers = this.consumerEdges.length; double[] allocation = new double[numConsumers]; if (numConsumers == 0 || totalSupply <= 0) { @@ -174,7 +178,7 @@ public class BestEffortFlowDistributor extends FlowDistributor { // Start from the current round-robin index to ensure fairness over time for (int round = 0; round < numConsumers && remainingSupply > 0; round++) { int idx = (currentRoundRobinIndex + round) % numConsumers; - double demand = demands.get(idx); + double demand = demands[idx]; if (demand > allocation[idx]) { // Calculate how much we can allocate in this round @@ -192,7 +196,7 @@ public class BestEffortFlowDistributor extends FlowDistributor { // Create a list of consumers with unsatisfied demand, sorted by relative need ArrayList<Integer> unsatisfiedConsumers = new ArrayList<>(); for (int i = 0; i < numConsumers; i++) { - if (demands.get(i) > allocation[i]) { + if (demands[i] > allocation[i]) { unsatisfiedConsumers.add(i); } } @@ -202,7 +206,7 @@ public class BestEffortFlowDistributor extends FlowDistributor { // Find consumers with any demand (active consumers) ArrayList<Integer> activeConsumers = new ArrayList<>(); for (int i = 0; i < numConsumers; i++) { - if (demands.get(i) > 0) { + if (demands[i] > 0) { activeConsumers.add(i); } } @@ -217,11 +221,11 @@ public class BestEffortFlowDistributor extends FlowDistributor { // Distribute remaining supply proportionally to unsatisfied demand double totalUnsatisfiedDemand = 0; for (int idx : unsatisfiedConsumers) { - totalUnsatisfiedDemand += demands.get(idx) - allocation[idx]; + totalUnsatisfiedDemand += demands[idx] - allocation[idx]; } for (int idx : unsatisfiedConsumers) { - double unsatisfiedDemand = demands.get(idx) - allocation[idx]; + double unsatisfiedDemand = demands[idx] - allocation[idx]; double proportion = unsatisfiedDemand / totalUnsatisfiedDemand; allocation[idx] += remainingSupply * proportion; } @@ -269,7 +273,7 @@ public class BestEffortFlowDistributor extends FlowDistributor { this.updateOutgoingSupplies(); } - if (this.consumerEdges.isEmpty() || !this.hasSupplierEdges()) { + if (this.numConsumers == 0 || !this.hasSupplierEdges()) { nextUpdate = Long.MAX_VALUE; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java index 87ed7ca2..89b1b314 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java @@ -36,8 +36,8 @@ import org.opendc.simulator.engine.graph.FlowDistributor; */ public class EqualShareFlowDistributor extends FlowDistributor { - public EqualShareFlowDistributor(FlowEngine engine) { - super(engine); + public EqualShareFlowDistributor(FlowEngine engine, int maxConsumers) { + super(engine, maxConsumers); } /** @@ -65,18 +65,19 @@ public class EqualShareFlowDistributor extends FlowDistributor { */ @Override protected void updateOutgoingSupplies() { - double[] equalShare = distributeSupply(incomingDemands, outgoingSupplies, this.capacity); + double[] equalShare = distributeSupply( + incomingDemands, new ArrayList<>(this.currentIncomingSupplies.values()), this.capacity); - for (var consumerEdge : this.consumerEdges) { - this.pushOutgoingSupply(consumerEdge, equalShare[consumerEdge.getConsumerIndex()]); + for (int consumerIndex : this.usedConsumerIndices) { + this.pushOutgoingSupply( + this.consumerEdges[consumerIndex], equalShare[consumerIndex], this.getConsumerResourceType()); } } @Override - public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { - int numConsumers = demands.size(); - double[] allocation = new double[numConsumers]; - double equalShare = totalSupply / numConsumers; + public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) { + double[] allocation = new double[this.numConsumers]; + double equalShare = totalSupply / this.numConsumers; // Equal share regardless of individual demands Arrays.fill(allocation, equalShare); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java index 9ab24d9f..67458ad6 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FirstFitPolicyFlowDistributor.java @@ -25,7 +25,6 @@ package org.opendc.simulator.engine.graph.distributionPolicies; import java.util.ArrayList; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowDistributor; -import org.opendc.simulator.engine.graph.FlowEdge; /** * A {@link FlowDistributor} that implements the First Fit policy for distributing flow. @@ -37,8 +36,8 @@ import org.opendc.simulator.engine.graph.FlowEdge; */ public class FirstFitPolicyFlowDistributor extends FlowDistributor { - public FirstFitPolicyFlowDistributor(FlowEngine engine) { - super(engine); + public FirstFitPolicyFlowDistributor(FlowEngine engine, int maxConsumers) { + super(engine, maxConsumers); } /** @@ -90,8 +89,9 @@ public class FirstFitPolicyFlowDistributor extends FlowDistributor { double[] shares = distributeSupply(incomingDemands, currentPossibleSupplies, totalIncomingSupply); - for (FlowEdge consumerEdge : this.consumerEdges) { - this.pushOutgoingSupply(consumerEdge, shares[consumerEdge.getConsumerIndex()]); + for (int consumerIndex : this.usedConsumerIndices) { + this.pushOutgoingSupply( + this.consumerEdges[consumerIndex], shares[consumerIndex], this.getConsumerResourceType()); } } @@ -108,8 +108,8 @@ public class FirstFitPolicyFlowDistributor extends FlowDistributor { * @see #updateOutgoingSupplies() */ @Override - public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { - int numConsumers = demands.size(); + public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) { + int numConsumers = demands.length; double[] allocation = new double[numConsumers]; // Create a copy of current supply to track remaining capacity as we allocate @@ -117,7 +117,7 @@ public class FirstFitPolicyFlowDistributor extends FlowDistributor { // For each demand, try to satisfy it using suppliers in order for (int i = 0; i < numConsumers; i++) { - double remainingDemand = demands.get(i); + double remainingDemand = demands[i]; double totalAllocated = 0.0; if (remainingDemand > 0) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java index f22ea9fb..7a586b72 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShareFlowDistributor.java @@ -23,7 +23,6 @@ package org.opendc.simulator.engine.graph.distributionPolicies; import java.util.ArrayList; -import java.util.HashSet; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowDistributor; import org.opendc.simulator.engine.graph.FlowEdge; @@ -46,7 +45,7 @@ public class FixedShareFlowDistributor extends FlowDistributor { private double fixedShare; private final double shareRatio; - private int[] notSuppliedConsumers; + private final ArrayList<Integer> notSuppliedConsumers = new ArrayList<>(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -58,8 +57,8 @@ public class FixedShareFlowDistributor extends FlowDistributor { * @param engine The flow engine * @param shareRatio The fixed share ratio for each consumer (between 0.0 and 1.0) */ - public FixedShareFlowDistributor(FlowEngine engine, double shareRatio) { - super(engine); + public FixedShareFlowDistributor(FlowEngine engine, double shareRatio, int maxConsumers) { + super(engine, maxConsumers); if (shareRatio <= 0 || shareRatio > 1) { throw new IllegalArgumentException("Share ratio must be between 0.0 and 1.0"); @@ -68,9 +67,6 @@ public class FixedShareFlowDistributor extends FlowDistributor { // Each consumer gets an equal fixed share of the total capacity this.fixedShare = this.shareRatio * this.capacity / this.supplierEdges.size(); - - // Initialize tracking for round-robin prioritization - this.notSuppliedConsumers = new int[0]; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -103,62 +99,47 @@ public class FixedShareFlowDistributor extends FlowDistributor { protected void updateOutgoingSupplies() { // Calculate the fixed allocation per consumer - // Distribute to each consumer - int consumerIndex = 0; - if (this.consumerEdges.size() == this.supplierEdges.size()) { - for (FlowEdge consumerEdge : this.consumerEdges) { - this.pushOutgoingSupply(consumerEdge, this.fixedShare); - } - } else { - double[] supplies = distributeSupply(this.incomingDemands, this.outgoingSupplies, this.totalIncomingSupply); - for (FlowEdge consumerEdge : this.consumerEdges) { - this.pushOutgoingSupply(consumerEdge, this.fixedShare); - } + double[] supplies = distributeSupply( + this.incomingDemands, new ArrayList<>(this.currentIncomingSupplies.values()), this.totalIncomingSupply); + + for (int consumerIndex : this.usedConsumerIndices) { + this.pushOutgoingSupply( + this.consumerEdges[consumerIndex], supplies[consumerIndex], this.getConsumerResourceType()); } } - public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { - double[] supplies = new double[this.consumerEdges.size()]; + public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) { + double[] supplies = new double[this.maxConsumers]; + + if (this.numConsumers < this.supplierEdges.size() && this.fixedShare * this.numConsumers <= totalSupply) { - if (this.consumerEdges.size() < this.supplierEdges.size() - && this.fixedShare * this.consumerEdges.size() <= totalSupply) { - for (FlowEdge consumerEdge : this.consumerEdges) { - supplies[consumerEdge.getConsumerIndex()] = this.fixedShare; + for (int consumerIndex : this.usedConsumerIndices) { + supplies[consumerIndex] = this.fixedShare; } } else { - // Round-robin approach: prioritize consumers that didn't get resources last time - ArrayList<Integer> currentNotSuppliedList = new ArrayList<>(); - // Calculate how many consumers we can supply with available resources int maxConsumersToSupply = (int) Math.floor(totalSupply / this.fixedShare); int consumersSupplied = 0; // First pass: try to supply consumers that were not supplied in the previous round - for (int index : this.notSuppliedConsumers) { - if (index < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply) { - supplies[index] = this.fixedShare; - consumersSupplied++; + for (int consumerIndex : this.notSuppliedConsumers) { + if (consumersSupplied >= maxConsumersToSupply) { + break; } + supplies[consumerIndex] = this.fixedShare; + consumersSupplied++; } - // Second pass: supply remaining consumers if we still have capacity - for (int i = 0; i < this.consumerEdges.size() && consumersSupplied < maxConsumersToSupply; i++) { - if (supplies[i] == 0.0) { // This consumer hasn't been supplied yet - supplies[i] = this.fixedShare; + this.notSuppliedConsumers.clear(); + for (int consumerIndex : this.usedConsumerIndices) { + if (supplies[consumerIndex] == 0.0) { + if (consumersSupplied >= maxConsumersToSupply) { + this.notSuppliedConsumers.add(consumerIndex); + } + supplies[consumerIndex] = this.fixedShare; consumersSupplied++; } } - - // Build the list of consumers that didn't get resources this round - for (int i = 0; i < this.consumerEdges.size(); i++) { - if (supplies[i] == 0.0) { - currentNotSuppliedList.add(i); - } - } - - // Update the tracking array for next round - this.notSuppliedConsumers = - currentNotSuppliedList.stream().mapToInt(Integer::intValue).toArray(); } return supplies; @@ -167,46 +148,16 @@ public class FixedShareFlowDistributor extends FlowDistributor { @Override // index of not supplied consumers also need to be updated public void removeConsumerEdge(FlowEdge consumerEdge) { - int idx = consumerEdge.getConsumerIndex(); + int consumerIndex = consumerEdge.getConsumerIndex(); - if (idx == -1) { + if (consumerIndex == -1) { return; } - this.totalIncomingDemand -= consumerEdge.getDemand(); - - // Remove idx from consumers that updated their demands - this.updatedDemands.remove(idx); - - this.consumerEdges.remove(idx); - this.incomingDemands.remove(idx); - this.outgoingSupplies.remove(idx); - - // update the consumer index for all consumerEdges higher than this. - for (int i = idx; i < this.consumerEdges.size(); i++) { - FlowEdge other = this.consumerEdges.get(i); - - other.setConsumerIndex(other.getConsumerIndex() - 1); + if (this.notSuppliedConsumers.contains(consumerIndex)) { + this.notSuppliedConsumers.remove(Integer.valueOf(consumerIndex)); } - HashSet<Integer> newUpdatedDemands = new HashSet<>(); - for (int idx_other : this.updatedDemands) { - if (idx_other > idx) { - newUpdatedDemands.add(idx_other - 1); - } else { - newUpdatedDemands.add(idx_other); - } - } - this.updatedDemands = newUpdatedDemands; - - // Decrease the index of not supplied consumers - for (int i = 0; i < this.notSuppliedConsumers.length; i++) { - if (this.notSuppliedConsumers[i] > idx) { - this.notSuppliedConsumers[i]--; - } - } - - this.outgoingDemandUpdateNeeded = true; - this.invalidate(); + super.removeConsumerEdge(consumerEdge); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java index eb5d4ff7..96e42d4c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FlowDistributorFactory.java @@ -56,22 +56,23 @@ public class FlowDistributorFactory { } } - public static FlowDistributor getFlowDistributor(FlowEngine flowEngine, DistributionPolicy distributionPolicyType) { + public static FlowDistributor getFlowDistributor( + FlowEngine flowEngine, DistributionPolicy distributionPolicyType, int maxConsumers) { return switch (distributionPolicyType) { case BEST_EFFORT -> new BestEffortFlowDistributor( - flowEngine, distributionPolicyType.getProperty("updateIntervalLength", Long.class)); - case EQUAL_SHARE -> new EqualShareFlowDistributor(flowEngine); - case FIRST_FIT -> new FirstFitPolicyFlowDistributor(flowEngine); + flowEngine, distributionPolicyType.getProperty("updateIntervalLength", Long.class), maxConsumers); + case EQUAL_SHARE -> new EqualShareFlowDistributor(flowEngine, maxConsumers); + case FIRST_FIT -> new FirstFitPolicyFlowDistributor(flowEngine, maxConsumers); case FIXED_SHARE -> { if (!distributionPolicyType.getPropertyNames().contains("shareRatio")) { throw new IllegalArgumentException( "FixedShare distribution policy requires a 'shareRatio' property to be set."); } yield new FixedShareFlowDistributor( - flowEngine, distributionPolicyType.getProperty("shareRatio", Double.class)); + flowEngine, distributionPolicyType.getProperty("shareRatio", Double.class), maxConsumers); } - default -> new MaxMinFairnessFlowDistributor(flowEngine); + default -> new MaxMinFairnessFlowDistributor(flowEngine, maxConsumers); }; } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java index 371015a4..875412c6 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java @@ -24,7 +24,6 @@ package org.opendc.simulator.engine.graph.distributionPolicies; import java.util.ArrayList; import java.util.Arrays; -import java.util.Objects; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowDistributor; import org.opendc.simulator.engine.graph.FlowEdge; @@ -39,8 +38,8 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { private boolean overloaded = false; - public MaxMinFairnessFlowDistributor(FlowEngine engine) { - super(engine); + public MaxMinFairnessFlowDistributor(FlowEngine engine, int maxConsumers) { + super(engine, maxConsumers); } protected void updateOutgoingDemand() { @@ -66,8 +65,9 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { new ArrayList<>(this.currentIncomingSupplies.values()), this.totalIncomingSupply); - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); + for (int consumerIndex : this.usedConsumerIndices) { + this.pushOutgoingSupply( + this.consumerEdges[consumerIndex], supplies[consumerIndex], this.getConsumerResourceType()); } } else { @@ -75,11 +75,12 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { // If the distributor was overloaded before, but is not anymore: // provide all consumers with their demand if (this.overloaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { + for (int consumerIndex : this.usedConsumerIndices) { + // TODO: I think we can remove this check + if (this.outgoingSupplies[consumerIndex] != this.incomingDemands[consumerIndex]) { this.pushOutgoingSupply( - this.consumerEdges.get(idx), - this.incomingDemands.get(idx), + this.consumerEdges[consumerIndex], + this.incomingDemands[consumerIndex], this.getConsumerResourceType()); } } @@ -88,9 +89,11 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { // Update the supplies of the consumers that changed their demand in the current cycle else { - for (int idx : this.updatedDemands) { + for (int consumerIndex : this.updatedDemands) { this.pushOutgoingSupply( - this.consumerEdges.get(idx), this.incomingDemands.get(idx), this.getConsumerResourceType()); + this.consumerEdges[consumerIndex], + this.incomingDemands[consumerIndex], + this.getConsumerResourceType()); } } } @@ -100,14 +103,14 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { private record Demand(int idx, double value) {} - public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { - int inputSize = demands.size(); + public double[] distributeSupply(double[] demands, ArrayList<Double> currentSupply, double totalSupply) { + int inputSize = demands.length; final double[] supplies = new double[inputSize]; final Demand[] tempDemands = new Demand[inputSize]; for (int i = 0; i < inputSize; i++) { - tempDemands[i] = new Demand(i, demands.get(i)); + tempDemands[i] = new Demand(i, demands[i]); } Arrays.sort(tempDemands, (o1, o2) -> { |
