summaryrefslogtreecommitdiff
path: root/odcsim-core/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-13 20:26:56 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-14 12:55:56 +0200
commit91ea74177ec4cd586036a2151933218e8f39866f (patch)
treedc5b9c850f0babdf8d2d69a804beca7326e661d6 /odcsim-core/src/main
parent2f6dcaef25d80a1411512e482953c83990149fd1 (diff)
feat: Add support for stashing messages
This change adds a StashBuffer to stash messages temporarily for processing at a later moment.
Diffstat (limited to 'odcsim-core/src/main')
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/StashBuffer.kt88
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt75
2 files changed, 163 insertions, 0 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/StashBuffer.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/StashBuffer.kt
new file mode 100644
index 00000000..5d73d808
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/StashBuffer.kt
@@ -0,0 +1,88 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim
+
+import com.atlarge.odcsim.internal.StashBufferImpl
+
+/**
+ * A non thread safe mutable message buffer that can be used to buffer messages inside actors and then unstash them.
+ *
+ * @param T The shape of the messages in this buffer.
+ */
+interface StashBuffer<T : Any> {
+ /**
+ * The first element of the buffer.
+ *
+ * @throws NoSuchElementException if the buffer is empty.
+ */
+ val head: T
+
+ /**
+ * A flag to indicate whether the buffer is empty.
+ */
+ val isEmpty: Boolean
+
+ /**
+ * A flag to indicate whether the buffer is full.
+ */
+ val isFull: Boolean
+
+ /**
+ * The number of elements in the stash buffer.
+ */
+ val size: Int
+
+ /**
+ * Iterate over all elements of the buffer and apply a function to each element, without removing them.
+ *
+ * @param block The function to invoke for each element.
+ */
+ fun forEach(block: (T) -> Unit)
+
+ /**
+ * Add one element to the end of the message buffer.
+ *
+ * @param msg The message to stash.
+ * @throws IllegalStateException if the element cannot be added at this time due to capacity restrictions
+ */
+ fun stash(msg: T)
+
+ /**
+ * Process all stashed messages with the behavior and the returned [Behavior] from each processed message.
+ *
+ * @param ctx The actor context to process these messages in.
+ * @param behavior The behavior to process the messages with.
+ */
+ fun unstashAll(ctx: ActorContext<T>, behavior: Behavior<T>): Behavior<T>
+
+ companion object {
+ /**
+ * Construct a [StashBuffer] with the specified [capacity].
+ *
+ * @param capacity The capacity of the buffer.
+ */
+ operator fun <T : Any> invoke(capacity: Int): StashBuffer<T> = StashBufferImpl(capacity)
+ }
+}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt
new file mode 100644
index 00000000..1d85a8e6
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt
@@ -0,0 +1,75 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.internal
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.StashBuffer
+import java.util.ArrayDeque
+
+/**
+ * Internal implementation of the [StashBuffer] interface.
+ */
+internal class StashBufferImpl<T : Any>(private val capacity: Int) : StashBuffer<T> {
+ /**
+ * The internal queue used to store the messages.
+ */
+ private val queue = ArrayDeque<T>(capacity)
+
+ override val head: T
+ get() = queue.first
+
+ override val isEmpty: Boolean
+ get() = queue.isEmpty()
+
+ override val isFull: Boolean
+ get() = size > capacity
+
+ override val size: Int
+ get() = queue.size
+
+ override fun forEach(block: (T) -> Unit) {
+ queue.toList().forEach(block)
+ }
+
+ override fun stash(msg: T) {
+ queue.add(msg)
+ }
+
+ override fun unstashAll(ctx: ActorContext<T>, behavior: Behavior<T>): Behavior<T> {
+ val messages = queue.toList()
+ queue.clear()
+
+ val interpreter = BehaviorInterpreter<T>(behavior)
+ interpreter.start(ctx)
+
+ for (message in messages) {
+ interpreter.interpretMessage(ctx, message)
+ }
+
+ return interpreter.behavior
+ }
+
+}