summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java)12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java)3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java)29
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java)23
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java)3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java)87
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt2
11 files changed, 106 insertions, 61 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
index 10af7c51..1a068b40 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
@@ -20,12 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.engine;
import java.time.Clock;
import java.time.InstantSource;
import kotlin.coroutines.CoroutineContext;
import org.opendc.common.Dispatcher;
+import org.opendc.simulator.engine.graph.FlowGraph;
+import org.opendc.simulator.engine.graph.FlowNode;
/**
* A {@link FlowEngine} simulates a generic flow network.
@@ -89,7 +91,7 @@ public final class FlowEngine implements Runnable {
* This method should be used when the state of a flow context is invalidated/interrupted and needs to be
* re-computed.
*/
- void scheduleImmediate(long now, FlowNode ctx) {
+ public void scheduleImmediate(long now, FlowNode ctx) {
scheduleImmediateInContext(ctx);
// In-case the engine is already running in the call-stack, return immediately. The changes will be picked
@@ -109,14 +111,14 @@ public final class FlowEngine implements Runnable {
* <p>
* This method should only be invoked while inside an engine cycle.
*/
- void scheduleImmediateInContext(FlowNode ctx) {
+ public void scheduleImmediateInContext(FlowNode ctx) {
queue.add(ctx);
}
/**
* Enqueue the specified {@link FlowNode} to be updated at its updated deadline.
*/
- void scheduleDelayed(FlowNode ctx) {
+ public void scheduleDelayed(FlowNode ctx) {
scheduleDelayedInContext(ctx);
// In-case the engine is already running in the call-stack, return immediately. The changes will be picked
@@ -136,7 +138,7 @@ public final class FlowEngine implements Runnable {
* <p>
* This method should only be invoked while inside an engine cycle.
*/
- void scheduleDelayedInContext(FlowNode ctx) {
+ public void scheduleDelayedInContext(FlowNode ctx) {
FlowTimerQueue timerQueue = this.timerQueue;
timerQueue.enqueue(ctx);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java
index 37b3c65b..bd622083 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java
@@ -20,10 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.engine;
import java.util.ArrayDeque;
import java.util.Arrays;
+import org.opendc.simulator.engine.graph.FlowNode;
/**
* A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java
index 1e348b10..049eb40d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java
@@ -20,9 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.engine;
import java.util.Arrays;
+import org.opendc.simulator.engine.graph.FlowNode;
/**
* A specialized priority queue for timers of {@link FlowNode}s.
@@ -55,9 +56,9 @@ public final class FlowTimerQueue {
*/
public void enqueue(FlowNode node) {
FlowNode[] es = queue;
- int k = node.timerIndex;
+ int k = node.getTimerIndex();
- if (node.deadline != Long.MAX_VALUE) {
+ if (node.getDeadline() != Long.MAX_VALUE) {
if (k >= 0) {
update(es, node, k);
} else {
@@ -82,7 +83,7 @@ public final class FlowTimerQueue {
final FlowNode[] es = queue;
final FlowNode head = es[0];
- if (now < head.deadline) {
+ if (now < head.getDeadline()) {
return null;
}
@@ -95,7 +96,7 @@ public final class FlowTimerQueue {
siftDown(0, next, es, n);
}
- head.timerIndex = -1;
+ head.setTimerIndex(-1);
return head;
}
@@ -104,7 +105,7 @@ public final class FlowTimerQueue {
*/
public long peekDeadline() {
if (this.size > 0) {
- return this.queue[0].deadline;
+ return this.queue[0].getDeadline();
}
return Long.MAX_VALUE;
@@ -130,7 +131,7 @@ public final class FlowTimerQueue {
private void update(FlowNode[] es, FlowNode node, int k) {
if (k > 0) {
int parent = (k - 1) >>> 1;
- if (es[parent].deadline > node.deadline) {
+ if (es[parent].getDeadline() > node.getDeadline()) {
siftUp(k, node, es);
return;
}
@@ -175,13 +176,13 @@ public final class FlowTimerQueue {
while (k > 0) {
int parent = (k - 1) >>> 1;
FlowNode e = es[parent];
- if (key.deadline >= e.deadline) break;
+ if (key.getDeadline() >= e.getDeadline()) break;
es[k] = e;
- e.timerIndex = k;
+ e.setTimerIndex(k);
k = parent;
}
es[k] = key;
- key.timerIndex = k;
+ key.setTimerIndex(k);
}
private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) {
@@ -190,16 +191,16 @@ public final class FlowTimerQueue {
int child = (k << 1) + 1; // assume left child is least
FlowNode c = es[child];
int right = child + 1;
- if (right < n && c.deadline > es[right].deadline) c = es[child = right];
+ if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right];
- if (key.deadline <= c.deadline) break;
+ if (key.getDeadline() <= c.getDeadline()) break;
es[k] = c;
- c.timerIndex = k;
+ c.setTimerIndex(k);
k = child;
}
es[k] = key;
- key.timerIndex = k;
+ key.setTimerIndex(k);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java
index 15da2f23..5607278c 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.engine;
import java.util.Arrays;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
index ddb40794..2130d376 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.graph;
public interface FlowConsumer {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index 48177412..7ef091f8 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -20,17 +20,12 @@
* SOFTWARE.
*/
-package org.opendc.simulator;
+package org.opendc.simulator.engine.graph;
import java.util.ArrayList;
import java.util.Arrays;
-import org.opendc.simulator.engine.FlowConsumer;
-import org.opendc.simulator.engine.FlowEdge;
-import org.opendc.simulator.engine.FlowGraph;
-import org.opendc.simulator.engine.FlowNode;
-import org.opendc.simulator.engine.FlowSupplier;
-public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer {
+public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer {
private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
private FlowEdge supplierEdge;
@@ -45,7 +40,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
private double capacity; // What is the max capacity
- public Multiplexer(FlowGraph graph) {
+ public FlowDistributor(FlowGraph graph) {
super(graph);
}
@@ -70,7 +65,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
// if supply >= demand -> push supplies to all tasks
if (this.totalSupply > this.totalDemand) {
- // If this came from a state of over provisioning, provide all consumers with their demand
+ // If this came from a state of overload, provide all consumers with their demand
if (this.overLoaded) {
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx));
@@ -99,6 +94,12 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
private record Demand(int idx, double value) {}
+ /**
+ * Distributed the available supply over the different demands.
+ * The supply is distributed using MaxMin Fairness.
+ *
+ * TODO: Move this outside of the Distributor so we can easily add different redistribution methods
+ */
private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) {
int inputSize = demands.size();
@@ -198,7 +199,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
this.currentConsumerIdx = idx;
if (idx == -1) {
- System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer");
+ System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer");
return;
}
@@ -234,7 +235,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer
int idx = consumerEdge.getConsumerIndex();
if (idx == -1) {
- System.out.println("Error (Multiplexer): pushing supply to an unknown consumer");
+ System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer");
}
if (supplies.get(idx) == newSupply) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
index 95fe7928..b7162508 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.graph;
/**
* An edge that connects two FlowStages.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
index d82b542b..0e6e137c 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
@@ -20,10 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.graph;
import java.util.ArrayList;
import java.util.HashMap;
+import org.opendc.simulator.engine.engine.FlowEngine;
public class FlowGraph {
private final FlowEngine engine;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
index d1faf465..6ee947bc 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java
@@ -20,9 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.graph;
import java.time.InstantSource;
+import org.opendc.simulator.engine.engine.FlowEngine;
+import org.opendc.simulator.engine.engine.FlowTimerQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,21 +44,79 @@ public abstract class FlowNode {
protected NodeState nodeState = NodeState.PENDING;
+ public NodeState getNodeState() {
+ return nodeState;
+ }
+
+ public void setNodeState(NodeState nodeState) {
+ this.nodeState = nodeState;
+ }
+
+ public int getTimerIndex() {
+ return timerIndex;
+ }
+
+ public void setTimerIndex(int index) {
+ this.timerIndex = index;
+ }
+
+ public InstantSource getClock() {
+ return clock;
+ }
+
+ public void setClock(InstantSource clock) {
+ this.clock = clock;
+ }
+
+ public FlowGraph getParentGraph() {
+ return parentGraph;
+ }
+
+ public void setParentGraph(FlowGraph parentGraph) {
+ this.parentGraph = parentGraph;
+ }
+
+ public FlowEngine getEngine() {
+ return engine;
+ }
+
+ public void setEngine(FlowEngine engine) {
+ this.engine = engine;
+ }
+
+ /**
+ * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch).
+ */
+ public long getDeadline() {
+ return deadline;
+ }
+
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+ }
+
/**
* The deadline of the stage after which an update should run.
*/
- long deadline = Long.MAX_VALUE;
+ private long deadline = Long.MAX_VALUE;
/**
* The index of the timer in the {@link FlowTimerQueue}.
*/
- int timerIndex = -1;
+ private int timerIndex = -1;
protected InstantSource clock;
protected FlowGraph parentGraph;
protected FlowEngine engine;
/**
+ * Return the {@link FlowGraph} to which this stage belongs.
+ */
+ public FlowGraph getGraph() {
+ return parentGraph;
+ }
+
+ /**
* Construct a new {@link FlowNode} instance.
*
* @param parentGraph The {@link FlowGraph} this stage belongs to.
@@ -70,27 +130,6 @@ public abstract class FlowNode {
}
/**
- * Return the {@link FlowGraph} to which this stage belongs.
- */
- public FlowGraph getGraph() {
- return parentGraph;
- }
-
- /**
- * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch).
- */
- public long getDeadline() {
- return deadline;
- }
-
- public void setDeadline(long deadline) {
- this.deadline = deadline;
- }
-
- public void setTimerIndex(int index) {
- this.timerIndex = index;
- }
- /**
* Invalidate the {@link FlowNode} forcing the stage to update.
*
* <p>
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
index 955f4943..84602ee0 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.engine;
+package org.opendc.simulator.engine.graph;
public interface FlowSupplier {
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt
index 7744d7b2..4dd17dbe 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt
@@ -24,7 +24,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
-import org.opendc.simulator.engine.InvocationStack
+import org.opendc.simulator.engine.engine.InvocationStack
/**
* Test suite for the [InvocationStack] class.