summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java)8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java160
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java4
7 files changed, 127 insertions, 87 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
index 24476048..67540f4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
@@ -44,7 +44,7 @@ public final class FlowEngine implements Runnable {
/**
* A priority queue containing the {@link FlowNode} updates to be scheduled in the future.
*/
- private final FlowTimerQueue timerQueue = new FlowTimerQueue(256);
+ private final FlowEventQueue eventQueue = new FlowEventQueue(256);
/**
* The stack of engine invocations to occur in the future.
@@ -127,7 +127,7 @@ public final class FlowEngine implements Runnable {
return;
}
- long deadline = timerQueue.peekDeadline();
+ long deadline = eventQueue.peekDeadline();
if (deadline != Long.MAX_VALUE) {
trySchedule(futureInvocations, clock.millis(), deadline);
}
@@ -139,7 +139,7 @@ public final class FlowEngine implements Runnable {
* This method should only be invoked while inside an engine cycle.
*/
public void scheduleDelayedInContext(FlowNode ctx) {
- FlowTimerQueue timerQueue = this.timerQueue;
+ FlowEventQueue timerQueue = this.eventQueue;
timerQueue.enqueue(ctx);
}
@@ -148,7 +148,7 @@ public final class FlowEngine implements Runnable {
*/
private void doRunEngine(long now) {
final FlowCycleQueue queue = this.cycleQueue;
- final FlowTimerQueue timerQueue = this.timerQueue;
+ final FlowEventQueue timerQueue = this.eventQueue;
try {
// Mark the engine as active to prevent concurrent calls to this method
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java
index 049eb40d..53649d29 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java
@@ -26,12 +26,12 @@ import java.util.Arrays;
import org.opendc.simulator.engine.graph.FlowNode;
/**
- * A specialized priority queue for timers of {@link FlowNode}s.
+ * A specialized priority queue for future event of {@link FlowNode}s sorted on time.
* <p>
* By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
* being generic.
*/
-public final class FlowTimerQueue {
+public final class FlowEventQueue {
/**
* Array representation of binary heap of {@link FlowNode} instances.
*/
@@ -43,11 +43,11 @@ public final class FlowTimerQueue {
private int size = 0;
/**
- * Construct a {@link FlowTimerQueue} with the specified initial capacity.
+ * Construct a {@link FlowEventQueue} with the specified initial capacity.
*
* @param initialCapacity The initial capacity of the queue.
*/
- public FlowTimerQueue(int initialCapacity) {
+ public FlowEventQueue(int initialCapacity) {
this.queue = new FlowNode[initialCapacity];
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
index 2130d376..a9da6f5d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
@@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph;
public interface FlowConsumer {
- void handleSupply(FlowEdge supplierEdge, double newSupply);
+ void handleIncomingSupply(FlowEdge supplierEdge, double newSupply);
- void pushDemand(FlowEdge supplierEdge, double newDemand);
+ void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand);
void addSupplierEdge(FlowEdge supplierEdge);
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index 7ef091f8..16bb161f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -29,27 +29,30 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
private FlowEdge supplierEdge;
- private final ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers
- private final ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers
+ private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers
+ private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers
- private double totalDemand; // The total demand of all the consumers
- private double totalSupply; // The total supply from the supplier
+ private double totalIncomingDemand; // The total demand of all the consumers
+ private double currentIncomingSupply; // The current supply provided by the supplier
- private boolean overLoaded = false;
- private int currentConsumerIdx = -1;
+ private boolean outgoingDemandUpdateNeeded = false;
- private double capacity; // What is the max capacity
+ private boolean overloaded = false;
+
+ private double capacity; // What is the max capacity. Can probably be removed
+
+ private final ArrayList<Integer> updatedDemands = new ArrayList<>();
public FlowDistributor(FlowGraph graph) {
super(graph);
}
- public double getTotalDemand() {
- return totalDemand;
+ public double getTotalIncomingDemand() {
+ return totalIncomingDemand;
}
- public double getTotalSupply() {
- return totalSupply;
+ public double getCurrentIncomingSupply() {
+ return currentIncomingSupply;
}
public double getCapacity() {
@@ -58,38 +61,61 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
public long onUpdate(long now) {
+ // Check if current supply is different from total demand
+ if (this.outgoingDemandUpdateNeeded) {
+ this.updateOutgoingDemand();
+
+ return Long.MAX_VALUE;
+ }
+
+ this.updateOutgoingSupplies();
+
return Long.MAX_VALUE;
}
- private void distributeSupply() {
- // if supply >= demand -> push supplies to all tasks
- if (this.totalSupply > this.totalDemand) {
+ private void updateOutgoingDemand() {
+ this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand);
- // If this came from a state of overload, provide all consumers with their demand
- if (this.overLoaded) {
- for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx));
- }
- }
+ this.outgoingDemandUpdateNeeded = false;
- if (this.currentConsumerIdx != -1) {
- this.pushSupply(
- this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx));
- this.currentConsumerIdx = -1;
- }
+ this.invalidate();
+ }
- this.overLoaded = false;
- }
+ private void updateOutgoingSupplies() {
+
+ // If the demand is higher than the current supply, the system is overloaded.
+ // The available supply is distributed based on the current distribution function.
+ if (this.totalIncomingDemand > this.currentIncomingSupply) {
+ this.overloaded = true;
- // if supply < demand -> distribute the supply over all consumers
- else {
- this.overLoaded = true;
- double[] supplies = redistributeSupply(this.demands, this.totalSupply);
+ double[] supplies = distributeSupply(this.incomingDemands, this.currentIncomingSupply);
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- this.pushSupply(this.consumerEdges.get(idx), supplies[idx]);
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx]);
+ }
+
+ } else {
+
+ // If the distributor was overloaded before, but is not anymore:
+ // provide all consumers with their demand
+ if (this.overloaded) {
+ for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
+ if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) {
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
+ }
+ }
+ this.overloaded = false;
+ }
+
+ // Update the supplies of the consumers that changed their demand in the current cycle
+ else {
+ for (int idx : this.updatedDemands) {
+ this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
+ }
}
}
+
+ this.updatedDemands.clear();
}
private record Demand(int idx, double value) {}
@@ -97,10 +123,8 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
/**
* Distributed the available supply over the different demands.
* The supply is distributed using MaxMin Fairness.
- *
- * TODO: Move this outside of the Distributor so we can easily add different redistribution methods
*/
- private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) {
+ private static double[] distributeSupply(ArrayList<Double> demands, double currentSupply) {
int inputSize = demands.size();
final double[] supplies = new double[inputSize];
@@ -116,7 +140,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return i1.compareTo(i2);
});
- double availableCapacity = totalSupply; // totalSupply
+ double availableCapacity = currentSupply; // totalSupply
for (int i = 0; i < inputSize; i++) {
double d = tempDemands[i].value;
@@ -133,7 +157,6 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
availableCapacity -= r;
}
- // Return the used capacity
return supplies;
}
@@ -146,15 +169,15 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
consumerEdge.setConsumerIndex(this.consumerEdges.size());
this.consumerEdges.add(consumerEdge);
- this.demands.add(0.0);
- this.supplies.add(0.0);
+ this.incomingDemands.add(0.0);
+ this.outgoingSupplies.add(0.0);
}
@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
this.supplierEdge = supplierEdge;
this.capacity = supplierEdge.getCapacity();
- this.totalSupply = 0;
+ this.currentIncomingSupply = 0;
}
@Override
@@ -165,84 +188,87 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return;
}
- this.totalDemand -= consumerEdge.getDemand();
+ this.totalIncomingDemand -= consumerEdge.getDemand();
this.consumerEdges.remove(idx);
- this.demands.remove(idx);
- this.supplies.remove(idx);
+ this.incomingDemands.remove(idx);
+ this.outgoingSupplies.remove(idx);
// update the consumer index for all consumerEdges higher than this.
for (int i = idx; i < this.consumerEdges.size(); i++) {
this.consumerEdges.get(i).setConsumerIndex(i);
}
- this.currentConsumerIdx = -1;
+ for (int i = 0; i < this.updatedDemands.size(); i++) {
+ int j = this.updatedDemands.get(i);
- if (this.overLoaded) {
- this.distributeSupply();
+ if (j == idx) {
+ this.updatedDemands.remove(idx);
+ }
+ if (j > idx) {
+ this.updatedDemands.set(i, j - 1);
+ }
}
- this.pushDemand(this.supplierEdge, this.totalDemand);
+ this.outgoingDemandUpdateNeeded = true;
+ this.invalidate();
}
@Override
public void removeSupplierEdge(FlowEdge supplierEdge) {
this.supplierEdge = null;
this.capacity = 0;
- this.totalSupply = 0;
+ this.currentIncomingSupply = 0;
+
+ this.invalidate();
}
@Override
- public void handleDemand(FlowEdge consumerEdge, double newDemand) {
+ public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) {
int idx = consumerEdge.getConsumerIndex();
- this.currentConsumerIdx = idx;
-
if (idx == -1) {
System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer");
return;
}
// Update the total demand (This is cheaper than summing over all demands)
- double prevDemand = demands.get(idx);
+ double prevDemand = incomingDemands.get(idx);
- demands.set(idx, newDemand);
- this.totalDemand += (newDemand - prevDemand);
+ incomingDemands.set(idx, newDemand);
+ this.totalIncomingDemand += (newDemand - prevDemand);
- if (overLoaded) {
- distributeSupply();
- }
+ this.updatedDemands.add(idx);
- // Send new totalDemand to CPU
- // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply)
- this.pushDemand(this.supplierEdge, this.totalDemand);
+ this.outgoingDemandUpdateNeeded = true;
+ this.invalidate();
}
@Override
- public void handleSupply(FlowEdge supplierEdge, double newSupply) {
- this.totalSupply = newSupply;
+ public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
+ this.currentIncomingSupply = newSupply;
- this.distributeSupply();
+ this.invalidate();
}
@Override
- public void pushDemand(FlowEdge supplierEdge, double newDemand) {
+ public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
this.supplierEdge.pushDemand(newDemand);
}
@Override
- public void pushSupply(FlowEdge consumerEdge, double newSupply) {
+ public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) {
int idx = consumerEdge.getConsumerIndex();
if (idx == -1) {
System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer");
}
- if (supplies.get(idx) == newSupply) {
+ if (outgoingSupplies.get(idx) == newSupply) {
return;
}
- supplies.set(idx, newSupply);
+ outgoingSupplies.set(idx, newSupply);
consumerEdge.pushSupply(newSupply);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
index b7162508..9521f2ce 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
@@ -108,24 +108,38 @@ public class FlowEdge {
/**
* Push new demand from the Consumer to the Supplier
*/
- public void pushDemand(double newDemand) {
- if (newDemand == this.demand) {
+ public void pushDemand(double newDemand, boolean forceThrough) {
+ if ((newDemand == this.demand) && !forceThrough) {
return;
}
this.demand = newDemand;
- this.supplier.handleDemand(this, newDemand);
+ this.supplier.handleIncomingDemand(this, newDemand);
+ }
+
+ /**
+ * Push new demand from the Consumer to the Supplier
+ */
+ public void pushDemand(double newDemand) {
+ this.pushDemand(newDemand, false);
}
/**
* Push new supply from the Supplier to the Consumer
*/
- public void pushSupply(double newSupply) {
- if (newSupply == this.supply) {
+ public void pushSupply(double newSupply, boolean forceThrough) {
+ if ((newSupply == this.supply) && !forceThrough) {
return;
}
this.supply = newSupply;
- this.consumer.handleSupply(this, newSupply);
+ this.consumer.handleIncomingSupply(this, newSupply);
+ }
+
+ /**
+ * Push new supply from the Supplier to the Consumer
+ */
+ public void pushSupply(double newSupply) {
+ this.pushSupply(newSupply, false);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
index 64cd0d8c..e24e9f93 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
@@ -24,7 +24,7 @@ package org.opendc.simulator.engine.graph;
import java.time.InstantSource;
import org.opendc.simulator.engine.engine.FlowEngine;
-import org.opendc.simulator.engine.engine.FlowTimerQueue;
+import org.opendc.simulator.engine.engine.FlowEventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,7 +109,7 @@ public abstract class FlowNode {
private long deadline = Long.MAX_VALUE;
/**
- * The index of the timer in the {@link FlowTimerQueue}.
+ * The index of the timer in the {@link FlowEventQueue}.
*/
private int timerIndex = -1;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
index 84602ee0..da65392b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
@@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph;
public interface FlowSupplier {
- void handleDemand(FlowEdge consumerEdge, double newDemand);
+ void handleIncomingDemand(FlowEdge consumerEdge, double newDemand);
- void pushSupply(FlowEdge consumerEdge, double newSupply);
+ void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply);
void addConsumerEdge(FlowEdge consumerEdge);