diff options
| -rw-r--r-- | opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java | 4 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java | 4 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java) | 25 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java | 8 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java | 31 |
5 files changed, 45 insertions, 27 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index 75bdde92..b612de2c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -270,6 +270,10 @@ final class SimChainWorkload extends SimWorkload implements FlowSupplier { */ @Override public void removeSupplierEdge(FlowEdge supplierEdge) { + if (this.machineEdge == null) { + return; + } + this.stopWorkload(); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java index fbbe0815..8487fbc2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java @@ -139,6 +139,10 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { @Override public void stopWorkload() { + if (this.machineEdge == null) { + return; + } + this.closeNode(); this.machineEdge = null; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java index bd622083..72dd217c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java @@ -28,28 +28,32 @@ import org.opendc.simulator.engine.graph.FlowNode; /** * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s - * that have been updated during the engine cycle and should converge. + * that should be updated in the current cycle, because of a change caused by another update in the current cycle. * <p> * By using a specialized class, we reduce the overhead caused by type-erasure. */ -final class FlowNodeQueue { +final class FlowCycleQueue { /** * The array of elements in the queue. */ - private FlowNode[] elements; + private FlowNode[] nodeQueue; private int head = 0; private int tail = 0; - public FlowNodeQueue(int initialCapacity) { - elements = new FlowNode[initialCapacity]; + public FlowCycleQueue(int initialCapacity) { + nodeQueue = new FlowNode[initialCapacity]; } /** * Add the specified context to the queue. */ void add(FlowNode ctx) { - final FlowNode[] es = elements; + if (ctx.getInCycleQueue()) { + return; + } + + final FlowNode[] es = nodeQueue; int tail = this.tail; es[tail] = ctx; @@ -60,19 +64,22 @@ final class FlowNodeQueue { if (head == tail) { doubleCapacity(); } + + ctx.setInCycleQueue(true); } /** * Remove a {@link FlowNode} from the queue or <code>null</code> if the queue is empty. */ FlowNode poll() { - final FlowNode[] es = elements; + final FlowNode[] es = nodeQueue; int head = this.head; FlowNode ctx = es[head]; if (ctx != null) { es[head] = null; this.head = inc(head, es.length); + ctx.setInCycleQueue(false); } return ctx; @@ -82,13 +89,13 @@ final class FlowNodeQueue { * Doubles the capacity of this deque */ private void doubleCapacity() { - int oldCapacity = elements.length; + int oldCapacity = nodeQueue.length; int newCapacity = oldCapacity + (oldCapacity >> 1); if (newCapacity < 0) { throw new IllegalStateException("Sorry, deque too big"); } - final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity); + final FlowNode[] es = nodeQueue = Arrays.copyOf(nodeQueue, newCapacity); // Exceptionally, here tail == head needs to be disambiguated if (tail < head || (tail == head && es[head] != null)) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 1a068b40..24476048 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -37,9 +37,9 @@ import org.opendc.simulator.engine.graph.FlowNode; */ public final class FlowEngine implements Runnable { /** - * The queue of {@link FlowNode} updates that are scheduled for immediate execution. + * The queue of {@link FlowNode} updates that need to be updated in the current cycle. */ - private final FlowNodeQueue queue = new FlowNodeQueue(256); + private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256); /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. @@ -112,7 +112,7 @@ public final class FlowEngine implements Runnable { * This method should only be invoked while inside an engine cycle. */ public void scheduleImmediateInContext(FlowNode ctx) { - queue.add(ctx); + cycleQueue.add(ctx); } /** @@ -147,7 +147,7 @@ public final class FlowEngine implements Runnable { * Run all the enqueued actions for the specified timestamp (<code>now</code>). */ private void doRunEngine(long now) { - final FlowNodeQueue queue = this.queue; + final FlowCycleQueue queue = this.cycleQueue; final FlowTimerQueue timerQueue = this.timerQueue; try { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index b2827130..64cd0d8c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -60,6 +60,14 @@ public abstract class FlowNode { this.timerIndex = index; } + public Boolean getInCycleQueue() { + return inCycleQueue; + } + + public void setInCycleQueue(Boolean inCycleQueue) { + this.inCycleQueue = inCycleQueue; + } + public InstantSource getClock() { return clock; } @@ -105,6 +113,8 @@ public abstract class FlowNode { */ private int timerIndex = -1; + private Boolean inCycleQueue = false; + protected InstantSource clock; protected FlowGraph parentGraph; protected FlowEngine engine; @@ -139,9 +149,9 @@ public abstract class FlowNode { public void invalidate(long now) { // If there is already an update running, // notify the update, that a next update should be run after - if (this.nodeState == NodeState.UPDATING) { + + if (this.nodeState != NodeState.CLOSING && this.nodeState != NodeState.CLOSED) { this.nodeState = NodeState.INVALIDATED; - } else { engine.scheduleImmediate(now, this); } } @@ -172,15 +182,16 @@ public abstract class FlowNode { doFail(e); } - // Check whether the stage is marked as closing. - if (this.nodeState == NodeState.INVALIDATED) { - newDeadline = now; - } if (this.nodeState == NodeState.CLOSING) { closeNode(); return; } + // Check whether the stage is marked as closing. + if ((this.nodeState == NodeState.INVALIDATED) || (this.nodeState == NodeState.CLOSED)) { + return; + } + this.deadline = newDeadline; // Update the timer queue with the new deadline @@ -211,14 +222,6 @@ public abstract class FlowNode { */ public void closeNode() { if (this.nodeState == NodeState.CLOSED) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed"); - return; - } - - // If this stage is running an update, notify it that is should close after. - if (this.nodeState == NodeState.UPDATING) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active"); - this.nodeState = NodeState.CLOSING; return; } |
