summaryrefslogtreecommitdiff
path: root/odcsim-core
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-04 21:28:09 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-14 12:55:55 +0200
commit9ebe07bdbcdd7ca8f8b3c1f76a7f26b560cf2625 (patch)
tree49aa5d7d59444b864458e3414a396b801edb6b98 /odcsim-core
parent54da96844b9581224a353c1012ec8c2c85810e44 (diff)
feat: Add support for timers
This change adds support for timers in Behavior, in order to simplify sending messages to the `self` actor.
Diffstat (limited to 'odcsim-core')
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt20
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/TimerScheduler.kt92
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/TimerSchedulerImpl.kt122
3 files changed, 234 insertions, 0 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
index f2736437..ea68f34b 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
@@ -27,6 +27,7 @@ package com.atlarge.odcsim
import com.atlarge.odcsim.internal.BehaviorInterpreter
import com.atlarge.odcsim.internal.EmptyBehavior
import com.atlarge.odcsim.internal.IgnoreBehavior
+import com.atlarge.odcsim.internal.TimerSchedulerImpl
/**
* This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors
@@ -117,6 +118,25 @@ fun <T : Any> wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Beha
}
/**
+ * Obtain a [TimerScheduler] for building a [Behavior] instance.
+ */
+fun <T : Any> withTimers(handler: (TimerScheduler<T>) -> Behavior<T>): Behavior<T> {
+ return setup { ctx ->
+ val scheduler = TimerSchedulerImpl(ctx)
+ receiveSignal<T> { _, signal ->
+ if (signal is TimerSchedulerImpl.TimerSignal) {
+ val res = scheduler.interceptTimerSignal(signal)
+ if (res != null) {
+ ctx.send(ctx.self, res)
+ return@receiveSignal same()
+ }
+ }
+ unhandled()
+ }.join(handler(scheduler))
+ }
+}
+
+/**
* Join together both [Behavior] with another [Behavior], essentially running them side-by-side, only directly
* propagating stopped behavior.
*
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/TimerScheduler.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/TimerScheduler.kt
new file mode 100644
index 00000000..c5c54b64
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/TimerScheduler.kt
@@ -0,0 +1,92 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim
+
+/**
+ * An interface to provide support for scheduled self messages in an actor. It is used with [withTimers].
+ * Timers are bound to the lifecycle of the actor that owns it, and thus are cancelled automatically when it is
+ * restarted or stopped.
+ *
+ * Please be aware that [TimerScheduler] is not thread-safe and must only be used within the actor that owns it.
+ *
+ * @param T The shape of the messages the owning actor of this scheduling accepts.
+ */
+interface TimerScheduler<T : Any> {
+ /**
+ * Cancel a timer with the given key.
+ *
+ * @param key The key of the timer.
+ */
+ fun cancel(key: Any)
+
+ /**
+ * Cancel all timers.
+ */
+ fun cancelAll()
+
+ /**
+ * Check if a timer with a given [key] is active.
+ *
+ * @param key The key to check if it is active.
+ * @return `true` if a timer with the specified key is active, `false` otherwise.
+ */
+ fun isTimerActive(key: Any): Boolean
+
+ /**
+ * Start a periodic timer that will send [msg] to the `self` actor at a fixed [interval].
+ *
+ * @param key The key of the timer.
+ * @param msg The message to send to the actor.
+ * @param interval The interval of simulation time after which it should be sent.
+ */
+ fun startPeriodicTimer(key: Any, msg: T, interval: Duration)
+
+ /**
+ * Start a timer that will send [msg] once to the `self` actor after the given [delay].
+ *
+ * @param key The key of the timer.
+ * @param msg The message to send to the actor.
+ * @param delay The delay in simulation time after which it should be sent.
+ */
+ fun startSingleTimer(key: Any, msg: T, delay: Duration)
+
+ /**
+ * Run [block] periodically at a fixed [interval]
+ *
+ * @param key The key of the timer.
+ * @param interval The delay of simulation time after which the block should run.
+ * @param block The block to run.
+ */
+ fun every(key: Any, interval: Duration, block: () -> Unit)
+
+ /**
+ * Run [block] after the specified [delay].
+ *
+ * @param key The key of the timer.
+ * @param delay The delay in simulation time after which the block should run.
+ * @param block The block to run.
+ */
+ fun after(key: Any, delay: Duration, block: () -> Unit)
+}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/TimerSchedulerImpl.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/TimerSchedulerImpl.kt
new file mode 100644
index 00000000..22bec507
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/TimerSchedulerImpl.kt
@@ -0,0 +1,122 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.internal
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.Duration
+import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.TimerScheduler
+
+/**
+ * Implementation of [TimerScheduler] that uses the actor's [ActorContext] to provide timer functionality.
+ *
+ * @property ctx The actor context to use.
+ */
+internal class TimerSchedulerImpl<T : Any>(private val ctx: ActorContext<T>) : TimerScheduler<T> {
+ private val timers = mutableMapOf<Any, Timer<T>>()
+
+ override fun cancel(key: Any) {
+ val timer = timers[key] ?: return
+ ctx.log.debug("Cancel timer [{}] with generation [{}]", timer.key, timer.generation)
+ timers -= timer.key
+ }
+
+ override fun cancelAll() {
+ ctx.log.debug("Cancel all timers")
+ timers.clear()
+ }
+
+ override fun isTimerActive(key: Any): Boolean = timers.containsKey(key)
+
+ override fun startPeriodicTimer(key: Any, msg: T, interval: Duration) {
+ startTimer(key, msg, interval, true)
+ }
+
+ override fun startSingleTimer(key: Any, msg: T, delay: Duration) {
+ startTimer(key, msg, delay, false)
+ }
+
+ override fun every(key: Any, interval: Duration, block: () -> Unit) {
+ @Suppress("UNCHECKED_CAST")
+ startTimer(key, Block(block) as T, interval, true)
+ }
+
+ override fun after(key: Any, delay: Duration, block: () -> Unit) {
+ @Suppress("UNCHECKED_CAST")
+ startTimer(key, Block(block) as T, delay, false)
+ }
+
+ private fun startTimer(key: Any, msg: T, duration: Duration, repeat: Boolean) {
+ val timer = timers.getOrPut(key) { Timer(key) }
+ timer.duration = duration
+ timer.generation += 1
+ timer.msg = msg
+ timer.repeat = repeat
+ ctx.sendSignal(ctx.self, TimerSignal(key, timer.generation), duration)
+ ctx.log.debug("Start timer [{}] with generation [{}]", key, timer.generation)
+ }
+
+ fun interceptTimerSignal(signal: TimerSignal): T? {
+ val timer = timers[signal.key]
+
+ if (timer == null) {
+ // Message was from canceled timer that was already enqueued
+ ctx.log.debug("Received timer [{}] that has been removed, discarding", signal.key)
+ return null
+ } else if (signal.generation != timer.generation) {
+ // Message was from an old timer that was enqueued before canceled
+ ctx.log.debug("Received timer [{}] from old generation [{}], expected generation [{}], discarding",
+ signal.key, signal.generation, timer.generation)
+ }
+
+ if (!timer.repeat) {
+ timers -= timer.key
+ } else {
+ ctx.sendSignal(ctx.self, signal, timer.duration)
+ }
+
+ val msg = timer.msg
+
+ if (msg is Block) {
+ msg()
+ return null
+ }
+
+ return msg
+ }
+
+ data class Timer<T : Any>(val key: Any) {
+ var duration: Duration = 0.0
+ var repeat: Boolean = false
+ var generation: Int = 0
+ lateinit var msg: T
+ }
+
+ data class TimerSignal(val key: Any, val generation: Int) : Signal
+
+ data class Block(val block: () -> Unit) {
+ operator fun invoke() = block()
+ }
+}