From c425a03c59e7d5c2e5d82988c61e340a6cbf61fe Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 17 Dec 2024 11:14:13 +0100 Subject: Updated the FlowEngine so nodes that have to be updated in the current cycle cannot be scheduled twice. (#284) Updated the FlowNodeQueue --- .../simulator/engine/engine/FlowCycleQueue.java | 117 +++++++++++++++++++++ .../opendc/simulator/engine/engine/FlowEngine.java | 8 +- .../simulator/engine/engine/FlowNodeQueue.java | 110 ------------------- .../opendc/simulator/engine/graph/FlowNode.java | 31 +++--- 4 files changed, 138 insertions(+), 128 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java (limited to 'opendc-simulator/opendc-simulator-flow/src/main') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java new file mode 100644 index 00000000..72dd217c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowCycleQueue.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.ArrayDeque; +import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s + * that should be updated in the current cycle, because of a change caused by another update in the current cycle. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class FlowCycleQueue { + /** + * The array of elements in the queue. + */ + private FlowNode[] nodeQueue; + + private int head = 0; + private int tail = 0; + + public FlowCycleQueue(int initialCapacity) { + nodeQueue = new FlowNode[initialCapacity]; + } + + /** + * Add the specified context to the queue. + */ + void add(FlowNode ctx) { + if (ctx.getInCycleQueue()) { + return; + } + + final FlowNode[] es = nodeQueue; + int tail = this.tail; + + es[tail] = ctx; + + tail = inc(tail, es.length); + this.tail = tail; + + if (head == tail) { + doubleCapacity(); + } + + ctx.setInCycleQueue(true); + } + + /** + * Remove a {@link FlowNode} from the queue or null if the queue is empty. + */ + FlowNode poll() { + final FlowNode[] es = nodeQueue; + int head = this.head; + FlowNode ctx = es[head]; + + if (ctx != null) { + es[head] = null; + this.head = inc(head, es.length); + ctx.setInCycleQueue(false); + } + + return ctx; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = nodeQueue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + final FlowNode[] es = nodeQueue = Arrays.copyOf(nodeQueue, newCapacity); + + // Exceptionally, here tail == head needs to be disambiguated + if (tail < head || (tail == head && es[head] != null)) { + // wrap around; slide first leg forward to end of array + int newSpace = newCapacity - oldCapacity; + System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); + for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; + } + } + + /** + * Circularly increments i, mod modulus. + * Precondition and postcondition: 0 <= i < modulus. + */ + private static int inc(int i, int modulus) { + if (++i >= modulus) i = 0; + return i; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 1a068b40..24476048 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -37,9 +37,9 @@ import org.opendc.simulator.engine.graph.FlowNode; */ public final class FlowEngine implements Runnable { /** - * The queue of {@link FlowNode} updates that are scheduled for immediate execution. + * The queue of {@link FlowNode} updates that need to be updated in the current cycle. */ - private final FlowNodeQueue queue = new FlowNodeQueue(256); + private final FlowCycleQueue cycleQueue = new FlowCycleQueue(256); /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. @@ -112,7 +112,7 @@ public final class FlowEngine implements Runnable { * This method should only be invoked while inside an engine cycle. */ public void scheduleImmediateInContext(FlowNode ctx) { - queue.add(ctx); + cycleQueue.add(ctx); } /** @@ -147,7 +147,7 @@ public final class FlowEngine implements Runnable { * Run all the enqueued actions for the specified timestamp (now). */ private void doRunEngine(long now) { - final FlowNodeQueue queue = this.queue; + final FlowCycleQueue queue = this.cycleQueue; final FlowTimerQueue timerQueue = this.timerQueue; try { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java deleted file mode 100644 index bd622083..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.engine; - -import java.util.ArrayDeque; -import java.util.Arrays; -import org.opendc.simulator.engine.graph.FlowNode; - -/** - * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s - * that have been updated during the engine cycle and should converge. - *

- * By using a specialized class, we reduce the overhead caused by type-erasure. - */ -final class FlowNodeQueue { - /** - * The array of elements in the queue. - */ - private FlowNode[] elements; - - private int head = 0; - private int tail = 0; - - public FlowNodeQueue(int initialCapacity) { - elements = new FlowNode[initialCapacity]; - } - - /** - * Add the specified context to the queue. - */ - void add(FlowNode ctx) { - final FlowNode[] es = elements; - int tail = this.tail; - - es[tail] = ctx; - - tail = inc(tail, es.length); - this.tail = tail; - - if (head == tail) { - doubleCapacity(); - } - } - - /** - * Remove a {@link FlowNode} from the queue or null if the queue is empty. - */ - FlowNode poll() { - final FlowNode[] es = elements; - int head = this.head; - FlowNode ctx = es[head]; - - if (ctx != null) { - es[head] = null; - this.head = inc(head, es.length); - } - - return ctx; - } - - /** - * Doubles the capacity of this deque - */ - private void doubleCapacity() { - int oldCapacity = elements.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - if (newCapacity < 0) { - throw new IllegalStateException("Sorry, deque too big"); - } - - final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity); - - // Exceptionally, here tail == head needs to be disambiguated - if (tail < head || (tail == head && es[head] != null)) { - // wrap around; slide first leg forward to end of array - int newSpace = newCapacity - oldCapacity; - System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); - for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; - } - } - - /** - * Circularly increments i, mod modulus. - * Precondition and postcondition: 0 <= i < modulus. - */ - private static int inc(int i, int modulus) { - if (++i >= modulus) i = 0; - return i; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index b2827130..64cd0d8c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -60,6 +60,14 @@ public abstract class FlowNode { this.timerIndex = index; } + public Boolean getInCycleQueue() { + return inCycleQueue; + } + + public void setInCycleQueue(Boolean inCycleQueue) { + this.inCycleQueue = inCycleQueue; + } + public InstantSource getClock() { return clock; } @@ -105,6 +113,8 @@ public abstract class FlowNode { */ private int timerIndex = -1; + private Boolean inCycleQueue = false; + protected InstantSource clock; protected FlowGraph parentGraph; protected FlowEngine engine; @@ -139,9 +149,9 @@ public abstract class FlowNode { public void invalidate(long now) { // If there is already an update running, // notify the update, that a next update should be run after - if (this.nodeState == NodeState.UPDATING) { + + if (this.nodeState != NodeState.CLOSING && this.nodeState != NodeState.CLOSED) { this.nodeState = NodeState.INVALIDATED; - } else { engine.scheduleImmediate(now, this); } } @@ -172,15 +182,16 @@ public abstract class FlowNode { doFail(e); } - // Check whether the stage is marked as closing. - if (this.nodeState == NodeState.INVALIDATED) { - newDeadline = now; - } if (this.nodeState == NodeState.CLOSING) { closeNode(); return; } + // Check whether the stage is marked as closing. + if ((this.nodeState == NodeState.INVALIDATED) || (this.nodeState == NodeState.CLOSED)) { + return; + } + this.deadline = newDeadline; // Update the timer queue with the new deadline @@ -211,14 +222,6 @@ public abstract class FlowNode { */ public void closeNode() { if (this.nodeState == NodeState.CLOSED) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed"); - return; - } - - // If this stage is running an update, notify it that is should close after. - if (this.nodeState == NodeState.UPDATING) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active"); - this.nodeState = NodeState.CLOSING; return; } -- cgit v1.2.3