summaryrefslogtreecommitdiff
path: root/odcsim/odcsim-engine-omega/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:02:22 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:13:37 +0100
commit4d13f702c87bc195d8edbd19c5cd6567ecfd2af4 (patch)
tree3262880c18204adce76e3d4a8091d1b24b2b4c9a /odcsim/odcsim-engine-omega/src
parentce952cf3f27c154e06cfa56ca1ad7db9ba3eac7c (diff)
refactor: Introduce revised API design for 2.x
This change introduces the revised API design for version 2.0 of the OpenDC simulator. This version drops built-in support for Java and instead opts to build on Kotlin coroutines to simplify the API surface. During development of and experimentation with the previous API for version 2.x, we found that the design based on Akka Typed was too limiting and caused too much boilerplate for the models we needed to implement. Essential patterns such as request-response were found to be hard to implement with only a single mailbox. Moveover, limiting each actor's mailbox to a single type hindered composition and often resulted in unchecked casts or the type being changed to `Any`, eliminating the type-safety of the API. In this revised API design, a simulation is now represented as the interplay of logical processes that communicate via multiple message passing channels. We use Kotlin coroutines to describe the behavior of the processes. The API has been design from the start to take into account distributed/parallel simulations by disallowing messages from arbitrary processes, which was possible in the previous design. Instead, the 'communication graph' is known during runtime as procsses must register themselves before being able to send/receive messages to/from channels. We are still figuring out process/channel identity and supervision. Currently, all logical processes run on a single level, instead of being hierachical. However, this might change in the future.
Diffstat (limited to 'odcsim/odcsim-engine-omega/src')
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt360
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt38
-rw-r--r--odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory1
-rw-r--r--odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt37
-rw-r--r--odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt37
5 files changed, 0 insertions, 473 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
deleted file mode 100644
index 0cb0ed8d..00000000
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim.engine.omega
-
-import com.atlarge.odcsim.ActorContext
-import com.atlarge.odcsim.ActorPath
-import com.atlarge.odcsim.ActorRef
-import com.atlarge.odcsim.ActorSystem
-import com.atlarge.odcsim.Behavior
-import com.atlarge.odcsim.Duration
-import com.atlarge.odcsim.Envelope
-import com.atlarge.odcsim.Instant
-import com.atlarge.odcsim.PostStop
-import com.atlarge.odcsim.PreStart
-import com.atlarge.odcsim.Signal
-import com.atlarge.odcsim.Terminated
-import com.atlarge.odcsim.empty
-import com.atlarge.odcsim.internal.BehaviorInterpreter
-import com.atlarge.odcsim.internal.logging.LoggerImpl
-import java.util.Collections
-import java.util.PriorityQueue
-import java.util.UUID
-import java.util.WeakHashMap
-import kotlin.math.max
-import org.jetbrains.annotations.Async
-import org.slf4j.Logger
-
-/**
- * The reference implementation of the [ActorSystem] instance for the OpenDC simulation core.
- *
- * This engine implementation is a single-threaded implementation, running actors synchronously and
- * provides a single priority queue for all events (messages, ticks, etc) that occur.
- *
- * @param guardianBehavior The behavior of the guardian (root) actor.
- * @param name The name of the engine instance.
- */
-class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val name: String) : ActorSystem<T>, ActorRef<T> {
- /**
- * The state of the actor system.
- */
- private var state: ActorSystemState = ActorSystemState.CREATED
-
- /**
- * The event queue to process
- */
- private val queue: PriorityQueue<EnvelopeImpl> = PriorityQueue(
- Comparator
- .comparingDouble(EnvelopeImpl::time)
- .thenComparingLong(EnvelopeImpl::id)
- )
-
- /**
- * The registry of actors in the system.
- */
- private val registry: MutableMap<ActorPath, Actor<*>> = HashMap()
-
- /**
- * The root actor path of the system.
- */
- private val root: ActorPath = ActorPath.Root()
-
- /**
- * The system actor path.
- */
- private val system: ActorPath = root / "system"
-
- /**
- * The current point in simulation time.
- */
- override var time: Instant = .0
-
- /**
- * The path to the root actor.
- */
- override val path: ActorPath = root / "user"
-
- init {
- registry[system] = Actor(ActorRefImpl(this, system), empty<Nothing>())
- registry[path] = Actor(this, guardianBehavior)
- schedule(path, PreStart, .0)
- }
-
- override fun run(until: Duration) {
- require(until >= .0) { "The given instant must be a non-negative number" }
-
- // Start the system/guardian actor on initial run
- if (state == ActorSystemState.CREATED) {
- state = ActorSystemState.STARTED
- registry[system]!!.isolate { it.start() }
- registry[path]!!.isolate { it.start() }
- } else if (state == ActorSystemState.TERMINATED) {
- throw IllegalStateException("The ActorSystem has been terminated.")
- }
-
- while (time < until) {
- // Check whether the system was interrupted
- if (Thread.interrupted()) {
- throw InterruptedException()
- }
-
- val envelope = queue.peek() ?: break
- val delivery = envelope.time.takeUnless { it > until } ?: break
-
- // A message should never be delivered out of order in this single-threaded implementation. Assert for
- // sanity
- assert(delivery >= time) { "Message delivered out of order [expected=$delivery, actual=$time]" }
-
- time = delivery
- queue.poll()
-
- processEnvelope(envelope)
- }
-
- // Jump forward in time as the caller expects the system to have run until the specified instant
- // Taking the maximum value prevents the caller to jump backwards in time
- time = max(time, until)
- }
-
- override fun send(msg: T, after: Duration) = schedule(path, msg, after)
-
- override fun terminate() {
- registry[path]?.stop(null)
- registry[system]?.stop(null)
- }
-
- override suspend fun <U : Any> spawnSystem(behavior: Behavior<U>, name: String): ActorRef<U> {
- return registry[system]!!.spawn(behavior, name)
- }
-
- override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path)
-
- /**
- * The identifier for the next message to be scheduled.
- */
- private var nextId: Long = 0
-
- /**
- * Schedule a message to be processed by the engine.
- *
- * @param path The path to the destination of the message.
- * @param message The message to schedule.
- * @param delay The time to wait before processing the message.
- */
- private fun schedule(@Async.Schedule path: ActorPath, message: Any, delay: Duration) {
- require(delay >= .0) { "The given delay must be a non-negative number" }
- scheduleEnvelope(EnvelopeImpl(nextId++, path, time + delay, message))
- }
-
- /**
- * Schedule the specified envelope to be processed by the engine.
- */
- private fun scheduleEnvelope(@Async.Schedule envelope: EnvelopeImpl) {
- queue.add(envelope)
- }
-
- /**
- * Process the delivery of a message.
- */
- private fun processEnvelope(@Async.Execute envelope: EnvelopeImpl) {
- val actor = registry[envelope.destination] ?: return
-
- // Notice that messages for unknown/terminated actors are ignored for now
- actor.isolate { it.interpretMessage(envelope.message) }
- }
-
- /**
- * An actor as represented in the Omega engine.
- *
- * @param self The [ActorRef] to this actor.
- * @param initialBehavior The initial behavior of this actor.
- */
- private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> {
- val childActors: MutableMap<String, Actor<*>> = mutableMapOf()
- val interpreter = BehaviorInterpreter(initialBehavior)
- val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>())
-
- override val time: Instant
- get() = this@OmegaActorSystem.time
-
- override val children: List<ActorRef<*>>
- get() = childActors.values.map { it.self }
-
- override val system: ActorSystem<*>
- get() = this@OmegaActorSystem
-
- override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl(this) }
-
- override fun getChild(name: String): ActorRef<*>? = childActors[name]?.self
-
- override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref.path, msg, after)
-
- override fun <U : Any> spawn(behavior: Behavior<U>, name: String): ActorRef<U> {
- require(name.isNotEmpty()) { "Actor name may not be empty" }
- require(!name.startsWith("$")) { "Actor name may not start with $-sign" }
- return internalSpawn(behavior, name)
- }
-
- override fun <U : Any> spawnAnonymous(behavior: Behavior<U>): ActorRef<U> {
- val name = "$" + UUID.randomUUID()
- return internalSpawn(behavior, name)
- }
-
- private fun <U : Any> internalSpawn(behavior: Behavior<U>, name: String): ActorRef<U> {
- require(name !in childActors) { "Actor name $name not unique" }
- val ref = ActorRefImpl<U>(this@OmegaActorSystem, self.path.child(name))
- val actor = Actor(ref, behavior)
- registry[ref.path] = actor
- childActors[name] = actor
- schedule(ref.path, PreStart, .0)
- actor.start()
- return ref
- }
-
- override fun stop(child: ActorRef<*>) {
- when {
- // Must be a direct child of this actor
- child.path.parent == self.path -> {
- val ref = childActors[child.path.name] ?: return
- ref.stop(null)
- }
- self == child -> throw IllegalArgumentException(
- "Only direct children of an actor may be stopped through the actor context, " +
- "but you tried to stop [$self] by passing its ActorRef to the `stop` method. " +
- "Stopping self has to be expressed as explicitly returning a Stop Behavior."
- )
- else -> throw IllegalArgumentException(
- "Only direct children of an actor may be stopped through the actor context, " +
- "but [$child] is not a child of [$self]. Stopping other actors has to be expressed as " +
- "an explicit stop message that the actor accepts."
- )
- }
- }
-
- override fun watch(target: ActorRef<*>) {
- registry[target.path]?.watchers?.add(path)
- }
-
- override fun unwatch(target: ActorRef<*>) {
- registry[target.path]?.watchers?.remove(path)
- }
-
- // Synchronization of actors in a single-threaded simulation is trivial: all actors are consistent in virtual
- // time.
- override fun sync(target: ActorRef<*>) {}
-
- override fun unsync(target: ActorRef<*>) {}
-
- override fun isSync(target: ActorRef<*>): Boolean = true
-
- /**
- * Start this actor.
- */
- fun start() {
- interpreter.start(this)
- }
-
- /**
- * Stop this actor.
- */
- fun stop(failure: Throwable?) {
- interpreter.stop(this)
- childActors.values.forEach { it.stop(failure) }
- registry.remove(self.path)
- interpreter.interpretSignal(this, PostStop)
- val termination = Terminated(self, failure)
- watchers.forEach { schedule(it, termination, 0.0) }
- }
-
- /**
- * Interpret the given message send to an actor.
- */
- fun interpretMessage(msg: Any) {
- if (msg is Signal) {
- interpreter.interpretSignal(this, msg)
- } else {
- @Suppress("UNCHECKED_CAST")
- interpreter.interpretMessage(this, msg as T)
- }
-
- if (!interpreter.isAlive) {
- stop(null)
- }
- }
-
- override fun equals(other: Any?): Boolean =
- other is OmegaActorSystem<*>.Actor<*> && self.path == other.self.path
-
- override fun hashCode(): Int = self.path.hashCode()
- }
-
- /**
- * Isolate uncaught exceptions originating from actor interpreter invocations.
- */
- private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? {
- return try {
- block(this)
- } catch (t: Throwable) {
- // Forcefully stop the actor if it crashed
- stop(t)
- log.error("Unhandled exception in actor $path", t)
- null
- }
- }
-
- /**
- * Enumeration to track the state of the actor system.
- */
- private enum class ActorSystemState {
- CREATED, STARTED, TERMINATED
- }
-
- /**
- * Internal [ActorRef] implementation for this actor system.
- */
- private data class ActorRefImpl<T : Any>(
- private val owner: OmegaActorSystem<*>,
- override val path: ActorPath
- ) : ActorRef<T> {
- override fun toString(): String = "Actor[$path]"
-
- override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path)
- }
-
- /**
- * A wrapper around a message that has been scheduled for processing.
- *
- * @property id The identifier of the message to keep the priority queue stable.
- * @property destination The destination of the message.
- * @property time The point in time to deliver the message.
- * @property message The message to wrap.
- */
- private class EnvelopeImpl(
- val id: Long,
- val destination: ActorPath,
- override val time: Instant,
- override val message: Any
- ) : Envelope<Any>
-}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt
deleted file mode 100644
index 84bf1efb..00000000
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim.engine.omega
-
-import com.atlarge.odcsim.ActorSystem
-import com.atlarge.odcsim.ActorSystemFactory
-import com.atlarge.odcsim.Behavior
-import java.util.ServiceLoader
-
-/**
- * An [ActorSystemFactory] for the Omega engine, used by the [ServiceLoader] API to create [OmegaActorSystem] instances.
- */
-class OmegaActorSystemFactory : ActorSystemFactory {
- override operator fun <T : Any> invoke(root: Behavior<T>, name: String): ActorSystem<T> =
- OmegaActorSystem(root, name)
-}
diff --git a/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory b/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory
deleted file mode 100644
index d0ca8859..00000000
--- a/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory
+++ /dev/null
@@ -1 +0,0 @@
-com.atlarge.odcsim.engine.omega.OmegaActorSystemFactory
diff --git a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt b/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt
deleted file mode 100644
index 4e195e6e..00000000
--- a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim.engine.omega
-
-import com.atlarge.odcsim.ActorSystemFactory
-import com.atlarge.odcsim.engine.tests.ActorSystemFactoryContract
-import org.junit.jupiter.api.DisplayName
-
-/**
- * The [ActorSystemFactory] test suite for the Omega engine implementation.
- */
-@DisplayName("OmegaActorSystemFactory")
-class OmegaActorSystemFactoryTest : ActorSystemFactoryContract() {
- override fun createFactory(): ActorSystemFactory = OmegaActorSystemFactory()
-}
diff --git a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt b/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt
deleted file mode 100644
index dc310d47..00000000
--- a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim.engine.omega
-
-import com.atlarge.odcsim.ActorSystem
-import com.atlarge.odcsim.engine.tests.ActorSystemContract
-import org.junit.jupiter.api.DisplayName
-
-/**
- * The [ActorSystem] test suite for the Omega engine implementation.
- */
-@DisplayName("OmegaActorSystem")
-class OmegaActorSystemTest : ActorSystemContract() {
- override val factory = OmegaActorSystemFactory()
-}