diff options
| author | Niels Thiele <noleu66@posteo.net> | 2025-07-15 11:29:47 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-15 11:29:47 +0200 |
| commit | b2dc97dc84f56174ede9f273999ade2ed059d431 (patch) | |
| tree | 1b5d6d775890375f46b533c7aa78e492a88afc3f /opendc-simulator/opendc-simulator-flow/src/main/java/org | |
| parent | 0203254b709614fa732c114aa25916f61b8b3275 (diff) | |
multi gpu support (#351)
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org')
5 files changed, 79 insertions, 30 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 674db8ca..f7fc2728 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -23,6 +23,7 @@ package org.opendc.simulator.engine.graph; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -32,20 +33,30 @@ import org.opendc.common.ResourceType; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.distributionPolicies.DistributionPolicy; import org.opendc.simulator.engine.graph.distributionPolicies.MaxMinFairnessPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowDistributor.class); private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); - private FlowEdge supplierEdge; + private HashMap<Integer, FlowEdge> supplierEdges = + new HashMap<>(); // The suppliers that provide supply to this distributor private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers private double totalIncomingDemand; // The total demand of all the consumers - private double currentIncomingSupply; // The current supply provided by the supplier + // AS index is based on the supplierIndex of the FlowEdge, ids of entries need to be stable + private HashMap<Integer, Double> currentIncomingSupplies = + new HashMap<>(); // The current supply provided by the suppliers + private Double totalIncomingSupply = 0.0; // The total supply provided by the suppliers private boolean outgoingDemandUpdateNeeded = false; private Set<Integer> updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle + private ResourceType supplierResourceType; + private ResourceType consumerResourceType; + private boolean overloaded = false; private double capacity; // What is the max capacity. Can probably be removed @@ -66,7 +77,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu } public double getCurrentIncomingSupply() { - return currentIncomingSupply; + return this.totalIncomingSupply; } public double getCapacity() { @@ -90,7 +101,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu } private void updateOutgoingDemand() { - this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand); + // equally distribute the demand to all suppliers + for (FlowEdge supplierEdge : this.supplierEdges.values()) { + this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand / this.supplierEdges.size()); + // alternatively a relative share could be used, based on capacity minus current incoming supply + // this.pushOutgoingDemand(supplierEdge, this.totalIncomingDemand * (supplierEdge.getCapacity() - + // currentIncomingSupplies.get(idx) / supplierEdges.size())); + } this.outgoingDemandUpdateNeeded = false; @@ -102,11 +119,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu // If the demand is higher than the current supply, the system is overloaded. // The available supply is distributed based on the current distribution function. - if (this.totalIncomingDemand > this.currentIncomingSupply) { + if (this.totalIncomingDemand > this.totalIncomingSupply) { this.overloaded = true; - double[] supplies = - this.distributionPolicy.distributeSupply(this.incomingDemands, this.currentIncomingSupply); + double[] supplies = this.distributionPolicy.distributeSupply( + this.incomingDemands, + new ArrayList<>(this.currentIncomingSupplies.values()), + this.totalIncomingSupply); for (int idx = 0; idx < this.consumerEdges.size(); idx++) { this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx], this.getConsumerResourceType()); @@ -151,13 +170,18 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu this.consumerEdges.add(consumerEdge); this.incomingDemands.add(0.0); this.outgoingSupplies.add(0.0); + this.consumerResourceType = consumerEdge.getConsumerResourceType(); } @Override public void addSupplierEdge(FlowEdge supplierEdge) { - this.supplierEdge = supplierEdge; - this.capacity = supplierEdge.getCapacity(); - this.currentIncomingSupply = 0; + // supplierIndex not always set, so we use 0 as default to avoid index out of bounds + int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex(); + + this.supplierEdges.put(idx, supplierEdge); + this.capacity += supplierEdge.getCapacity(); + this.currentIncomingSupplies.put(idx, 0.0); + this.supplierResourceType = supplierEdge.getSupplierResourceType(); } @Override @@ -202,13 +226,16 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu @Override public void removeSupplierEdge(FlowEdge supplierEdge) { - this.supplierEdge = null; - this.capacity = 0; - this.currentIncomingSupply = 0; - - this.updatedDemands.clear(); - - this.closeNode(); + // supplierIndex not always set, so we use 0 as default to avoid index out of bounds + int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex(); + // to keep index consistent, entries are neutralized instead of removed + this.supplierEdges.put(idx, null); + this.capacity -= supplierEdge.getCapacity(); + this.currentIncomingSupplies.put(idx, 0.0); + + if (this.supplierEdges.isEmpty()) { + this.updatedDemands.clear(); + } } @Override @@ -216,7 +243,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu int idx = consumerEdge.getConsumerIndex(); if (idx == -1) { - System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); + LOGGER.warn("Demand {} pushed by an unknown consumer", newDemand); return; } @@ -224,6 +251,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu double prevDemand = incomingDemands.get(idx); incomingDemands.set(idx, newDemand); + // only update the total supply if the new supply is different from the previous one this.totalIncomingDemand += (newDemand - prevDemand); this.updatedDemands.add(idx); @@ -243,14 +271,20 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu @Override public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { - this.currentIncomingSupply = newSupply; + // supplierIndex not always set, so we use 0 as default to avoid index out of bounds + int idx = supplierEdge.getSupplierIndex() == -1 ? 0 : supplierEdge.getSupplierIndex(); + double prevSupply = currentIncomingSupplies.get(idx); + + currentIncomingSupplies.put(idx, newSupply); + // only update the total supply if the new supply is different from the previous one + this.totalIncomingSupply += (newSupply - prevSupply); this.invalidate(); } @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - this.supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType()); + supplierEdge.pushDemand(newDemand, false, this.getSupplierResourceType()); } @Override @@ -267,23 +301,25 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu outgoingSupplies.set(idx, newSupply); consumerEdge.pushSupply(newSupply, false, this.getSupplierResourceType()); - consumerEdge.pushSupply(newSupply, false, this.getSupplierResourceType()); } @Override public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { - List<FlowEdge> supplyingEdges = (this.supplierEdge != null) ? List.of(this.supplierEdge) : List.of(); - - return Map.of(FlowEdge.NodeType.CONSUMING, supplyingEdges, FlowEdge.NodeType.SUPPLYING, this.consumerEdges); + return Map.of( + FlowEdge.NodeType.CONSUMING, + this.consumerEdges, + FlowEdge.NodeType.SUPPLYING, + new ArrayList<>(this.supplierEdges.values())); } @Override public ResourceType getSupplierResourceType() { - return this.supplierEdge.getSupplierResourceType(); + // return this.supplierEdge.getSupplierResourceType(); + return this.supplierResourceType; } @Override public ResourceType getConsumerResourceType() { - return this.consumerEdges.getFirst().getConsumerResourceType(); + return this.consumerResourceType; } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index aa3894c1..db2a2944 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -54,6 +54,15 @@ public class FlowEdge { } public FlowEdge(FlowConsumer consumer, FlowSupplier supplier, ResourceType resourceType) { + this(consumer, supplier, resourceType, -1, -1); + } + + public FlowEdge( + FlowConsumer consumer, + FlowSupplier supplier, + ResourceType resourceType, + int consumerIndex, + int supplierIndex) { if (!(consumer instanceof FlowNode)) { throw new IllegalArgumentException("Flow consumer is not a FlowNode"); } @@ -67,6 +76,10 @@ public class FlowEdge { this.capacity = supplier.getCapacity(resourceType); + // to avoid race condition of setting indices and requiring them in the PSU + this.supplierIndex = supplierIndex; + this.consumerIndex = consumerIndex; + this.consumer.addSupplierEdge(this); this.supplier.addConsumerEdge(this); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java index 9d2246cd..3a8bebbc 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/DistributionPolicy.java @@ -25,5 +25,5 @@ package org.opendc.simulator.engine.graph.distributionPolicies; import java.util.ArrayList; public interface DistributionPolicy { - double[] distributeSupply(ArrayList<Double> supply, double currentSupply); + double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java index 40d70b5e..baa04975 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/FixedShare.java @@ -42,7 +42,7 @@ public class FixedShare implements DistributionPolicy { } @Override - public double[] distributeSupply(ArrayList<Double> supply, double currentSupply) { + public double[] distributeSupply(ArrayList<Double> supply, ArrayList<Double> currentSupply, double totalSupply) { return new double[0]; } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java index 1d387349..484e7fe4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessPolicy.java @@ -34,7 +34,7 @@ public class MaxMinFairnessPolicy implements DistributionPolicy { private record Demand(int idx, double value) {} @Override - public double[] distributeSupply(ArrayList<Double> demands, double currentSupply) { + public double[] distributeSupply(ArrayList<Double> demands, ArrayList<Double> currentSupply, double totalSupply) { int inputSize = demands.size(); final double[] supplies = new double[inputSize]; @@ -50,7 +50,7 @@ public class MaxMinFairnessPolicy implements DistributionPolicy { return i1.compareTo(i2); }); - double availableCapacity = currentSupply; // totalSupply + double availableCapacity = totalSupply; for (int i = 0; i < inputSize; i++) { double d = tempDemands[i].value; |
