diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-13 20:26:56 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-14 12:55:56 +0200 |
| commit | 91ea74177ec4cd586036a2151933218e8f39866f (patch) | |
| tree | dc5b9c850f0babdf8d2d69a804beca7326e661d6 /odcsim-core/src/main | |
| parent | 2f6dcaef25d80a1411512e482953c83990149fd1 (diff) | |
feat: Add support for stashing messages
This change adds a StashBuffer to stash messages temporarily for
processing at a later moment.
Diffstat (limited to 'odcsim-core/src/main')
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/StashBuffer.kt | 88 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt | 75 |
2 files changed, 163 insertions, 0 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/StashBuffer.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/StashBuffer.kt new file mode 100644 index 00000000..5d73d808 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/StashBuffer.kt @@ -0,0 +1,88 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim + +import com.atlarge.odcsim.internal.StashBufferImpl + +/** + * A non thread safe mutable message buffer that can be used to buffer messages inside actors and then unstash them. + * + * @param T The shape of the messages in this buffer. + */ +interface StashBuffer<T : Any> { + /** + * The first element of the buffer. + * + * @throws NoSuchElementException if the buffer is empty. + */ + val head: T + + /** + * A flag to indicate whether the buffer is empty. + */ + val isEmpty: Boolean + + /** + * A flag to indicate whether the buffer is full. + */ + val isFull: Boolean + + /** + * The number of elements in the stash buffer. + */ + val size: Int + + /** + * Iterate over all elements of the buffer and apply a function to each element, without removing them. + * + * @param block The function to invoke for each element. + */ + fun forEach(block: (T) -> Unit) + + /** + * Add one element to the end of the message buffer. + * + * @param msg The message to stash. + * @throws IllegalStateException if the element cannot be added at this time due to capacity restrictions + */ + fun stash(msg: T) + + /** + * Process all stashed messages with the behavior and the returned [Behavior] from each processed message. + * + * @param ctx The actor context to process these messages in. + * @param behavior The behavior to process the messages with. + */ + fun unstashAll(ctx: ActorContext<T>, behavior: Behavior<T>): Behavior<T> + + companion object { + /** + * Construct a [StashBuffer] with the specified [capacity]. + * + * @param capacity The capacity of the buffer. + */ + operator fun <T : Any> invoke(capacity: Int): StashBuffer<T> = StashBufferImpl(capacity) + } +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt new file mode 100644 index 00000000..1d85a8e6 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt @@ -0,0 +1,75 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.StashBuffer +import java.util.ArrayDeque + +/** + * Internal implementation of the [StashBuffer] interface. + */ +internal class StashBufferImpl<T : Any>(private val capacity: Int) : StashBuffer<T> { + /** + * The internal queue used to store the messages. + */ + private val queue = ArrayDeque<T>(capacity) + + override val head: T + get() = queue.first + + override val isEmpty: Boolean + get() = queue.isEmpty() + + override val isFull: Boolean + get() = size > capacity + + override val size: Int + get() = queue.size + + override fun forEach(block: (T) -> Unit) { + queue.toList().forEach(block) + } + + override fun stash(msg: T) { + queue.add(msg) + } + + override fun unstashAll(ctx: ActorContext<T>, behavior: Behavior<T>): Behavior<T> { + val messages = queue.toList() + queue.clear() + + val interpreter = BehaviorInterpreter<T>(behavior) + interpreter.start(ctx) + + for (message in messages) { + interpreter.interpretMessage(ctx, message) + } + + return interpreter.behavior + } + +} |
