summaryrefslogtreecommitdiff
path: root/odcsim-engine-omega/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2018-10-28 12:50:27 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-06 18:19:23 +0200
commitdecb8fb5297c7772f5319a47c784d44bf8bdbe9c (patch)
tree4151b2ffe1b99a3bfe91a8fd1b7dfeade91a5ec0 /odcsim-engine-omega/src
parentd37a139b357ded9ba048c10ccad320a0d8412f0b (diff)
refactor: Introduce initial API design for 2.x
This change introduces the new API design that will be introduced in the 2.x versions of the OpenDC Simulator. This changes focuses on simplifying simulation primitives provided by the simulator and introduces a new concept of actors based on the model designed by the Akka Typed project. For now, the old simulation models have been removed from the branch, but will be ported back as this branch is being finalized.
Diffstat (limited to 'odcsim-engine-omega/src')
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt186
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt38
-rw-r--r--odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory1
-rw-r--r--odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/SmokeTest.kt51
4 files changed, 276 insertions, 0 deletions
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
new file mode 100644
index 00000000..ffc68d0b
--- /dev/null
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -0,0 +1,186 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.engine.omega
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.ActorPath
+import com.atlarge.odcsim.ActorRef
+import com.atlarge.odcsim.ActorSystem
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.Duration
+import com.atlarge.odcsim.Instant
+import com.atlarge.odcsim.PostStop
+import com.atlarge.odcsim.PreStart
+import com.atlarge.odcsim.Signal
+import mu.KotlinLogging
+import java.util.PriorityQueue
+import kotlin.math.max
+
+/**
+ * The reference implementation of the [ActorSystem] instance for the OpenDC simulation core.
+ *
+ * This engine implementation is a single-threaded implementation, running actors synchronously and
+ * provides a single priority queue for all events (messages, ticks, etc) that occur.
+ *
+ * @param root The behavior of the root actor.
+ * @param name The name of the engine instance.
+ */
+class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) : ActorSystem<T>, ActorRef<T> {
+ /**
+ * The current point in simulation time.
+ */
+ override var time: Instant = .0
+
+ /**
+ * The path to the root actor.
+ */
+ override val path: ActorPath = ActorPath.Root()
+
+ /**
+ * The event queue to process
+ */
+ private val queue: PriorityQueue<Envelope> = PriorityQueue(Comparator
+ .comparingDouble(Envelope::time)
+ .thenComparingLong(Envelope::id))
+
+ /**
+ * The registry of actors in the system.
+ */
+ private val registry: MutableMap<ActorPath, Actor<*>> = HashMap()
+
+ private val logger = KotlinLogging.logger {}
+
+ override fun run(until: Duration) {
+ require(until >= .0) { "The given instant must be a positive number" }
+
+ while (true) {
+ val envelope = queue.peek() ?: break
+ val delivery = envelope.time.takeUnless { it > until } ?: break
+
+ if (delivery < time) {
+ // Message out of order
+ logger.warn { "Message delivered out of order [expected=$delivery, actual=$time]" }
+ }
+
+ time = delivery
+ queue.poll()
+
+ // Notice that messages for unknown/terminated actors are ignored for now
+ registry[envelope.destination]?.interpretMessage(envelope.message)
+ }
+
+ // Jump forward in time as the caller expects the system to have run until the specified instant
+ // Taking the maximum value prevents the caller to jump backwards in time
+ time = max(time, until)
+ }
+
+ override fun send(msg: T, after: Duration) = schedule(this, msg, after)
+
+ /**
+ * The identifier for the next message to be scheduled.
+ */
+ private var nextId: Long = 0
+
+ init {
+ registry[path] = Actor(this, root)
+ schedule(this, PreStart, .0)
+ }
+
+ private inner class Actor<T : Any>(override val self: ActorRef<T>, var behavior: Behavior<T>) : ActorContext<T> {
+ val children: MutableSet<Actor<*>> = mutableSetOf()
+
+ override val time: Instant
+ get() = this@OmegaActorSystem.time
+
+ override fun <U : Any> spawn(name: String, behavior: Behavior<U>): ActorRef<U> {
+ val ref = ActorRefImpl<U>(self.path.child(name))
+ if (ref.path !in registry) {
+ val actor = Actor(ref, behavior)
+ registry[ref.path] = actor
+ children += actor
+ schedule(ref, PreStart, .0)
+ }
+ return ref
+ }
+
+ override fun <U : Any> stop(child: ActorRef<U>): Boolean {
+ if (child.path.root != this@OmegaActorSystem.path) {
+ // This child is not part of the hierarchy.
+ return false
+ }
+ val ref = registry[child.path] ?: return false
+ ref.terminate()
+ return true
+ }
+
+ /**
+ * Terminate this actor and its children.
+ */
+ fun terminate() {
+ children.forEach { it.terminate() }
+ registry.remove(self.path)
+ interpretMessage(PostStop)
+ }
+
+ /**
+ * Interpret the given message send to an actor. Make sure the message is of the correct type.
+ */
+ fun interpretMessage(msg: Any) {
+ @Suppress("UNCHECKED_CAST")
+ behavior = if (msg is Signal) behavior.receiveSignal(this, msg) else behavior.receive(this, msg as T)
+ }
+
+ override fun equals(other: Any?): Boolean =
+ other is OmegaActorSystem<*>.Actor<*> && self.path == other.self.path
+
+ override fun hashCode(): Int = self.path.hashCode()
+ }
+
+ private inner class ActorRefImpl<T : Any>(override val path: ActorPath) : ActorRef<T> {
+ override fun send(msg: T, after: Duration) = schedule(this, msg, after)
+ }
+
+ /**
+ * A wrapper around a message that has been scheduled for processing.
+ *
+ * @property time The point in time to deliver the message.
+ * @property id The identifier of the message to keep the priority queue stable.
+ * @property destination The destination of the message.
+ * @property message The message to wrap.
+ */
+ private class Envelope(val time: Instant, val id: Long, val destination: ActorPath, val message: Any)
+
+ /**
+ * Schedule a message to be processed by the engine.
+ *
+ * @property destination The destination of the message.
+ * @param message The message to schedule.
+ * @param delay The time to wait before processing the message.
+ */
+ private fun schedule(destination: ActorRef<*>, message: Any, delay: Duration) {
+ require(delay >= .0) { "The given delay must be a non-negative number" }
+ queue.add(Envelope(time + delay, nextId++, destination.path, message))
+ }
+}
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt
new file mode 100644
index 00000000..84bf1efb
--- /dev/null
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt
@@ -0,0 +1,38 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.engine.omega
+
+import com.atlarge.odcsim.ActorSystem
+import com.atlarge.odcsim.ActorSystemFactory
+import com.atlarge.odcsim.Behavior
+import java.util.ServiceLoader
+
+/**
+ * An [ActorSystemFactory] for the Omega engine, used by the [ServiceLoader] API to create [OmegaActorSystem] instances.
+ */
+class OmegaActorSystemFactory : ActorSystemFactory {
+ override operator fun <T : Any> invoke(root: Behavior<T>, name: String): ActorSystem<T> =
+ OmegaActorSystem(root, name)
+}
diff --git a/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory b/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory
new file mode 100644
index 00000000..d0ca8859
--- /dev/null
+++ b/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory
@@ -0,0 +1 @@
+com.atlarge.odcsim.engine.omega.OmegaActorSystemFactory
diff --git a/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/SmokeTest.kt b/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/SmokeTest.kt
new file mode 100644
index 00000000..a543d9d1
--- /dev/null
+++ b/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/SmokeTest.kt
@@ -0,0 +1,51 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim
+
+import com.atlarge.odcsim.engine.omega.OmegaActorSystem
+import org.junit.jupiter.api.Test
+
+/**
+ * A test to verify the system runs without smoking.
+ */
+class SmokeTest {
+ @Test
+ fun `hello-world`() {
+ val system = OmegaActorSystem(object : Behavior<String> {
+ override fun receive(ctx: ActorContext<String>, msg: String): Behavior<String> {
+ println("${ctx.time} $msg")
+ return this
+ }
+
+ override fun receiveSignal(ctx: ActorContext<String>, signal: Signal): Behavior<String> {
+ println("${ctx.time} $signal")
+ return this
+ }
+ }, name = "test")
+
+ system.send("Hello World", after = 1.2)
+ system.run()
+ }
+}