summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:34:28 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:34:28 +0100
commitefb31902e6950f52ba8b50c2472f867e6d98313b (patch)
tree0ed4ec88bf6ab54d74a3ae4925cbb48854574ceb
parent4d13f702c87bc195d8edbd19c5cd6567ecfd2af4 (diff)
feat: Add prototype reference implementation of revised API
This change adds a prototype implementation of the revised version of the API of version 2.0 of the simulator.
-rw-r--r--odcsim/odcsim-engine-omega/build.gradle.kts47
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt332
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt39
-rw-r--r--odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.SimulationEngineProvider1
-rw-r--r--settings.gradle.kts1
5 files changed, 420 insertions, 0 deletions
diff --git a/odcsim/odcsim-engine-omega/build.gradle.kts b/odcsim/odcsim-engine-omega/build.gradle.kts
new file mode 100644
index 00000000..98e2469e
--- /dev/null
+++ b/odcsim/odcsim-engine-omega/build.gradle.kts
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Single-threaded reference implementation for the odcsim API"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-convention`
+}
+
+/* Project configuration */
+repositories {
+ jcenter()
+}
+
+dependencies {
+ api(project(":odcsim:odcsim-api"))
+
+ implementation(kotlin("stdlib"))
+ implementation("org.jetbrains:annotations:17.0.0")
+
+ testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
+ testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
+ testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
+ testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
+}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
new file mode 100644
index 00000000..b0df32d4
--- /dev/null
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -0,0 +1,332 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.engine.omega
+
+import com.atlarge.odcsim.SimulationEngine
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.Channel
+import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.ProcessRef
+import com.atlarge.odcsim.ReceivePort
+import com.atlarge.odcsim.ReceiveRef
+import com.atlarge.odcsim.SendPort
+import com.atlarge.odcsim.SendRef
+import kotlinx.coroutines.CancellableContinuation
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.Delay
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Runnable
+import kotlinx.coroutines.channels.Channel as KChannel
+import kotlinx.coroutines.isActive
+import org.jetbrains.annotations.Async
+import java.time.Clock
+import java.time.Instant
+import java.time.ZoneId
+import java.util.PriorityQueue
+import java.util.UUID
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.coroutineContext
+import kotlin.coroutines.startCoroutine
+
+/**
+ * The reference implementation of the [SimulationEngine] instance for the OpenDC simulation core.
+ *
+ * This engine implementation is a single-threaded implementation, running logical processes synchronously and
+ * provides a single priority queue for all events (messages, ticks, etc) that occur.
+ *
+ * @param rootBehavior The behavior of the root actor.
+ * @param name The name of the engine instance.
+ */
+class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : SimulationEngine {
+ /**
+ * The state of the actor system.
+ */
+ private var state: SimulationEngineState = SimulationEngineState.CREATED
+
+ /**
+ * The clock tracking the simulation time.
+ */
+ private val clock: VirtualClock = VirtualClock(0)
+
+ /**
+ * The event queue to process
+ */
+ private val queue: PriorityQueue<Event> = PriorityQueue()
+
+ /**
+ * The active processes in the simulation engine.
+ */
+ private val registry: MutableMap<ProcessRef, ProcessImpl> = HashMap()
+
+ /**
+ * The channels that have been registered by this engine.
+ */
+ private val channels: MutableSet<ChannelImpl<*>> = HashSet()
+
+ /**
+ * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior.
+ */
+ @InternalCoroutinesApi
+ private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
+ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
+ schedule(Event.Resume(clock.time + timeMillis, this, continuation))
+ }
+
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ schedule(Event.Dispatch(clock.time, block))
+ }
+ }
+
+ init {
+ spawn(rootBehavior, "/")
+ }
+
+ override suspend fun run() {
+ check(state != SimulationEngineState.TERMINATED) { "The simulation engine is terminated" }
+
+ if (state == SimulationEngineState.CREATED) {
+ state = SimulationEngineState.STARTED
+ }
+
+ while (coroutineContext.isActive) {
+ val event = queue.peek() ?: break
+ val delivery = event.time
+
+ // A message should never be delivered out of order in this single-threaded implementation. Assert for
+ // sanity
+ assert(delivery >= clock.time) { "Message delivered out of order [expected=$delivery, actual=${clock.time}]" }
+
+ clock.time = delivery
+ queue.poll()
+
+ process(event)
+ }
+ }
+
+ override suspend fun terminate() {
+ state = SimulationEngineState.TERMINATED
+ }
+
+ /**
+ * Schedule the specified event to be processed by the engine.
+ */
+ private fun schedule(@Async.Schedule event: Event) {
+ queue.add(event)
+ }
+
+ /**
+ * Process the delivery of an event.
+ */
+ private fun process(@Async.Execute event: Event) {
+ event.run()
+ }
+
+ /**
+ * Spawn a new logical process in the simulation universe.
+ */
+ private fun spawn(behavior: Behavior, name: String): ProcessRef {
+ val ref = ProcessRefImpl(this, name)
+ require(ref !in registry) { "Process name $name not unique" }
+ val lp = ProcessImpl(ref, behavior)
+ registry[ref] = lp
+ lp.start()
+ return ref
+ }
+
+ /**
+ * Register a new communication channel.
+ */
+ private fun <T : Any> open(): Channel<T> {
+ val channel = ChannelImpl<T>()
+ channels += channel
+ return channel
+ }
+
+ private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> {
+ override val clock: Clock
+ get() = this@OmegaSimulationEngine.clock
+
+ override fun spawn(behavior: Behavior): ProcessRef {
+ val name = "$" + UUID.randomUUID()
+ return spawn(behavior, name)
+ }
+
+ override fun spawn(behavior: Behavior, name: String): ProcessRef {
+ require(name.isNotEmpty()) { "Process name may not be empty" }
+ require(!name.startsWith("$")) { "Process name may not start with $-sign" }
+ return this@OmegaSimulationEngine.spawn(behavior, name)
+ }
+
+ override fun <T : Any> open(): Channel<T> = this@OmegaSimulationEngine.open()
+
+ override suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> {
+ require(send is ChannelImpl && send in channels) { "Invalid reference to channel" }
+ return SendPortImpl(send)
+ }
+
+ override suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> {
+ require(receive is ChannelImpl && receive in channels) { "Invalid reference to channel" }
+ return ReceivePortImpl(receive)
+ }
+
+ /**
+ * Start this logical process.
+ */
+ fun start() = behavior.startCoroutine(this, this)
+
+ override fun resumeWith(result: Result<Unit>) {
+ // Stop the logical process
+ if (result.isFailure) {
+ result.exceptionOrNull()!!.printStackTrace()
+ }
+ }
+
+ override val key: CoroutineContext.Key<*> = ProcessContext.Key
+
+ @InternalCoroutinesApi
+ override val context: CoroutineContext = this + dispatcher
+ }
+
+ /**
+ * Enumeration to track the state of the actor system.
+ */
+ private enum class SimulationEngineState {
+ CREATED, STARTED, TERMINATED
+ }
+
+ /**
+ * Internal [ProcessRef] implementation for this simulation engine.
+ */
+ private data class ProcessRefImpl(
+ private val owner: OmegaSimulationEngine,
+ override val name: String
+ ) : ProcessRef {
+ override fun toString(): String = "Process[$name]"
+ }
+
+ /**
+ * Internal [Channel] implementation.
+ */
+ private inner class ChannelImpl<T : Any> : Channel<T>, SendRef<T>, ReceiveRef<T> {
+ override val send: SendRef<T> = this
+ override val receive: ReceiveRef<T> = this
+
+ /**
+ * The underlying `kotlinx.coroutines` channel to back this channel implementation.
+ */
+ private val channel = KChannel<T>(KChannel.CONFLATED)
+
+ /**
+ * Receive a message from this channel.
+ */
+ suspend fun receive(): T = channel.receive()
+
+ /**
+ * Send a message to this channel.
+ */
+ fun send(message: T) = assert(channel.offer(message)) { "Failed to send message" }
+ }
+
+ private inner class SendPortImpl<T : Any>(private val channelImpl: ChannelImpl<T>) : SendPort<T> {
+ private var closed = false
+
+ override fun close(): Boolean {
+ if (closed) {
+ return false
+ }
+
+ closed = true
+ return true
+ }
+
+ override fun send(message: T) {
+ check(!closed) { "Port is closed" }
+ schedule(Event.Send(clock.time, channelImpl, message))
+ }
+
+ }
+
+ private class ReceivePortImpl<T : Any>(private val channel: ChannelImpl<T>) : ReceivePort<T> {
+ private var closed = false
+
+ override fun close(): Boolean {
+ if (closed) {
+ return false
+ }
+
+ closed = true
+ return true
+ }
+
+ override suspend fun receive(): T {
+ check(!closed) { "Port is closed" }
+ return channel.receive()
+ }
+ }
+
+ /**
+ * A wrapper around a message that has been scheduled for processing.
+ *
+ * @property time The point in time to deliver the message.
+ */
+ private sealed class Event(val time: Long) : Comparable<Event>, Runnable {
+ override fun compareTo(other: Event): Int = time.compareTo(other.time)
+
+ class Dispatch(time: Long, val block: Runnable) : Event(time) {
+ override fun run() = block.run()
+
+ override fun toString(): String = "Dispatch[$time]"
+ }
+
+ class Resume(time: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time) {
+ @InternalCoroutinesApi
+ override fun run() {
+ with(continuation) { dispatcher.resumeUndispatched(Unit) }
+ }
+
+ override fun toString(): String = "Resume[$time]"
+ }
+
+ class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) {
+ override fun run() {
+ channel.send(message)
+ }
+ }
+ }
+
+ /**
+ * A virtual [Clock] implementation for keeping track of simulation time.
+ */
+ private data class VirtualClock(var time: Long) : Clock() {
+ override fun withZone(zone: ZoneId?): Clock = TODO("not implemented")
+
+ override fun getZone(): ZoneId = ZoneId.systemDefault()
+
+ override fun instant(): Instant = Instant.ofEpochMilli(time)
+
+ override fun millis(): Long = time
+ }
+}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
new file mode 100644
index 00000000..b9a1c30f
--- /dev/null
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.engine.omega
+
+import com.atlarge.odcsim.SimulationEngine
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.Behavior
+import java.util.ServiceLoader
+
+/**
+ * An [SimulationEngineProvider] for the Omega engine, used by the [ServiceLoader] API to create
+ * [OmegaSimulationEngine] instances.
+ */
+class OmegaSimulationEngineProvider : SimulationEngineProvider {
+ override operator fun invoke(root: Behavior, name: String): SimulationEngine =
+ OmegaSimulationEngine(root, name)
+}
diff --git a/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.SimulationEngineProvider b/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.SimulationEngineProvider
new file mode 100644
index 00000000..1131cebd
--- /dev/null
+++ b/odcsim/odcsim-engine-omega/src/main/resources/META-INF/services/com.atlarge.odcsim.SimulationEngineProvider
@@ -0,0 +1 @@
+com.atlarge.odcsim.engine.omega.OmegaSimulationEngineProvider
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 955109e9..72d85c80 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -24,3 +24,4 @@
rootProject.name = "opendc-simulator"
include(":odcsim:odcsim-api")
+include(":odcsim:odcsim-engine-omega")