diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-11-20 17:51:58 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-11-20 21:49:20 +0100 |
| commit | 6ca0ae07669d20a5a34ef697610df90754024035 (patch) | |
| tree | 3c26a21970fa5693b18edb34e8203d711c381ba5 /odcsim/odcsim-engine-omega/src | |
| parent | 4cc3c6dea5c5536d47fcbaf8414d74de7b6fdc4b (diff) | |
refactor: Move build logic to buildSrc
Diffstat (limited to 'odcsim/odcsim-engine-omega/src')
5 files changed, 473 insertions, 0 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt new file mode 100644 index 00000000..dd92f90a --- /dev/null +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -0,0 +1,360 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.engine.omega + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorPath +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.ActorSystem +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.Envelope +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.PostStop +import com.atlarge.odcsim.PreStart +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.empty +import com.atlarge.odcsim.internal.BehaviorInterpreter +import com.atlarge.odcsim.internal.logging.LoggerImpl +import org.jetbrains.annotations.Async +import org.slf4j.Logger +import java.util.Collections +import java.util.PriorityQueue +import java.util.UUID +import java.util.WeakHashMap +import kotlin.math.max + +/** + * The reference implementation of the [ActorSystem] instance for the OpenDC simulation core. + * + * This engine implementation is a single-threaded implementation, running actors synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur. + * + * @param guardianBehavior The behavior of the guardian (root) actor. + * @param name The name of the engine instance. + */ +class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val name: String) : ActorSystem<T>, ActorRef<T> { + /** + * The state of the actor system. + */ + private var state: ActorSystemState = ActorSystemState.CREATED + + /** + * The event queue to process + */ + private val queue: PriorityQueue<EnvelopeImpl> = PriorityQueue( + Comparator + .comparingDouble(EnvelopeImpl::time) + .thenComparingLong(EnvelopeImpl::id) + ) + + /** + * The registry of actors in the system. + */ + private val registry: MutableMap<ActorPath, Actor<*>> = HashMap() + + /** + * The root actor path of the system. + */ + private val root: ActorPath = ActorPath.Root() + + /** + * The system actor path. + */ + private val system: ActorPath = root / "system" + + /** + * The current point in simulation time. + */ + override var time: Instant = .0 + + /** + * The path to the root actor. + */ + override val path: ActorPath = root / "user" + + init { + registry[system] = Actor(ActorRefImpl(this, system), empty<Nothing>()) + registry[path] = Actor(this, guardianBehavior) + schedule(path, PreStart, .0) + } + + override fun run(until: Duration) { + require(until >= .0) { "The given instant must be a non-negative number" } + + // Start the system/guardian actor on initial run + if (state == ActorSystemState.CREATED) { + state = ActorSystemState.STARTED + registry[system]!!.isolate { it.start() } + registry[path]!!.isolate { it.start() } + } else if (state == ActorSystemState.TERMINATED) { + throw IllegalStateException("The ActorSystem has been terminated.") + } + + while (time < until) { + // Check whether the system was interrupted + if (Thread.interrupted()) { + throw InterruptedException() + } + + val envelope = queue.peek() ?: break + val delivery = envelope.time.takeUnless { it > until } ?: break + + // A message should never be delivered out of order in this single-threaded implementation. Assert for + // sanity + assert(delivery >= time) { "Message delivered out of order [expected=$delivery, actual=$time]" } + + time = delivery + queue.poll() + + processEnvelope(envelope) + } + + // Jump forward in time as the caller expects the system to have run until the specified instant + // Taking the maximum value prevents the caller to jump backwards in time + time = max(time, until) + } + + override fun send(msg: T, after: Duration) = schedule(path, msg, after) + + override fun terminate() { + registry[path]?.stop(null) + registry[system]?.stop(null) + } + + override suspend fun <U : Any> spawnSystem(behavior: Behavior<U>, name: String): ActorRef<U> { + return registry[system]!!.spawn(behavior, name) + } + + override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path) + + /** + * The identifier for the next message to be scheduled. + */ + private var nextId: Long = 0 + + /** + * Schedule a message to be processed by the engine. + * + * @param path The path to the destination of the message. + * @param message The message to schedule. + * @param delay The time to wait before processing the message. + */ + private fun schedule(@Async.Schedule path: ActorPath, message: Any, delay: Duration) { + require(delay >= .0) { "The given delay must be a non-negative number" } + scheduleEnvelope(EnvelopeImpl(nextId++, path, time + delay, message)) + } + + /** + * Schedule the specified envelope to be processed by the engine. + */ + private fun scheduleEnvelope(@Async.Schedule envelope: EnvelopeImpl) { + queue.add(envelope) + } + + /** + * Process the delivery of a message. + */ + private fun processEnvelope(@Async.Execute envelope: EnvelopeImpl) { + val actor = registry[envelope.destination] ?: return + + // Notice that messages for unknown/terminated actors are ignored for now + actor.isolate { it.interpretMessage(envelope.message) } + } + + /** + * An actor as represented in the Omega engine. + * + * @param self The [ActorRef] to this actor. + * @param initialBehavior The initial behavior of this actor. + */ + private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> { + val childActors: MutableMap<String, Actor<*>> = mutableMapOf() + val interpreter = BehaviorInterpreter(initialBehavior) + val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>()) + + override val time: Instant + get() = this@OmegaActorSystem.time + + override val children: List<ActorRef<*>> + get() = childActors.values.map { it.self } + + override val system: ActorSystem<*> + get() = this@OmegaActorSystem + + override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl(this) } + + override fun getChild(name: String): ActorRef<*>? = childActors[name]?.self + + override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref.path, msg, after) + + override fun <U : Any> spawn(behavior: Behavior<U>, name: String): ActorRef<U> { + require(name.isNotEmpty()) { "Actor name may not be empty" } + require(!name.startsWith("$")) { "Actor name may not start with $-sign" } + return internalSpawn(behavior, name) + } + + override fun <U : Any> spawnAnonymous(behavior: Behavior<U>): ActorRef<U> { + val name = "$" + UUID.randomUUID() + return internalSpawn(behavior, name) + } + + private fun <U : Any> internalSpawn(behavior: Behavior<U>, name: String): ActorRef<U> { + require(name !in childActors) { "Actor name $name not unique" } + val ref = ActorRefImpl<U>(this@OmegaActorSystem, self.path.child(name)) + val actor = Actor(ref, behavior) + registry[ref.path] = actor + childActors[name] = actor + schedule(ref.path, PreStart, .0) + actor.start() + return ref + } + + override fun stop(child: ActorRef<*>) { + when { + // Must be a direct child of this actor + child.path.parent == self.path -> { + val ref = childActors[child.path.name] ?: return + ref.stop(null) + } + self == child -> throw IllegalArgumentException( + "Only direct children of an actor may be stopped through the actor context, " + + "but you tried to stop [$self] by passing its ActorRef to the `stop` method. " + + "Stopping self has to be expressed as explicitly returning a Stop Behavior." + ) + else -> throw IllegalArgumentException( + "Only direct children of an actor may be stopped through the actor context, " + + "but [$child] is not a child of [$self]. Stopping other actors has to be expressed as " + + "an explicit stop message that the actor accepts." + ) + } + } + + override fun watch(target: ActorRef<*>) { + registry[target.path]?.watchers?.add(path) + } + + override fun unwatch(target: ActorRef<*>) { + registry[target.path]?.watchers?.remove(path) + } + + // Synchronization of actors in a single-threaded simulation is trivial: all actors are consistent in virtual + // time. + override fun sync(target: ActorRef<*>) {} + + override fun unsync(target: ActorRef<*>) {} + + override fun isSync(target: ActorRef<*>): Boolean = true + + /** + * Start this actor. + */ + fun start() { + interpreter.start(this) + } + + /** + * Stop this actor. + */ + fun stop(failure: Throwable?) { + interpreter.stop(this) + childActors.values.forEach { it.stop(failure) } + registry.remove(self.path) + interpreter.interpretSignal(this, PostStop) + val termination = Terminated(self, failure) + watchers.forEach { schedule(it, termination, 0.0) } + } + + /** + * Interpret the given message send to an actor. + */ + fun interpretMessage(msg: Any) { + if (msg is Signal) { + interpreter.interpretSignal(this, msg) + } else { + @Suppress("UNCHECKED_CAST") + interpreter.interpretMessage(this, msg as T) + } + + if (!interpreter.isAlive) { + stop(null) + } + } + + override fun equals(other: Any?): Boolean = + other is OmegaActorSystem<*>.Actor<*> && self.path == other.self.path + + override fun hashCode(): Int = self.path.hashCode() + } + + /** + * Isolate uncaught exceptions originating from actor interpreter invocations. + */ + private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? { + return try { + block(this) + } catch (t: Throwable) { + // Forcefully stop the actor if it crashed + stop(t) + log.error("Unhandled exception in actor $path", t) + null + } + } + + /** + * Enumeration to track the state of the actor system. + */ + private enum class ActorSystemState { + CREATED, STARTED, TERMINATED + } + + /** + * Internal [ActorRef] implementation for this actor system. + */ + private data class ActorRefImpl<T : Any>( + private val owner: OmegaActorSystem<*>, + override val path: ActorPath + ) : ActorRef<T> { + override fun toString(): String = "Actor[$path]" + + override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path) + } + + /** + * A wrapper around a message that has been scheduled for processing. + * + * @property id The identifier of the message to keep the priority queue stable. + * @property destination The destination of the message. + * @property time The point in time to deliver the message. + * @property message The message to wrap. + */ + private class EnvelopeImpl( + val id: Long, + val destination: ActorPath, + override val time: Instant, + override val message: Any + ) : Envelope<Any> +} diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt new file mode 100644 index 00000000..84bf1efb --- /dev/null +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.engine.omega + +import com.atlarge.odcsim.ActorSystem +import com.atlarge.odcsim.ActorSystemFactory +import com.atlarge.odcsim.Behavior +import java.util.ServiceLoader + +/** + * An [ActorSystemFactory] for the Omega engine, used by the [ServiceLoader] API to create [OmegaActorSystem] instances. + */ +class OmegaActorSystemFactory : ActorSystemFactory { + override operator fun <T : Any> invoke(root: Behavior<T>, name: String): ActorSystem<T> = + OmegaActorSystem(root, name) +} diff --git a/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory b/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory new file mode 100644 index 00000000..d0ca8859 --- /dev/null +++ b/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory @@ -0,0 +1 @@ +com.atlarge.odcsim.engine.omega.OmegaActorSystemFactory diff --git a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt b/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt new file mode 100644 index 00000000..4e195e6e --- /dev/null +++ b/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.engine.omega + +import com.atlarge.odcsim.ActorSystemFactory +import com.atlarge.odcsim.engine.tests.ActorSystemFactoryContract +import org.junit.jupiter.api.DisplayName + +/** + * The [ActorSystemFactory] test suite for the Omega engine implementation. + */ +@DisplayName("OmegaActorSystemFactory") +class OmegaActorSystemFactoryTest : ActorSystemFactoryContract() { + override fun createFactory(): ActorSystemFactory = OmegaActorSystemFactory() +} diff --git a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt b/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt new file mode 100644 index 00000000..dc310d47 --- /dev/null +++ b/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.engine.omega + +import com.atlarge.odcsim.ActorSystem +import com.atlarge.odcsim.engine.tests.ActorSystemContract +import org.junit.jupiter.api.DisplayName + +/** + * The [ActorSystem] test suite for the Omega engine implementation. + */ +@DisplayName("OmegaActorSystem") +class OmegaActorSystemTest : ActorSystemContract() { + override val factory = OmegaActorSystemFactory() +} |
