summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-10-14 16:38:27 +0200
committerGitHub <noreply@github.com>2025-10-14 16:38:27 +0200
commit4181a4bd51b54a5905be1f46f74c1349776e35c2 (patch)
treea7bd532c2c8fa9b2650656dabe4cb1b78c28e5aa /opendc-simulator/opendc-simulator-flow
parentcd696da4c50a150f1d01fec27eef5a043b57b95a (diff)
Improved the performance by removing many invalidates from FlowNodes (#377)
* Updated the UpDatedConsumer to boolean array * Updated SimTraceWorkload to not invalidate when the next fragment is started. * Removed as much invalidates as possible
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java14
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java35
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java17
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java5
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java20
7 files changed, 83 insertions, 28 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java
index 72dd217c..b0a37d0e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java
@@ -24,6 +24,7 @@ package org.opendc.simulator.engine.engine;
import java.util.ArrayDeque;
import java.util.Arrays;
+import java.util.HashMap;
import org.opendc.simulator.engine.graph.FlowNode;
/**
@@ -32,7 +33,8 @@ import org.opendc.simulator.engine.graph.FlowNode;
* <p>
* By using a specialized class, we reduce the overhead caused by type-erasure.
*/
-final class FlowCycleQueue {
+public final class FlowCycleQueue {
+
/**
* The array of elements in the queue.
*/
@@ -45,10 +47,16 @@ final class FlowCycleQueue {
nodeQueue = new FlowNode[initialCapacity];
}
+ public final HashMap<String, Integer> nodeTypeCounter = new HashMap<>();
/**
* Add the specified context to the queue.
*/
void add(FlowNode ctx) {
+
+ String nodeType = ctx.getClass().getSimpleName();
+ nodeTypeCounter.putIfAbsent(nodeType, 0);
+ nodeTypeCounter.put(nodeType, nodeTypeCounter.get(nodeType) + 1);
+
if (ctx.getInCycleQueue()) {
return;
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
index e5dbb7a8..9bbfa777 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
@@ -24,6 +24,7 @@ package org.opendc.simulator.engine.engine;
import java.time.Clock;
import java.time.InstantSource;
+import java.util.LinkedList;
import kotlin.coroutines.CoroutineContext;
import org.opendc.common.Dispatcher;
import org.opendc.simulator.engine.graph.FlowNode;
@@ -38,7 +39,9 @@ public final class FlowEngine implements Runnable {
/**
* The queue of {@link FlowNode} updates that need to be updated in the current cycle.
*/
- private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256);
+ // private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256);
+
+ private final LinkedList<FlowNode> cycleQueue = new LinkedList<>();
/**
* A priority queue containing the {@link FlowNode} updates to be scheduled in the future.
@@ -139,16 +142,14 @@ public final class FlowEngine implements Runnable {
* Run all the enqueued actions for the specified timestamp (<code>now</code>).
*/
private void doRunEngine(long now) {
- final FlowCycleQueue cycleQueue = this.cycleQueue;
- final FlowEventQueue eventQueue = this.eventQueue;
-
try {
// Mark the engine as active to prevent concurrent calls to this method
active = true;
+ // int EventCount = 0;
// Execute all scheduled updates at current timestamp
while (true) {
- final FlowNode ctx = eventQueue.poll(now);
+ final FlowNode ctx = this.eventQueue.poll(now);
if (ctx == null) {
break;
}
@@ -158,13 +159,14 @@ public final class FlowEngine implements Runnable {
// Execute all immediate updates
while (true) {
- final FlowNode ctx = cycleQueue.poll();
+ final FlowNode ctx = this.cycleQueue.poll();
if (ctx == null) {
break;
}
ctx.update(now);
}
+
} finally {
active = false;
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index cb2a3ba6..39712f20 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -55,7 +55,10 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
protected final FlowEdge[] consumerEdges;
protected final double[] incomingDemands; // What is demanded by the consumers
protected final double[] outgoingSupplies; // What is supplied to the consumers
- protected ArrayList<Integer> updatedDemands = new ArrayList<>();
+
+ protected final boolean[] updatedDemands;
+ protected int numUpdatedDemands = 0;
+ // protected ArrayList<Integer> updatedDemands = new ArrayList<>();
protected double previousTotalDemand = 0.0;
protected double totalIncomingDemand; // The total demand of all the consumers
@@ -73,6 +76,10 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
protected double capacity; // What is the max capacity. Can probably be removed
+ protected static HashMap<Integer, Integer> updateMap = new HashMap<Integer, Integer>();
+
+ protected boolean overloaded = false;
+
public FlowDistributor(FlowEngine engine, int maxConsumers) {
super(engine);
@@ -91,6 +98,8 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
this.incomingDemands = new double[this.maxConsumers];
this.outgoingSupplies = new double[this.maxConsumers];
+
+ this.updatedDemands = new boolean[this.maxConsumers];
}
public double getTotalIncomingDemand() {
@@ -109,13 +118,13 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
// Check if current supply is different from total demand
if (this.outgoingDemandUpdateNeeded) {
+
this.updateOutgoingDemand();
return Long.MAX_VALUE;
}
- // TODO: look into whether this is always needed
- if (this.numConsumers > 0) {
+ if (this.numUpdatedDemands > 0 || this.overloaded) {
this.updateOutgoingSupplies();
}
@@ -141,7 +150,6 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
this.numConsumers++;
this.consumerEdges[consumerIndex] = consumerEdge;
- this.outgoingDemandUpdateNeeded = true;
}
@Override
@@ -169,9 +177,11 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
}
// Remove idx from consumers that updated their demands
- if (this.updatedDemands.contains(consumerIndex)) {
- this.updatedDemands.remove(Integer.valueOf(consumerIndex));
- }
+ // if (this.updatedDemands.contains(consumerIndex)) {
+ // this.updatedDemands.remove(Integer.valueOf(consumerIndex));
+ // }
+
+ this.updatedDemands[consumerIndex] = false;
this.consumerEdges[consumerIndex] = null;
this.incomingDemands[consumerIndex] = 0.0;
@@ -196,7 +206,9 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
this.currentIncomingSupplies.put(idx, 0.0);
if (this.supplierEdges.isEmpty()) {
- this.updatedDemands.clear();
+ // this.updatedDemands.clear();
+ Arrays.fill(this.updatedDemands, false);
+ this.numUpdatedDemands = 0;
}
}
@@ -220,9 +232,12 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
}
// TODO: can be optimized by using a boolean array
- this.updatedDemands.add(consumerIndex);
+ // this.updatedDemands.add(consumerIndex);
+ this.updatedDemands[consumerIndex] = true;
+ this.numUpdatedDemands++;
this.outgoingDemandUpdateNeeded = true;
+
this.invalidate();
}
@@ -246,6 +261,7 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
this.totalIncomingSupply += (newSupply - prevSupply);
this.outgoingSupplyUpdateNeeded = true;
+
this.invalidate();
}
@@ -281,7 +297,6 @@ public abstract class FlowDistributor extends FlowNode implements FlowSupplier,
@Override
public ResourceType getSupplierResourceType() {
- // return this.supplierEdge.getSupplierResourceType();
return this.supplierResourceType;
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
index cbfe39a3..ff6ad6f0 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
@@ -137,10 +137,14 @@ public abstract class FlowNode {
// If there is already an update running,
// notify the update, that a next update should be run after
- if (this.nodeState != NodeState.CLOSING && this.nodeState != NodeState.CLOSED) {
- this.nodeState = NodeState.INVALIDATED;
- engine.scheduleImmediate(now, this);
+ if (this.nodeState == NodeState.CLOSING
+ || this.nodeState == NodeState.CLOSED
+ || this.nodeState == NodeState.INVALIDATED) {
+ return;
}
+
+ this.nodeState = NodeState.INVALIDATED;
+ engine.scheduleImmediate(now, this);
}
/**
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java
index 04317d6a..5446f261 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/BestEffortFlowDistributor.java
@@ -23,6 +23,7 @@
package org.opendc.simulator.engine.graph.distributionPolicies;
import java.util.ArrayList;
+import java.util.Arrays;
import org.opendc.simulator.engine.engine.FlowEngine;
import org.opendc.simulator.engine.graph.FlowDistributor;
import org.opendc.simulator.engine.graph.FlowEdge;
@@ -143,17 +144,29 @@ public class BestEffortFlowDistributor extends FlowDistributor {
// Update the supplies of the consumers that changed their demand in the current cycle
else {
- for (int consumerIndex : this.updatedDemands) {
+ for (int consumerIndex = 0; consumerIndex < this.numConsumers; consumerIndex++) {
+ if (!this.updatedDemands[consumerIndex]) {
+ continue;
+ }
this.pushOutgoingSupply(
this.consumerEdges[consumerIndex],
this.incomingDemands[consumerIndex],
this.getConsumerResourceType());
}
+
+ // for (int consumerIndex : this.updatedDemands) {
+ // this.pushOutgoingSupply(
+ // this.consumerEdges[consumerIndex],
+ // this.incomingDemands[consumerIndex],
+ // this.getConsumerResourceType());
+ // }
}
}
this.outgoingSupplyUpdateNeeded = false;
- this.updatedDemands.clear();
+ // this.updatedDemands.clear();
+ Arrays.fill(this.updatedDemands, false);
+ this.numUpdatedDemands = 0;
}
/**
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java
index 89b1b314..6938f7d7 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/EqualShareFlowDistributor.java
@@ -54,8 +54,6 @@ public class EqualShareFlowDistributor extends FlowDistributor {
}
this.outgoingDemandUpdateNeeded = false;
- this.updatedDemands.clear();
- this.invalidate();
}
/**
@@ -72,6 +70,9 @@ public class EqualShareFlowDistributor extends FlowDistributor {
this.pushOutgoingSupply(
this.consumerEdges[consumerIndex], equalShare[consumerIndex], this.getConsumerResourceType());
}
+
+ Arrays.fill(this.updatedDemands, false);
+ this.numUpdatedDemands = 0;
}
@Override
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java
index e5e4eb59..b8337b7a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/distributionPolicies/MaxMinFairnessFlowDistributor.java
@@ -36,8 +36,6 @@ import org.opendc.simulator.engine.graph.FlowEdge;
*/
public class MaxMinFairnessFlowDistributor extends FlowDistributor {
- private boolean overloaded = false;
-
public MaxMinFairnessFlowDistributor(FlowEngine engine, int maxConsumers) {
super(engine, maxConsumers);
}
@@ -63,6 +61,7 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor {
// If the demand is higher than the current supply, the system is overloaded.
// The available supply is distributed based on the current distribution function.
+ // FIXME: There can a problem that the incoming supply is ony 11 decimal numbers and thus is smaller.
if (this.totalIncomingDemand > this.totalIncomingSupply) {
this.overloaded = true;
@@ -95,16 +94,29 @@ public class MaxMinFairnessFlowDistributor extends FlowDistributor {
// Update the supplies of the consumers that changed their demand in the current cycle
else {
- for (int consumerIndex : this.updatedDemands) {
+ for (int consumerIndex : this.usedConsumerIndices) {
+ if (!this.updatedDemands[consumerIndex]) {
+ continue;
+ }
this.pushOutgoingSupply(
this.consumerEdges[consumerIndex],
this.incomingDemands[consumerIndex],
this.getConsumerResourceType());
}
+
+ //
+ // for (int consumerIndex : this.updatedDemands) {
+ // this.pushOutgoingSupply(
+ // this.consumerEdges[consumerIndex],
+ // this.incomingDemands[consumerIndex],
+ // this.getConsumerResourceType());
+ // }
}
}
- this.updatedDemands.clear();
+ // this.updatedDemands.clear();
+ Arrays.fill(this.updatedDemands, false);
+ this.numUpdatedDemands = 0;
}
private record Demand(int idx, double value) {}