summaryrefslogtreecommitdiff
path: root/opendc-kernel-omega
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-kernel-omega')
-rw-r--r--opendc-kernel-omega/build.gradle56
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt16
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt543
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt18
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt142
5 files changed, 387 insertions, 388 deletions
diff --git a/opendc-kernel-omega/build.gradle b/opendc-kernel-omega/build.gradle
index 1ca00aa6..e02da931 100644
--- a/opendc-kernel-omega/build.gradle
+++ b/opendc-kernel-omega/build.gradle
@@ -24,19 +24,19 @@
/* Build configuration */
buildscript {
- ext.kotlin_version = '1.2.21'
- ext.dokka_version = '0.9.15'
+ ext.kotlin_version = '1.2.21'
+ ext.dokka_version = '0.9.15'
- repositories {
- mavenCentral()
- jcenter()
- }
+ repositories {
+ mavenCentral()
+ jcenter()
+ }
- dependencies {
- classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
- classpath "org.jetbrains.dokka:dokka-gradle-plugin:$dokka_version"
- classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.3'
- }
+ dependencies {
+ classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+ classpath "org.jetbrains.dokka:dokka-gradle-plugin:$dokka_version"
+ classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.3'
+ }
}
apply plugin: 'java'
@@ -45,26 +45,26 @@ apply plugin: 'org.jetbrains.dokka'
apply plugin: 'org.junit.platform.gradle.plugin'
compileKotlin {
- kotlinOptions {
- jvmTarget = "1.8"
- }
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
}
compileTestKotlin {
- kotlinOptions {
- jvmTarget = "1.8"
- }
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
}
kotlin {
- experimental {
- coroutines 'enable'
- }
+ experimental {
+ coroutines 'enable'
+ }
}
dokka {
- outputFormat = 'html'
- outputDirectory = "$buildDir/javadoc"
+ outputFormat = 'html'
+ outputDirectory = "$buildDir/javadoc"
}
/* Project configuration */
@@ -72,17 +72,17 @@ group 'com.atlarge.opendc'
version '1.1'
repositories {
- jcenter()
+ jcenter()
}
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.2"
compile project(':opendc-core')
- compile "io.github.microutils:kotlin-logging:1.4.6"
+ compile "io.github.microutils:kotlin-logging:1.4.6"
- testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3"
- testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3"
- testCompile "org.junit.platform:junit-platform-launcher:1.0.0-RC3"
- testCompile "org.slf4j:slf4j-simple:1.7.25"
+ testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3"
+ testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3"
+ testCompile "org.junit.platform:junit-platform-launcher:1.0.0-RC3"
+ testCompile "org.slf4j:slf4j-simple:1.7.25"
}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt
index af13d1fd..32f27111 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt
@@ -24,9 +24,9 @@
package com.atlarge.opendc.omega
-import com.atlarge.opendc.simulator.Instant
import com.atlarge.opendc.simulator.Entity
import com.atlarge.opendc.simulator.Envelope
+import com.atlarge.opendc.simulator.Instant
/**
* A wrapper around a message that has been scheduled for processing.
@@ -38,11 +38,11 @@ import com.atlarge.opendc.simulator.Envelope
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
internal data class MessageContainer(override val message: Any,
- val time: Instant,
- override val sender: Entity<*, *>?,
- override val destination: Entity<*, *>) : Envelope<Any> {
- /**
- * A flag to indicate the message has been canceled.
- */
- internal var canceled: Boolean = false
+ val time: Instant,
+ override val sender: Entity<*, *>?,
+ override val destination: Entity<*, *>) : Envelope<Any> {
+ /**
+ * A flag to indicate the message has been canceled.
+ */
+ internal var canceled: Boolean = false
}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
index fb5ce24b..c729a63d 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
@@ -26,7 +26,6 @@ package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.*
import com.atlarge.opendc.simulator.kernel.Kernel
-import com.atlarge.opendc.simulator.Bootstrap
import mu.KotlinLogging
import java.util.*
import kotlin.coroutines.experimental.*
@@ -41,275 +40,275 @@ import kotlin.coroutines.experimental.*
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Context<M> {
- /**
- * The logger instance to use for the simulator.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The registry of the simulation kernels used in the experiment.
- */
- private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap()
-
- /**
- * The message queue.
- */
- private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
-
- /**
- * The simulation time.
- */
- override var time: Instant = 0
-
- /**
- * The model of simulation.
- */
- override val model: M = bootstrap.bootstrap(this)
-
- override val <E : Entity<S, *>, S> E.state: S
- get() = context?.state ?: initialState
-
- /**
- * The context associated with an [Entity].
- */
- @Suppress("UNCHECKED_CAST")
- private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>?
- get() = registry[this] as? OmegaContext<S>
-
- override fun register(entity: Entity<*, M>): Boolean {
- if (!registry.containsKey(entity) && entity !is Process) {
- return false
- }
-
- @Suppress("UNCHECKED_CAST")
- val process = entity as Process<Any, M>
- val context = OmegaContext(entity).also { registry.put(entity, it) }
-
- // Bootstrap the process coroutine
- val block: suspend () -> Unit = { process.run { context.run() } }
- block.startCoroutine(context)
-
- return true
- }
-
- override fun deregister(entity: Entity<*, M>): Boolean {
- val context = entity.context ?: return false
- context.resume(Unit)
- return true
- }
-
- override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) =
- schedule(prepare(message, destination, sender, delay))
-
- override fun step() {
- while (true) {
- val envelope = queue.peek() ?: return
- val delivery = envelope.time
-
- if (delivery > time) {
- // Tick has yet to occur
- // Jump in time to next event
- time = delivery
- break
- } else if (delivery < time) {
- // Tick has already occurred
- logger.warn { "message processed out of order" }
- }
-
- queue.poll()
-
- // If the sender has canceled the message, we move on to the next message
- if (envelope.canceled) {
- continue
- }
-
- val context = envelope.destination.context ?: continue
-
- if (envelope.message !is Interrupt) {
- context.continuation.resume(envelope)
- } else {
- context.continuation.resumeWithException(envelope.message)
- }
-
- context.last = time
- }
- }
-
-
- override fun run() {
- while (queue.isNotEmpty()) {
- step()
- }
- }
-
- override fun run(until: Instant) {
- require(until > 0) { "The given instant must be a non-zero positive number" }
-
- if (time >= until) {
- return
- }
-
- while (time < until && queue.isNotEmpty()) {
- step()
- }
-
- // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at
- // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will
- // just jump forward again.
- if (time > until) {
- time = until
- }
- }
-
- private fun schedule(envelope: MessageContainer) {
- queue.add(envelope)
- }
-
- private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null,
- delay: Duration): MessageContainer {
- require(delay >= 0) { "The amount of time to delay the message must be a positive number" }
- return MessageContainer(message, time + delay, sender, destination)
- }
-
- /**
- * This internal class provides the default implementation for the [Context] interface for this simulator.
- */
- private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> {
- /**
- * The continuation to resume the execution of the process.
- */
- lateinit var continuation: Continuation<Envelope<*>>
-
- /**
- * The last point in time the process has done some work.
- */
- var last: Instant = -1
-
- /**
- * The model in which the process exists.
- */
- override val model: M
- get() = this@OmegaKernel.model
-
- /**
- * The state of the entity.
- */
- override var state: S = process.initialState
-
- /**
- * The current point in simulation time.
- */
- override val time: Instant
- get() = this@OmegaKernel.time
-
- /**
- * The duration between the current point in simulation time and the last point in simulation time where the
- * [Context] has executed some work.
- */
- override val delta: Duration
- get() = maxOf(time - last, 0)
-
- /**
- * The [CoroutineContext] for a [Context].
- */
- override val context: CoroutineContext = EmptyCoroutineContext
-
- /**
- * The observable state of an [Entity] within the simulation is provided by the context of the simulation.
- */
- override val <T : Entity<S, *>, S> T.state: S
- get() = context?.state ?: initialState
-
- /**
- * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
- * message has been received.
- *
- * @return The envelope containing the message.
- */
- suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it }
-
- suspend override fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T {
- val envelope = receiveEnvelope()
- return transform(envelope, envelope.message)
- }
-
-
- suspend override fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? {
- val send = prepare(Timeout, process, process, timeout).also { schedule(it) }
-
- try {
- val received = receiveEnvelope()
-
- if (received.message !is Timeout) {
- send.canceled = true
- return transform(received, received.message)
- }
-
- return null
- } finally {
- send.canceled = true
- }
- }
-
- suspend override fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay)
-
- suspend override fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) =
- schedule(prepare(msg, sender, delay = delay))
-
- suspend override fun Entity<*, *>.interrupt() = send(Interrupt)
-
- suspend override fun hold(duration: Duration) {
- require(duration >= 0) { "The amount of time to hold must be a positive number" }
- val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
-
- try {
- while (true) {
- if (receive() is Resume)
- return
- }
- } finally {
- envelope.canceled = true
- }
- }
-
- suspend override fun hold(duration: Duration, queue: Queue<Any>) {
- require(duration >= 0) { "The amount of time to hold must be a positive number" }
- val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
-
- try {
- while (true) {
- val msg = receive()
- if (msg is Resume)
- return
- queue.add(msg)
- }
- } finally {
- envelope.canceled = true
- }
- }
-
-
- // Completion continuation implementation
- /**
- * Resume the execution of this continuation with the given value.
- *
- * @param value The value to resume with.
- */
- override fun resume(value: Unit) {
- // Deregister process from registry in order to have the GC collect this context
- registry.remove(process)
- }
-
- /**
- * Resume the execution of this continuation with an exception.
- *
- * @param exception The exception to resume with.
- */
- override fun resumeWithException(exception: Throwable) {
- // Deregister process from registry in order to have the GC collect this context:w
- registry.remove(process)
-
- logger.error(exception) { "An exception occurred during the execution of a process" }
- }
- }
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The registry of the simulation kernels used in the experiment.
+ */
+ private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap()
+
+ /**
+ * The message queue.
+ */
+ private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
+
+ /**
+ * The simulation time.
+ */
+ override var time: Instant = 0
+
+ /**
+ * The model of simulation.
+ */
+ override val model: M = bootstrap.bootstrap(this)
+
+ override val <E : Entity<S, *>, S> E.state: S
+ get() = context?.state ?: initialState
+
+ /**
+ * The context associated with an [Entity].
+ */
+ @Suppress("UNCHECKED_CAST")
+ private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>?
+ get() = registry[this] as? OmegaContext<S>
+
+ override fun register(entity: Entity<*, M>): Boolean {
+ if (!registry.containsKey(entity) && entity !is Process) {
+ return false
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ val process = entity as Process<Any, M>
+ val context = OmegaContext(entity).also { registry[entity] = it }
+
+ // Bootstrap the process coroutine
+ val block: suspend () -> Unit = { process.run { context.run() } }
+ block.startCoroutine(context)
+
+ return true
+ }
+
+ override fun deregister(entity: Entity<*, M>): Boolean {
+ val context = entity.context ?: return false
+ context.resume(Unit)
+ return true
+ }
+
+ override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) =
+ schedule(prepare(message, destination, sender, delay))
+
+ override fun step() {
+ while (true) {
+ val envelope = queue.peek() ?: return
+ val delivery = envelope.time
+
+ if (delivery > time) {
+ // Tick has yet to occur
+ // Jump in time to next event
+ time = delivery
+ break
+ } else if (delivery < time) {
+ // Tick has already occurred
+ logger.warn { "message processed out of order" }
+ }
+
+ queue.poll()
+
+ // If the sender has canceled the message, we move on to the next message
+ if (envelope.canceled) {
+ continue
+ }
+
+ val context = envelope.destination.context ?: continue
+
+ if (envelope.message !is Interrupt) {
+ context.continuation.resume(envelope)
+ } else {
+ context.continuation.resumeWithException(envelope.message)
+ }
+
+ context.last = time
+ }
+ }
+
+
+ override fun run() {
+ while (queue.isNotEmpty()) {
+ step()
+ }
+ }
+
+ override fun run(until: Instant) {
+ require(until > 0) { "The given instant must be a non-zero positive number" }
+
+ if (time >= until) {
+ return
+ }
+
+ while (time < until && queue.isNotEmpty()) {
+ step()
+ }
+
+ // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at
+ // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will
+ // just jump forward again.
+ if (time > until) {
+ time = until
+ }
+ }
+
+ private fun schedule(envelope: MessageContainer) {
+ queue.add(envelope)
+ }
+
+ private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null,
+ delay: Duration): MessageContainer {
+ require(delay >= 0) { "The amount of time to delay the message must be a positive number" }
+ return MessageContainer(message, time + delay, sender, destination)
+ }
+
+ /**
+ * This internal class provides the default implementation for the [Context] interface for this simulator.
+ */
+ private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> {
+ /**
+ * The continuation to resume the execution of the process.
+ */
+ lateinit var continuation: Continuation<Envelope<*>>
+
+ /**
+ * The last point in time the process has done some work.
+ */
+ var last: Instant = -1
+
+ /**
+ * The model in which the process exists.
+ */
+ override val model: M
+ get() = this@OmegaKernel.model
+
+ /**
+ * The state of the entity.
+ */
+ override var state: S = process.initialState
+
+ /**
+ * The current point in simulation time.
+ */
+ override val time: Instant
+ get() = this@OmegaKernel.time
+
+ /**
+ * The duration between the current point in simulation time and the last point in simulation time where the
+ * [Context] has executed some work.
+ */
+ override val delta: Duration
+ get() = maxOf(time - last, 0)
+
+ /**
+ * The [CoroutineContext] for a [Context].
+ */
+ override val context: CoroutineContext = EmptyCoroutineContext
+
+ /**
+ * The observable state of an [Entity] within the simulation is provided by the context of the simulation.
+ */
+ override val <T : Entity<S, *>, S> T.state: S
+ get() = context?.state ?: initialState
+
+ /**
+ * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
+ * message has been received.
+ *
+ * @return The envelope containing the message.
+ */
+ suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it }
+
+ override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T {
+ val envelope = receiveEnvelope()
+ return transform(envelope, envelope.message)
+ }
+
+
+ override suspend fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? {
+ val send = prepare(Timeout, process, process, timeout).also { schedule(it) }
+
+ try {
+ val received = receiveEnvelope()
+
+ if (received.message == Timeout) {
+ send.canceled = true
+ return transform(received, received.message)
+ }
+
+ return null
+ } finally {
+ send.canceled = true
+ }
+ }
+
+ override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay)
+
+ override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) =
+ schedule(prepare(msg, sender, delay = delay))
+
+ override suspend fun Entity<*, *>.interrupt() = send(Interrupt)
+
+ override suspend fun hold(duration: Duration) {
+ require(duration >= 0) { "The amount of time to hold must be a positive number" }
+ val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
+
+ try {
+ while (true) {
+ if (receive() == Resume)
+ return
+ }
+ } finally {
+ envelope.canceled = true
+ }
+ }
+
+ override suspend fun hold(duration: Duration, queue: Queue<Any>) {
+ require(duration >= 0) { "The amount of time to hold must be a positive number" }
+ val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
+
+ try {
+ while (true) {
+ val msg = receive()
+ if (msg == Resume)
+ return
+ queue.add(msg)
+ }
+ } finally {
+ envelope.canceled = true
+ }
+ }
+
+
+ // Completion continuation implementation
+ /**
+ * Resume the execution of this continuation with the given value.
+ *
+ * @param value The value to resume with.
+ */
+ override fun resume(value: Unit) {
+ // Deregister process from registry in order to have the GC collect this context
+ registry.remove(process)
+ }
+
+ /**
+ * Resume the execution of this continuation with an exception.
+ *
+ * @param exception The exception to resume with.
+ */
+ override fun resumeWithException(exception: Throwable) {
+ // Deregister process from registry in order to have the GC collect this context:w
+ registry.remove(process)
+
+ logger.error(exception) { "An exception occurred during the execution of a process" }
+ }
+ }
}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt
index dcad4dce..139cbd19 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt
@@ -24,9 +24,9 @@
package com.atlarge.opendc.omega
-import com.atlarge.opendc.simulator.kernel.KernelFactory
-import com.atlarge.opendc.simulator.kernel.Kernel
import com.atlarge.opendc.simulator.Bootstrap
+import com.atlarge.opendc.simulator.kernel.Kernel
+import com.atlarge.opendc.simulator.kernel.KernelFactory
/**
* The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
@@ -37,11 +37,11 @@ import com.atlarge.opendc.simulator.Bootstrap
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
object OmegaKernelFactory : KernelFactory {
- /**
- * Create a simulation over the given model facilitated by this simulation kernel.
- *
- * @param bootstrap The bootstrap procedure to bootstrap the simulation with.
- * @return A [Kernel] instance to control the simulation.
- */
- override fun <M> create(bootstrap: Bootstrap<M>): Kernel<M> = OmegaKernel(bootstrap)
+ /**
+ * Create a simulation over the given model facilitated by this simulation kernel.
+ *
+ * @param bootstrap The bootstrap procedure to bootstrap the simulation with.
+ * @return A [Kernel] instance to control the simulation.
+ */
+ override fun <M> create(bootstrap: Bootstrap<M>): Kernel<M> = OmegaKernel(bootstrap)
}
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index b358d618..a1ec8e88 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -24,9 +24,9 @@
package com.atlarge.opendc.omega
+import com.atlarge.opendc.simulator.Bootstrap
import com.atlarge.opendc.simulator.Context
import com.atlarge.opendc.simulator.Process
-import com.atlarge.opendc.simulator.Bootstrap
import org.junit.jupiter.api.Test
/**
@@ -35,82 +35,82 @@ import org.junit.jupiter.api.Test
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
internal class SmokeTest {
- class EchoProcess : Process<Unit, Unit> {
- override val initialState = Unit
- suspend override fun Context<Unit, Unit>.run() {
- while (true) {
- receive {
- sender?.send(message)
- }
- }
- }
- }
+ class EchoProcess : Process<Unit, Unit> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, Unit>.run() {
+ while (true) {
+ receive {
+ sender?.send(message)
+ }
+ }
+ }
+ }
- /**
- * Run a large amount of simulations and test if any exceptions occur.
- */
- @Test
- fun smoke() {
- val n = 1000
- val messages = 100
- val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
- repeat(n) {
- EchoProcess().also {
- ctx.register(it)
+ /**
+ * Run a large amount of simulations and test if any exceptions occur.
+ */
+ @Test
+ fun smoke() {
+ val n = 1000
+ val messages = 100
+ val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
+ repeat(n) {
+ EchoProcess().also {
+ ctx.register(it)
- for (i in 1 until messages) {
- ctx.schedule(i, it, delay = i.toLong())
- }
- }
- }
- }
- val kernel = OmegaKernelFactory.create(bootstrap)
- kernel.run()
- }
+ for (i in 1 until messages) {
+ ctx.schedule(i, it, delay = i.toLong())
+ }
+ }
+ }
+ }
+ val kernel = OmegaKernelFactory.create(bootstrap)
+ kernel.run()
+ }
- class NullProcess : Process<Unit, Unit> {
- override val initialState = Unit
- suspend override fun Context<Unit, Unit>.run() {}
- }
+ class NullProcess : Process<Unit, Unit> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, Unit>.run() {}
+ }
- /**
- * Test if the kernel allows sending messages to [Context] instances that have already stopped.
- */
- @Test
- fun `sending message to process that has gracefully stopped`() {
- val process = NullProcess()
- val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
- process.also {
- ctx.register(it)
- ctx.schedule(0, it)
- }
- }
+ /**
+ * Test if the kernel allows sending messages to [Context] instances that have already stopped.
+ */
+ @Test
+ fun `sending message to process that has gracefully stopped`() {
+ val process = NullProcess()
+ val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
+ process.also {
+ ctx.register(it)
+ ctx.schedule(0, it)
+ }
+ }
- val kernel = OmegaKernelFactory.create(bootstrap)
- kernel.run()
- }
+ val kernel = OmegaKernelFactory.create(bootstrap)
+ kernel.run()
+ }
- class CrashProcess : Process<Unit, Unit> {
- override val initialState = Unit
- suspend override fun Context<Unit, Unit>.run() {
- TODO("This process should crash")
- }
- }
+ class CrashProcess : Process<Unit, Unit> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, Unit>.run() {
+ TODO("This process should crash")
+ }
+ }
- /**
- * Test if the kernel allows sending messages to [Context] instances that have crashed.
- */
- @Test
- fun `sending message to process that has crashed`() {
- val process = CrashProcess()
- val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
- process.also {
- ctx.register(it)
- ctx.schedule(0, it)
- }
- }
+ /**
+ * Test if the kernel allows sending messages to [Context] instances that have crashed.
+ */
+ @Test
+ fun `sending message to process that has crashed`() {
+ val process = CrashProcess()
+ val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
+ process.also {
+ ctx.register(it)
+ ctx.schedule(0, it)
+ }
+ }
- val kernel = OmegaKernelFactory.create(bootstrap)
- kernel.run()
- }
+ val kernel = OmegaKernelFactory.create(bootstrap)
+ kernel.run()
+ }
}