diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java')
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java) | 12 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java) | 3 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java) | 29 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java) | 2 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java) | 2 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java) | 23 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java) | 2 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java) | 3 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java) | 87 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java) | 2 |
10 files changed, 105 insertions, 60 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 10af7c51..1a068b40 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -20,12 +20,14 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.engine; import java.time.Clock; import java.time.InstantSource; import kotlin.coroutines.CoroutineContext; import org.opendc.common.Dispatcher; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; /** * A {@link FlowEngine} simulates a generic flow network. @@ -89,7 +91,7 @@ public final class FlowEngine implements Runnable { * This method should be used when the state of a flow context is invalidated/interrupted and needs to be * re-computed. */ - void scheduleImmediate(long now, FlowNode ctx) { + public void scheduleImmediate(long now, FlowNode ctx) { scheduleImmediateInContext(ctx); // In-case the engine is already running in the call-stack, return immediately. The changes will be picked @@ -109,14 +111,14 @@ public final class FlowEngine implements Runnable { * <p> * This method should only be invoked while inside an engine cycle. */ - void scheduleImmediateInContext(FlowNode ctx) { + public void scheduleImmediateInContext(FlowNode ctx) { queue.add(ctx); } /** * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. */ - void scheduleDelayed(FlowNode ctx) { + public void scheduleDelayed(FlowNode ctx) { scheduleDelayedInContext(ctx); // In-case the engine is already running in the call-stack, return immediately. The changes will be picked @@ -136,7 +138,7 @@ public final class FlowEngine implements Runnable { * <p> * This method should only be invoked while inside an engine cycle. */ - void scheduleDelayedInContext(FlowNode ctx) { + public void scheduleDelayedInContext(FlowNode ctx) { FlowTimerQueue timerQueue = this.timerQueue; timerQueue.enqueue(ctx); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java index 37b3c65b..bd622083 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java @@ -20,10 +20,11 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.engine; import java.util.ArrayDeque; import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; /** * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java index 1e348b10..049eb40d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java @@ -20,9 +20,10 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.engine; import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; /** * A specialized priority queue for timers of {@link FlowNode}s. @@ -55,9 +56,9 @@ public final class FlowTimerQueue { */ public void enqueue(FlowNode node) { FlowNode[] es = queue; - int k = node.timerIndex; + int k = node.getTimerIndex(); - if (node.deadline != Long.MAX_VALUE) { + if (node.getDeadline() != Long.MAX_VALUE) { if (k >= 0) { update(es, node, k); } else { @@ -82,7 +83,7 @@ public final class FlowTimerQueue { final FlowNode[] es = queue; final FlowNode head = es[0]; - if (now < head.deadline) { + if (now < head.getDeadline()) { return null; } @@ -95,7 +96,7 @@ public final class FlowTimerQueue { siftDown(0, next, es, n); } - head.timerIndex = -1; + head.setTimerIndex(-1); return head; } @@ -104,7 +105,7 @@ public final class FlowTimerQueue { */ public long peekDeadline() { if (this.size > 0) { - return this.queue[0].deadline; + return this.queue[0].getDeadline(); } return Long.MAX_VALUE; @@ -130,7 +131,7 @@ public final class FlowTimerQueue { private void update(FlowNode[] es, FlowNode node, int k) { if (k > 0) { int parent = (k - 1) >>> 1; - if (es[parent].deadline > node.deadline) { + if (es[parent].getDeadline() > node.getDeadline()) { siftUp(k, node, es); return; } @@ -175,13 +176,13 @@ public final class FlowTimerQueue { while (k > 0) { int parent = (k - 1) >>> 1; FlowNode e = es[parent]; - if (key.deadline >= e.deadline) break; + if (key.getDeadline() >= e.getDeadline()) break; es[k] = e; - e.timerIndex = k; + e.setTimerIndex(k); k = parent; } es[k] = key; - key.timerIndex = k; + key.setTimerIndex(k); } private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { @@ -190,16 +191,16 @@ public final class FlowTimerQueue { int child = (k << 1) + 1; // assume left child is least FlowNode c = es[child]; int right = child + 1; - if (right < n && c.deadline > es[right].deadline) c = es[child = right]; + if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right]; - if (key.deadline <= c.deadline) break; + if (key.getDeadline() <= c.getDeadline()) break; es[k] = c; - c.timerIndex = k; + c.setTimerIndex(k); k = child; } es[k] = key; - key.timerIndex = k; + key.setTimerIndex(k); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java index 15da2f23..5607278c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.engine; import java.util.Arrays; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java index ddb40794..2130d376 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; public interface FlowConsumer { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 48177412..7ef091f8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -20,17 +20,12 @@ * SOFTWARE. */ -package org.opendc.simulator; +package org.opendc.simulator.engine.graph; import java.util.ArrayList; import java.util.Arrays; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; -public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer { +public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); private FlowEdge supplierEdge; @@ -45,7 +40,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer private double capacity; // What is the max capacity - public Multiplexer(FlowGraph graph) { + public FlowDistributor(FlowGraph graph) { super(graph); } @@ -70,7 +65,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer // if supply >= demand -> push supplies to all tasks if (this.totalSupply > this.totalDemand) { - // If this came from a state of over provisioning, provide all consumers with their demand + // If this came from a state of overload, provide all consumers with their demand if (this.overLoaded) { for (int idx = 0; idx < this.consumerEdges.size(); idx++) { this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); @@ -99,6 +94,12 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer private record Demand(int idx, double value) {} + /** + * Distributed the available supply over the different demands. + * The supply is distributed using MaxMin Fairness. + * + * TODO: Move this outside of the Distributor so we can easily add different redistribution methods + */ private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) { int inputSize = demands.size(); @@ -198,7 +199,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer this.currentConsumerIdx = idx; if (idx == -1) { - System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer"); + System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); return; } @@ -234,7 +235,7 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer int idx = consumerEdge.getConsumerIndex(); if (idx == -1) { - System.out.println("Error (Multiplexer): pushing supply to an unknown consumer"); + System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); } if (supplies.get(idx) == newSupply) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index 95fe7928..b7162508 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; /** * An edge that connects two FlowStages. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java index d82b542b..0e6e137c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java @@ -20,10 +20,11 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; import java.util.ArrayList; import java.util.HashMap; +import org.opendc.simulator.engine.engine.FlowEngine; public class FlowGraph { private final FlowEngine engine; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index d1faf465..6ee947bc 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -20,9 +20,11 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; import java.time.InstantSource; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.engine.FlowTimerQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,21 +44,79 @@ public abstract class FlowNode { protected NodeState nodeState = NodeState.PENDING; + public NodeState getNodeState() { + return nodeState; + } + + public void setNodeState(NodeState nodeState) { + this.nodeState = nodeState; + } + + public int getTimerIndex() { + return timerIndex; + } + + public void setTimerIndex(int index) { + this.timerIndex = index; + } + + public InstantSource getClock() { + return clock; + } + + public void setClock(InstantSource clock) { + this.clock = clock; + } + + public FlowGraph getParentGraph() { + return parentGraph; + } + + public void setParentGraph(FlowGraph parentGraph) { + this.parentGraph = parentGraph; + } + + public FlowEngine getEngine() { + return engine; + } + + public void setEngine(FlowEngine engine) { + this.engine = engine; + } + + /** + * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). + */ + public long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } + /** * The deadline of the stage after which an update should run. */ - long deadline = Long.MAX_VALUE; + private long deadline = Long.MAX_VALUE; /** * The index of the timer in the {@link FlowTimerQueue}. */ - int timerIndex = -1; + private int timerIndex = -1; protected InstantSource clock; protected FlowGraph parentGraph; protected FlowEngine engine; /** + * Return the {@link FlowGraph} to which this stage belongs. + */ + public FlowGraph getGraph() { + return parentGraph; + } + + /** * Construct a new {@link FlowNode} instance. * * @param parentGraph The {@link FlowGraph} this stage belongs to. @@ -70,27 +130,6 @@ public abstract class FlowNode { } /** - * Return the {@link FlowGraph} to which this stage belongs. - */ - public FlowGraph getGraph() { - return parentGraph; - } - - /** - * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). - */ - public long getDeadline() { - return deadline; - } - - public void setDeadline(long deadline) { - this.deadline = deadline; - } - - public void setTimerIndex(int index) { - this.timerIndex = index; - } - /** * Invalidate the {@link FlowNode} forcing the stage to update. * * <p> diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java index 955f4943..84602ee0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.engine; +package org.opendc.simulator.engine.graph; public interface FlowSupplier { |
