summaryrefslogtreecommitdiff
path: root/odcsim/odcsim-engine-omega/src
diff options
context:
space:
mode:
Diffstat (limited to 'odcsim/odcsim-engine-omega/src')
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt360
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt38
-rw-r--r--odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory1
-rw-r--r--odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt37
-rw-r--r--odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt37
5 files changed, 0 insertions, 473 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
deleted file mode 100644
index 0cb0ed8d..00000000
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim.engine.omega
-
-import com.atlarge.odcsim.ActorContext
-import com.atlarge.odcsim.ActorPath
-import com.atlarge.odcsim.ActorRef
-import com.atlarge.odcsim.ActorSystem
-import com.atlarge.odcsim.Behavior
-import com.atlarge.odcsim.Duration
-import com.atlarge.odcsim.Envelope
-import com.atlarge.odcsim.Instant
-import com.atlarge.odcsim.PostStop
-import com.atlarge.odcsim.PreStart
-import com.atlarge.odcsim.Signal
-import com.atlarge.odcsim.Terminated
-import com.atlarge.odcsim.empty
-import com.atlarge.odcsim.internal.BehaviorInterpreter
-import com.atlarge.odcsim.internal.logging.LoggerImpl
-import java.util.Collections
-import java.util.PriorityQueue
-import java.util.UUID
-import java.util.WeakHashMap
-import kotlin.math.max
-import org.jetbrains.annotations.Async
-import org.slf4j.Logger
-
-/**
- * The reference implementation of the [ActorSystem] instance for the OpenDC simulation core.
- *
- * This engine implementation is a single-threaded implementation, running actors synchronously and
- * provides a single priority queue for all events (messages, ticks, etc) that occur.
- *
- * @param guardianBehavior The behavior of the guardian (root) actor.
- * @param name The name of the engine instance.
- */
-class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val name: String) : ActorSystem<T>, ActorRef<T> {
- /**
- * The state of the actor system.
- */
- private var state: ActorSystemState = ActorSystemState.CREATED
-
- /**
- * The event queue to process
- */
- private val queue: PriorityQueue<EnvelopeImpl> = PriorityQueue(
- Comparator
- .comparingDouble(EnvelopeImpl::time)
- .thenComparingLong(EnvelopeImpl::id)
- )
-
- /**
- * The registry of actors in the system.
- */
- private val registry: MutableMap<ActorPath, Actor<*>> = HashMap()
-
- /**
- * The root actor path of the system.
- */
- private val root: ActorPath = ActorPath.Root()
-
- /**
- * The system actor path.
- */
- private val system: ActorPath = root / "system"
-
- /**
- * The current point in simulation time.
- */
- override var time: Instant = .0
-
- /**
- * The path to the root actor.
- */
- override val path: ActorPath = root / "user"
-
- init {
- registry[system] = Actor(ActorRefImpl(this, system), empty<Nothing>())
- registry[path] = Actor(this, guardianBehavior)
- schedule(path, PreStart, .0)
- }
-
- override fun run(until: Duration) {
- require(until >= .0) { "The given instant must be a non-negative number" }
-
- // Start the system/guardian actor on initial run
- if (state == ActorSystemState.CREATED) {
- state = ActorSystemState.STARTED
- registry[system]!!.isolate { it.start() }
- registry[path]!!.isolate { it.start() }
- } else if (state == ActorSystemState.TERMINATED) {
- throw IllegalStateException("The ActorSystem has been terminated.")
- }
-
- while (time < until) {
- // Check whether the system was interrupted
- if (Thread.interrupted()) {
- throw InterruptedException()
- }
-
- val envelope = queue.peek() ?: break
- val delivery = envelope.time.takeUnless { it > until } ?: break
-
- // A message should never be delivered out of order in this single-threaded implementation. Assert for
- // sanity
- assert(delivery >= time) { "Message delivered out of order [expected=$delivery, actual=$time]" }
-
- time = delivery
- queue.poll()
-
- processEnvelope(envelope)
- }
-
- // Jump forward in time as the caller expects the system to have run until the specified instant
- // Taking the maximum value prevents the caller to jump backwards in time
- time = max(time, until)
- }
-
- override fun send(msg: T, after: Duration) = schedule(path, msg, after)
-
- override fun terminate() {
- registry[path]?.stop(null)
- registry[system]?.stop(null)
- }
-
- override suspend fun <U : Any> spawnSystem(behavior: Behavior<U>, name: String): ActorRef<U> {
- return registry[system]!!.spawn(behavior, name)
- }
-
- override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path)
-
- /**
- * The identifier for the next message to be scheduled.
- */
- private var nextId: Long = 0
-
- /**
- * Schedule a message to be processed by the engine.
- *
- * @param path The path to the destination of the message.
- * @param message The message to schedule.
- * @param delay The time to wait before processing the message.
- */
- private fun schedule(@Async.Schedule path: ActorPath, message: Any, delay: Duration) {
- require(delay >= .0) { "The given delay must be a non-negative number" }
- scheduleEnvelope(EnvelopeImpl(nextId++, path, time + delay, message))
- }
-
- /**
- * Schedule the specified envelope to be processed by the engine.
- */
- private fun scheduleEnvelope(@Async.Schedule envelope: EnvelopeImpl) {
- queue.add(envelope)
- }
-
- /**
- * Process the delivery of a message.
- */
- private fun processEnvelope(@Async.Execute envelope: EnvelopeImpl) {
- val actor = registry[envelope.destination] ?: return
-
- // Notice that messages for unknown/terminated actors are ignored for now
- actor.isolate { it.interpretMessage(envelope.message) }
- }
-
- /**
- * An actor as represented in the Omega engine.
- *
- * @param self The [ActorRef] to this actor.
- * @param initialBehavior The initial behavior of this actor.
- */
- private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> {
- val childActors: MutableMap<String, Actor<*>> = mutableMapOf()
- val interpreter = BehaviorInterpreter(initialBehavior)
- val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>())
-
- override val time: Instant
- get() = this@OmegaActorSystem.time
-
- override val children: List<ActorRef<*>>
- get() = childActors.values.map { it.self }
-
- override val system: ActorSystem<*>
- get() = this@OmegaActorSystem
-
- override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl(this) }
-
- override fun getChild(name: String): ActorRef<*>? = childActors[name]?.self
-
- override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref.path, msg, after)
-
- override fun <U : Any> spawn(behavior: Behavior<U>, name: String): ActorRef<U> {
- require(name.isNotEmpty()) { "Actor name may not be empty" }
- require(!name.startsWith("$")) { "Actor name may not start with $-sign" }
- return internalSpawn(behavior, name)
- }
-
- override fun <U : Any> spawnAnonymous(behavior: Behavior<U>): ActorRef<U> {
- val name = "$" + UUID.randomUUID()
- return internalSpawn(behavior, name)
- }
-
- private fun <U : Any> internalSpawn(behavior: Behavior<U>, name: String): ActorRef<U> {
- require(name !in childActors) { "Actor name $name not unique" }
- val ref = ActorRefImpl<U>(this@OmegaActorSystem, self.path.child(name))
- val actor = Actor(ref, behavior)
- registry[ref.path] = actor
- childActors[name] = actor
- schedule(ref.path, PreStart, .0)
- actor.start()
- return ref
- }
-
- override fun stop(child: ActorRef<*>) {
- when {
- // Must be a direct child of this actor
- child.path.parent == self.path -> {
- val ref = childActors[child.path.name] ?: return
- ref.stop(null)
- }
- self == child -> throw IllegalArgumentException(
- "Only direct children of an actor may be stopped through the actor context, " +
- "but you tried to stop [$self] by passing its ActorRef to the `stop` method. " +
- "Stopping self has to be expressed as explicitly returning a Stop Behavior."
- )
- else -> throw IllegalArgumentException(
- "Only direct children of an actor may be stopped through the actor context, " +
- "but [$child] is not a child of [$self]. Stopping other actors has to be expressed as " +
- "an explicit stop message that the actor accepts."
- )
- }
- }
-
- override fun watch(target: ActorRef<*>) {
- registry[target.path]?.watchers?.add(path)
- }
-
- override fun unwatch(target: ActorRef<*>) {
- registry[target.path]?.watchers?.remove(path)
- }
-
- // Synchronization of actors in a single-threaded simulation is trivial: all actors are consistent in virtual
- // time.
- override fun sync(target: ActorRef<*>) {}
-
- override fun unsync(target: ActorRef<*>) {}
-
- override fun isSync(target: ActorRef<*>): Boolean = true
-
- /**
- * Start this actor.
- */
- fun start() {
- interpreter.start(this)
- }
-
- /**
- * Stop this actor.
- */
- fun stop(failure: Throwable?) {
- interpreter.stop(this)
- childActors.values.forEach { it.stop(failure) }
- registry.remove(self.path)
- interpreter.interpretSignal(this, PostStop)
- val termination = Terminated(self, failure)
- watchers.forEach { schedule(it, termination, 0.0) }
- }
-
- /**
- * Interpret the given message send to an actor.
- */
- fun interpretMessage(msg: Any) {
- if (msg is Signal) {
- interpreter.interpretSignal(this, msg)
- } else {
- @Suppress("UNCHECKED_CAST")
- interpreter.interpretMessage(this, msg as T)
- }
-
- if (!interpreter.isAlive) {
- stop(null)
- }
- }
-
- override fun equals(other: Any?): Boolean =
- other is OmegaActorSystem<*>.Actor<*> && self.path == other.self.path
-
- override fun hashCode(): Int = self.path.hashCode()
- }
-
- /**
- * Isolate uncaught exceptions originating from actor interpreter invocations.
- */
- private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? {
- return try {
- block(this)
- } catch (t: Throwable) {
- // Forcefully stop the actor if it crashed
- stop(t)
- log.error("Unhandled exception in actor $path", t)
- null
- }
- }
-
- /**
- * Enumeration to track the state of the actor system.
- */
- private enum class ActorSystemState {
- CREATED, STARTED, TERMINATED
- }
-
- /**
- * Internal [ActorRef] implementation for this actor system.
- */
- private data class ActorRefImpl<T : Any>(
- private val owner: OmegaActorSystem<*>,
- override val path: ActorPath
- ) : ActorRef<T> {
- override fun toString(): String = "Actor[$path]"
-
- override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path)
- }
-
- /**
- * A wrapper around a message that has been scheduled for processing.
- *
- * @property id The identifier of the message to keep the priority queue stable.
- * @property destination The destination of the message.
- * @property time The point in time to deliver the message.
- * @property message The message to wrap.
- */
- private class EnvelopeImpl(
- val id: Long,
- val destination: ActorPath,
- override val time: Instant,
- override val message: Any
- ) : Envelope<Any>
-}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt
deleted file mode 100644
index 84bf1efb..00000000
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactory.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim.engine.omega
-
-import com.atlarge.odcsim.ActorSystem
-import com.atlarge.odcsim.ActorSystemFactory
-import com.atlarge.odcsim.Behavior
-import java.util.ServiceLoader
-
-/**
- * An [ActorSystemFactory] for the Omega engine, used by the [ServiceLoader] API to create [OmegaActorSystem] instances.
- */
-class OmegaActorSystemFactory : ActorSystemFactory {
- override operator fun <T : Any> invoke(root: Behavior<T>, name: String): ActorSystem<T> =
- OmegaActorSystem(root, name)
-}
diff --git a/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory b/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory
deleted file mode 100644
index d0ca8859..00000000
--- a/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.ActorSystemFactory
+++ /dev/null
@@ -1 +0,0 @@
-com.atlarge.odcsim.engine.omega.OmegaActorSystemFactory
diff --git a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt b/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt
deleted file mode 100644
index 4e195e6e..00000000
--- a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemFactoryTest.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim.engine.omega
-
-import com.atlarge.odcsim.ActorSystemFactory
-import com.atlarge.odcsim.engine.tests.ActorSystemFactoryContract
-import org.junit.jupiter.api.DisplayName
-
-/**
- * The [ActorSystemFactory] test suite for the Omega engine implementation.
- */
-@DisplayName("OmegaActorSystemFactory")
-class OmegaActorSystemFactoryTest : ActorSystemFactoryContract() {
- override fun createFactory(): ActorSystemFactory = OmegaActorSystemFactory()
-}
diff --git a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt b/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt
deleted file mode 100644
index dc310d47..00000000
--- a/odcsim/odcsim-engine-omega/src/test/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystemTest.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim.engine.omega
-
-import com.atlarge.odcsim.ActorSystem
-import com.atlarge.odcsim.engine.tests.ActorSystemContract
-import org.junit.jupiter.api.DisplayName
-
-/**
- * The [ActorSystem] test suite for the Omega engine implementation.
- */
-@DisplayName("OmegaActorSystem")
-class OmegaActorSystemTest : ActorSystemContract() {
- override val factory = OmegaActorSystemFactory()
-}