diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-10-14 16:38:27 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-14 16:38:27 +0200 |
| commit | 4181a4bd51b54a5905be1f46f74c1349776e35c2 (patch) | |
| tree | a7bd532c2c8fa9b2650656dabe4cb1b78c28e5aa /opendc-simulator/opendc-simulator-flow/src/main/java | |
| parent | cd696da4c50a150f1d01fec27eef5a043b57b95a (diff) | |
Improved the performance by removing many invalidates from FlowNodes (#377)
* Updated the UpDatedConsumer to boolean array
* Updated SimTraceWorkload to not invalidate when the next fragment is started.
* Removed as much invalidates as possible
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java')
7 files changed, 83 insertions, 28 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java index 72dd217c..b0a37d0e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java @@ -24,6 +24,7 @@ package org.opendc.simulator.engine.engine; import java.util.ArrayDeque; import java.util.Arrays; +import java.util.HashMap; import org.opendc.simulator.engine.graph.FlowNode; /** @@ -32,7 +33,8 @@ import org.opendc.simulator.engine.graph.FlowNode; * <p> * By using a specialized class, we reduce the overhead caused by type-erasure. */ -final class FlowCycleQueue { +public final class FlowCycleQueue { + /** * The array of elements in the queue. */ @@ -45,10 +47,16 @@ final class FlowCycleQueue { nodeQueue = new FlowNode[initialCapacity]; } + public final HashMap<String, Integer> nodeTypeCounter = new HashMap<>(); /** * Add the specified context to the queue. */ void add(FlowNode ctx) { + + String nodeType = ctx.getClass().getSimpleName(); + nodeTypeCounter.putIfAbsent(nodeType, 0); + nodeTypeCounter.put(nodeType, nodeTypeCounter.get(nodeType) + 1); + if (ctx.getInCycleQueue()) { return; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index e5dbb7a8..9bbfa777 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -24,6 +24,7 @@ package org.opendc.simulator.engine.engine; import java.time.Clock; import java.time.InstantSource; +import java.util.LinkedList; import kotlin.coroutines.CoroutineContext; import org.opendc.common.Dispatcher; import org.opendc.simulator.engine.graph.FlowNode; @@ -38,7 +39,9 @@ public final class FlowEngine implements Runnable { /** * The queue of {@link FlowNode} updates that need to be updated in the current cycle. */ - private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256); + // private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256); + + private final LinkedList<FlowNode> cycleQueue = new LinkedList<>(); /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. @@ -139,16 +142,14 @@ public final class FlowEngine implements Runnable { * Run all the enqueued actions for the specified timestamp (<code>now</code>). */ private void doRunEngine(long now) { - final FlowCycleQueue cycleQueue = this.cycleQueue; - final FlowEventQueue eventQueue = this.eventQueue; - try { // Mark the engine as active to prevent concurrent calls to this method active = true; + // int EventCount = 0; // Execute all scheduled updates at current timestamp while (true) { - final FlowNode ctx = eventQueue.poll(now); + final FlowNode ctx = this.eventQueue.poll(now); if (ctx == null) { break; } @@ -158,13 +159,14 @@ public final class FlowEngine implements Runnable { // Execute all immediate updates while (true) { - final FlowNode ctx = cycleQueue.poll(); + final FlowNode ctx = this.cycleQueue.poll(); if (ctx == null) { break; } ctx.update(now); } + } finally { active = false; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index cb2a3ba6..39712f20 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -55,7 +55,10 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, protected final FlowEdge[] consumerEdges; protected final double[] incomingDemands; // What is demanded by the consumers protected final double[] outgoingSupplies; // What is supplied to the consumers - protected ArrayList<Integer> updatedDemands = new ArrayList<>(); + + protected final boolean[] updatedDemands; + protected int numUpdatedDemands = 0; + // protected ArrayList<Integer> updatedDemands = new ArrayList<>(); protected double previousTotalDemand = 0.0; protected double totalIncomingDemand; // The total demand of all the consumers @@ -73,6 +76,10 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, protected double capacity; // What is the max capacity. Can probably be removed + protected static HashMap<Integer, Integer> updateMap = new HashMap<Integer, Integer>(); + + protected boolean overloaded = false; + public FlowDistributor(FlowEngine engine, int maxConsumers) { super(engine); @@ -91,6 +98,8 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, this.incomingDemands = new double[this.maxConsumers]; this.outgoingSupplies = new double[this.maxConsumers]; + + this.updatedDemands = new boolean[this.maxConsumers]; } public double getTotalIncomingDemand() { @@ -109,13 +118,13 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, // Check if current supply is different from total demand if (this.outgoingDemandUpdateNeeded) { + this.updateOutgoingDemand(); return Long.MAX_VALUE; } - // TODO: look into whether this is always needed - if (this.numConsumers > 0) { + if (this.numUpdatedDemands > 0 || this.overloaded) { this.updateOutgoingSupplies(); } @@ -141,7 +150,6 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, this.numConsumers++; this.consumerEdges[consumerIndex] = consumerEdge; - this.outgoingDemandUpdateNeeded = true; } @Override @@ -169,9 +177,11 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, } // Remove idx from consumers that updated their demands - if (this.updatedDemands.contains(consumerIndex)) { - this.updatedDemands.remove(Integer.valueOf(consumerIndex)); - } + // if (this.updatedDemands.contains(consumerIndex)) { + // this.updatedDemands.remove(Integer.valueOf(consumerIndex)); + // } + + this.updatedDemands[consumerIndex] = false; this.consumerEdges[consumerIndex] = null; this.incomingDemands[consumerIndex] = 0.0; @@ -196,7 +206,9 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, this.currentIncomingSupplies.put(idx, 0.0); if (this.supplierEdges.isEmpty()) { - this.updatedDemands.clear(); + // this.updatedDemands.clear(); + Arrays.fill(this.updatedDemands, false); + this.numUpdatedDemands = 0; } } @@ -220,9 +232,12 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, } // TODO: can be optimized by using a boolean array - this.updatedDemands.add(consumerIndex); + // this.updatedDemands.add(consumerIndex); + this.updatedDemands[consumerIndex] = true; + this.numUpdatedDemands++; this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @@ -246,6 +261,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, this.totalIncomingSupply += (newSupply - prevSupply); this.outgoingSupplyUpdateNeeded = true; + this.invalidate(); } @@ -281,7 +297,6 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier, @Override public ResourceType getSupplierResourceType() { - // return this.supplierEdge.getSupplierResourceType(); return this.supplierResourceType; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index cbfe39a3..ff6ad6f0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -137,10 +137,14 @@ public abstract class FlowNode { // If there is already an update running, // notify the update, that a next update should be run after - if (this.nodeState != NodeState.CLOSING && this.nodeState != NodeState.CLOSED) { - this.nodeState = NodeState.INVALIDATED; - engine.scheduleImmediate(now, this); + if (this.nodeState == NodeState.CLOSING + || this.nodeState == NodeState.CLOSED + || this.nodeState == NodeState.INVALIDATED) { + return; } + + this.nodeState = NodeState.INVALIDATED; + engine.scheduleImmediate(now, this); } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java index 04317d6a..5446f261 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java @@ -23,6 +23,7 @@ package org.opendc.simulator.engine.graph.distributionPolicies; import java.util.ArrayList; +import java.util.Arrays; import org.opendc.simulator.engine.engine.FlowEngine; import org.opendc.simulator.engine.graph.FlowDistributor; import org.opendc.simulator.engine.graph.FlowEdge; @@ -143,17 +144,29 @@ public class BestEffortFlowDistributor extends FlowDistributor { // Update the supplies of the consumers that changed their demand in the current cycle else { - for (int consumerIndex : this.updatedDemands) { + for (int consumerIndex = 0; consumerIndex < this.numConsumers; consumerIndex++) { + if (!this.updatedDemands[consumerIndex]) { + continue; + } this.pushOutgoingSupply( this.consumerEdges[consumerIndex], this.incomingDemands[consumerIndex], this.getConsumerResourceType()); } + + // for (int consumerIndex : this.updatedDemands) { + // this.pushOutgoingSupply( + // this.consumerEdges[consumerIndex], + // this.incomingDemands[consumerIndex], + // this.getConsumerResourceType()); + // } } } this.outgoingSupplyUpdateNeeded = false; - this.updatedDemands.clear(); + // this.updatedDemands.clear(); + Arrays.fill(this.updatedDemands, false); + this.numUpdatedDemands = 0; } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java index 89b1b314..6938f7d7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java @@ -54,8 +54,6 @@ public class EqualShareFlowDistributor extends FlowDistributor { } this.outgoingDemandUpdateNeeded = false; - this.updatedDemands.clear(); - this.invalidate(); } /** @@ -72,6 +70,9 @@ public class EqualShareFlowDistributor extends FlowDistributor { this.pushOutgoingSupply( this.consumerEdges[consumerIndex], equalShare[consumerIndex], this.getConsumerResourceType()); } + + Arrays.fill(this.updatedDemands, false); + this.numUpdatedDemands = 0; } @Override diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java index e5e4eb59..b8337b7a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java @@ -36,8 +36,6 @@ import org.opendc.simulator.engine.graph.FlowEdge; */ public class MaxMinFairnessFlowDistributor extends FlowDistributor { - private boolean overloaded = false; - public MaxMinFairnessFlowDistributor(FlowEngine engine, int maxConsumers) { super(engine, maxConsumers); } @@ -63,6 +61,7 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { // If the demand is higher than the current supply, the system is overloaded. // The available supply is distributed based on the current distribution function. + // FIXME: There can a problem that the incoming supply is ony 11 decimal numbers and thus is smaller. if (this.totalIncomingDemand > this.totalIncomingSupply) { this.overloaded = true; @@ -95,16 +94,29 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor { // Update the supplies of the consumers that changed their demand in the current cycle else { - for (int consumerIndex : this.updatedDemands) { + for (int consumerIndex : this.usedConsumerIndices) { + if (!this.updatedDemands[consumerIndex]) { + continue; + } this.pushOutgoingSupply( this.consumerEdges[consumerIndex], this.incomingDemands[consumerIndex], this.getConsumerResourceType()); } + + // + // for (int consumerIndex : this.updatedDemands) { + // this.pushOutgoingSupply( + // this.consumerEdges[consumerIndex], + // this.incomingDemands[consumerIndex], + // this.getConsumerResourceType()); + // } } } - this.updatedDemands.clear(); + // this.updatedDemands.clear(); + Arrays.fill(this.updatedDemands, false); + this.numUpdatedDemands = 0; } private record Demand(int idx, double value) {} |
