diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-04 21:28:09 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-14 12:55:55 +0200 |
| commit | 9ebe07bdbcdd7ca8f8b3c1f76a7f26b560cf2625 (patch) | |
| tree | 49aa5d7d59444b864458e3414a396b801edb6b98 /odcsim-core/src/main | |
| parent | 54da96844b9581224a353c1012ec8c2c85810e44 (diff) | |
feat: Add support for timers
This change adds support for timers in Behavior, in order to simplify
sending messages to the `self` actor.
Diffstat (limited to 'odcsim-core/src/main')
3 files changed, 234 insertions, 0 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt index f2736437..ea68f34b 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt @@ -27,6 +27,7 @@ package com.atlarge.odcsim import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.EmptyBehavior import com.atlarge.odcsim.internal.IgnoreBehavior +import com.atlarge.odcsim.internal.TimerSchedulerImpl /** * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors @@ -117,6 +118,25 @@ fun <T : Any> wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Beha } /** + * Obtain a [TimerScheduler] for building a [Behavior] instance. + */ +fun <T : Any> withTimers(handler: (TimerScheduler<T>) -> Behavior<T>): Behavior<T> { + return setup { ctx -> + val scheduler = TimerSchedulerImpl(ctx) + receiveSignal<T> { _, signal -> + if (signal is TimerSchedulerImpl.TimerSignal) { + val res = scheduler.interceptTimerSignal(signal) + if (res != null) { + ctx.send(ctx.self, res) + return@receiveSignal same() + } + } + unhandled() + }.join(handler(scheduler)) + } +} + +/** * Join together both [Behavior] with another [Behavior], essentially running them side-by-side, only directly * propagating stopped behavior. * diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/TimerScheduler.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/TimerScheduler.kt new file mode 100644 index 00000000..c5c54b64 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/TimerScheduler.kt @@ -0,0 +1,92 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim + +/** + * An interface to provide support for scheduled self messages in an actor. It is used with [withTimers]. + * Timers are bound to the lifecycle of the actor that owns it, and thus are cancelled automatically when it is + * restarted or stopped. + * + * Please be aware that [TimerScheduler] is not thread-safe and must only be used within the actor that owns it. + * + * @param T The shape of the messages the owning actor of this scheduling accepts. + */ +interface TimerScheduler<T : Any> { + /** + * Cancel a timer with the given key. + * + * @param key The key of the timer. + */ + fun cancel(key: Any) + + /** + * Cancel all timers. + */ + fun cancelAll() + + /** + * Check if a timer with a given [key] is active. + * + * @param key The key to check if it is active. + * @return `true` if a timer with the specified key is active, `false` otherwise. + */ + fun isTimerActive(key: Any): Boolean + + /** + * Start a periodic timer that will send [msg] to the `self` actor at a fixed [interval]. + * + * @param key The key of the timer. + * @param msg The message to send to the actor. + * @param interval The interval of simulation time after which it should be sent. + */ + fun startPeriodicTimer(key: Any, msg: T, interval: Duration) + + /** + * Start a timer that will send [msg] once to the `self` actor after the given [delay]. + * + * @param key The key of the timer. + * @param msg The message to send to the actor. + * @param delay The delay in simulation time after which it should be sent. + */ + fun startSingleTimer(key: Any, msg: T, delay: Duration) + + /** + * Run [block] periodically at a fixed [interval] + * + * @param key The key of the timer. + * @param interval The delay of simulation time after which the block should run. + * @param block The block to run. + */ + fun every(key: Any, interval: Duration, block: () -> Unit) + + /** + * Run [block] after the specified [delay]. + * + * @param key The key of the timer. + * @param delay The delay in simulation time after which the block should run. + * @param block The block to run. + */ + fun after(key: Any, delay: Duration, block: () -> Unit) +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/TimerSchedulerImpl.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/TimerSchedulerImpl.kt new file mode 100644 index 00000000..22bec507 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/TimerSchedulerImpl.kt @@ -0,0 +1,122 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.TimerScheduler + +/** + * Implementation of [TimerScheduler] that uses the actor's [ActorContext] to provide timer functionality. + * + * @property ctx The actor context to use. + */ +internal class TimerSchedulerImpl<T : Any>(private val ctx: ActorContext<T>) : TimerScheduler<T> { + private val timers = mutableMapOf<Any, Timer<T>>() + + override fun cancel(key: Any) { + val timer = timers[key] ?: return + ctx.log.debug("Cancel timer [{}] with generation [{}]", timer.key, timer.generation) + timers -= timer.key + } + + override fun cancelAll() { + ctx.log.debug("Cancel all timers") + timers.clear() + } + + override fun isTimerActive(key: Any): Boolean = timers.containsKey(key) + + override fun startPeriodicTimer(key: Any, msg: T, interval: Duration) { + startTimer(key, msg, interval, true) + } + + override fun startSingleTimer(key: Any, msg: T, delay: Duration) { + startTimer(key, msg, delay, false) + } + + override fun every(key: Any, interval: Duration, block: () -> Unit) { + @Suppress("UNCHECKED_CAST") + startTimer(key, Block(block) as T, interval, true) + } + + override fun after(key: Any, delay: Duration, block: () -> Unit) { + @Suppress("UNCHECKED_CAST") + startTimer(key, Block(block) as T, delay, false) + } + + private fun startTimer(key: Any, msg: T, duration: Duration, repeat: Boolean) { + val timer = timers.getOrPut(key) { Timer(key) } + timer.duration = duration + timer.generation += 1 + timer.msg = msg + timer.repeat = repeat + ctx.sendSignal(ctx.self, TimerSignal(key, timer.generation), duration) + ctx.log.debug("Start timer [{}] with generation [{}]", key, timer.generation) + } + + fun interceptTimerSignal(signal: TimerSignal): T? { + val timer = timers[signal.key] + + if (timer == null) { + // Message was from canceled timer that was already enqueued + ctx.log.debug("Received timer [{}] that has been removed, discarding", signal.key) + return null + } else if (signal.generation != timer.generation) { + // Message was from an old timer that was enqueued before canceled + ctx.log.debug("Received timer [{}] from old generation [{}], expected generation [{}], discarding", + signal.key, signal.generation, timer.generation) + } + + if (!timer.repeat) { + timers -= timer.key + } else { + ctx.sendSignal(ctx.self, signal, timer.duration) + } + + val msg = timer.msg + + if (msg is Block) { + msg() + return null + } + + return msg + } + + data class Timer<T : Any>(val key: Any) { + var duration: Duration = 0.0 + var repeat: Boolean = false + var generation: Int = 0 + lateinit var msg: T + } + + data class TimerSignal(val key: Any, val generation: Int) : Signal + + data class Block(val block: () -> Unit) { + operator fun invoke() = block() + } +} |
