summaryrefslogtreecommitdiff
path: root/opendc-kernel-omega/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-kernel-omega/src')
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt30
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt47
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt435
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/ProcessTest.kt61
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt169
5 files changed, 0 insertions, 742 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt
deleted file mode 100644
index d63a53c8..00000000
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.Context
-import com.atlarge.opendc.simulator.Process
-
-/**
- * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up
- * and resume execution.
- *
- * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to
- * wake up a process from another entity.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-object Resume
-
-/**
- * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that a timeout has been
- * reached and that it should wake up and resume execution.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-object Timeout
-
-/**
- * An internal message used by the Omega simulation kernel to launch a process.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-data class Launch<M>(val process: Process<*, M>)
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
deleted file mode 100644
index c0ab9fb4..00000000
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.Bootstrap
-import com.atlarge.opendc.simulator.kernel.Kernel
-import com.atlarge.opendc.simulator.kernel.Simulation
-
-/**
- * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
- *
- * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and
- * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-object OmegaKernel : Kernel {
- /**
- * Create a simulation over the given model facilitated by this simulation kernel.
- *
- * @param bootstrap The apply procedure to apply the simulation with.
- * @return A [Simulation] instance to control the simulation.
- */
- override fun <M> create(bootstrap: Bootstrap<M>): Simulation<M> = OmegaSimulation(bootstrap)
-}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
deleted file mode 100644
index 03861864..00000000
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.*
-import com.atlarge.opendc.simulator.instrumentation.Instrument
-import com.atlarge.opendc.simulator.instrumentation.InstrumentScope
-import com.atlarge.opendc.simulator.instrumentation.Port
-import com.atlarge.opendc.simulator.kernel.Simulation
-import kotlinx.coroutines.experimental.channels.Channel
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import kotlinx.coroutines.experimental.channels.SendChannel
-import mu.KotlinLogging
-import java.lang.ref.WeakReference
-import java.util.*
-import kotlin.coroutines.experimental.*
-
-/**
- * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
- *
- * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and
- * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities.
- *
- * @property model The model that is simulated.
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Bootstrap.Context<M> {
- /**
- * The logger instance to use for the simulator.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The registry of the processes used in the simulation.
- */
- private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap()
-
- /**
- * The message queue.
- */
- private val queue: Queue<Envelope> = PriorityQueue(Comparator
- .comparingLong(Envelope::time)
- .thenComparingLong(Envelope::id))
-
- /**
- * The kernel process instance that handles internal operations during the simulation.
- */
- private val process = object : Process<Unit, M> {
- override val initialState = Unit
-
- override suspend fun Context<Unit, M>.run() {
- while(true) {
- val msg = receive()
- when (msg) {
- is Launch<*> ->
- @Suppress("UNCHECKED_CAST")
- launch((msg as Launch<M>).process)
- }
- }
- }
- }
-
- /**
- * The context associated with an [Entity].
- */
- @Suppress("UNCHECKED_CAST")
- private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>?
- get() = registry[this] as? OmegaContext<S>
-
- /**
- * The simulation time.
- */
- override var time: Instant = 0
-
- /**
- * The model of simulation.
- */
- // XXX: the bootstrap requires the properties of this class to be initialised, so changing the order may cause NPEs
- override var model: M = bootstrap.apply(this)
-
- /**
- * The observable state of an [Entity] in simulation, which is provided by the simulation context.
- */
- override val <E : Entity<S, *>, S> E.state: S
- get() = context?.state ?: initialState
-
- /**
- * Initialise the simulation instance.
- */
- init {
- // Launch the Omega kernel process
- launch(process)
- }
-
- // Bootstrap Context implementation
- override fun register(entity: Entity<*, M>): Boolean {
- if (!registry.containsKey(entity) && entity !is Process) {
- return false
- }
-
- schedule(Launch(entity as Process<*, M>), process)
- return true
- }
-
- override fun deregister(entity: Entity<*, M>): Boolean {
- val context = entity.context ?: return false
- context.resume(Unit)
- return true
- }
-
- override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) =
- schedule(prepare(message, destination, sender, delay))
-
- // Simulation implementation
- override fun openPort(): Port<M> = object : Port<M> {
- val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf()
-
- override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
- val channel = Channel<T>(capacity)
- val process = object : Process<Unit, M> {
- override val initialState = Unit
- override suspend fun Context<Unit, M>.run() {
- val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
- try {
- instrument(builder)
- channel.close()
- } catch (cause: Throwable) {
- channel.close(cause)
- }
- }
- }
- channels.add(WeakReference(channel))
- register(process)
- return channel
- }
-
- override fun close(cause: Throwable?): Boolean = channels
- .map { it.get()?.close(cause) ?: false }
- .any()
- }
-
- override fun step() {
- while (true) {
- val envelope = queue.peek() ?: return
- val delivery = envelope.time
-
- if (delivery > time) {
- // Tick has yet to occur
- // Jump in time to next event
- time = delivery
- break
- } else if (delivery < time) {
- // Tick has already occurred
- logger.warn { "Message processed out of order" }
- }
-
- queue.poll()
-
- // If the sender has canceled the message, we move on to the next message
- if (envelope.canceled) {
- continue
- }
-
- val context = envelope.destination.context ?: continue
- val continuation = context.continuation ?: continue
-
- // Clear the continuation to prevent resuming an already resumed continuation
- context.continuation = null
-
- if (envelope.message is Interrupt) {
- continuation.resumeWithException(envelope.message)
- } else {
- continuation.resume(envelope)
- }
-
- context.last = time
- }
- }
-
- override fun run() {
- while (queue.isNotEmpty()) {
- step()
- }
- }
-
- override fun run(until: Instant) {
- require(until > 0) { "The given instant must be a non-zero positive number" }
-
- if (time >= until) {
- return
- }
-
- while (time < until && queue.isNotEmpty()) {
- step()
- }
-
- // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at
- // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will
- // just jump forward again.
- if (time > until) {
- time = until
- }
- }
-
- /**
- * The identifier for the next message to be scheduled.
- */
- private var nextId: Long = 0
-
- /**
- * A wrapper around a message that has been scheduled for processing.
- *
- * @property id The identifier of the message to keep the priority queue stable
- * @property message The message to wrap.
- * @property time The point in time to deliver the message.
- * @property sender The sender of the message.
- * @property destination The destination of the message.
- */
- private data class Envelope(val id: Long,
- val message: Any,
- val time: Instant,
- val sender: Entity<*, *>?,
- val destination: Entity<*, *>) {
- /**
- * A flag to indicate the message has been canceled.
- */
- internal var canceled: Boolean = false
- }
-
- /**
- * Schedule the given envelope to be processed by the kernel.
- *
- * @param envelope The envelope containing the message to schedule.
- */
- private fun schedule(envelope: Envelope) {
- queue.add(envelope)
- }
-
- /**
- * Prepare a message for scheduling by wrapping it into an envelope.
- *
- * @param message The message to send.
- * @param destination The destination entity that should receive the message.
- * @param sender The optional sender of the message.
- * @param delay The time to delay the message.
- */
- private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null,
- delay: Duration): Envelope {
- require(delay >= 0) { "The amount of time to delay the message must be a positive number" }
- return Envelope(nextId++, message, time + delay, sender, destination)
- }
-
- /**
- * Launch the given [Process].
- *
- * @param process The process to launch.
- */
- private fun launch(process: Process<*, M>) {
- val context = OmegaContext(process).also { registry[process] = it }
-
- // Bootstrap the process coroutine
- val block: suspend () -> Unit = { context.start() }
- block.startCoroutine(context)
- }
-
- /**
- * This internal class provides the default implementation for the [Context] interface for this simulator.
- */
- private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit>,
- AbstractCoroutineContextElement(Context) {
- /**
- * The model in which the process exists.
- */
- override val model: M
- get() = this@OmegaSimulation.model
-
- /**
- * The current point in simulation time.
- */
- override val time: Instant
- get() = this@OmegaSimulation.time
-
- /**
- * The [Entity] associated with this context.
- */
- override val self: Entity<S, M>
- get() = process
-
- /**
- * The duration between the current point in simulation time and the last point in simulation time where the
- * [Context] has executed some work.
- */
- override val delta: Duration
- get() = maxOf(time - last, 0)
-
- /**
- * The state of the entity.
- */
- override var state: S = process.initialState
-
- /**
- * The observable state of an [Entity] within the simulation is provided by the context of the simulation.
- */
- override val <T : Entity<S, *>, S> T.state: S
- get() = context?.state ?: initialState
-
- /**
- * The sender of the last received message or `null` in case the process has not received any messages yet.
- */
- override var sender: Entity<*, *>? = null
-
- /**
- * The [CoroutineContext] for a [Context].
- */
- override val context: CoroutineContext = this
-
- /**
- * The continuation to resume the execution of the process.
- */
- var continuation: Continuation<Envelope>? = null
-
- /**
- * The last point in time the process has done some work.
- */
- var last: Instant = -1
-
- override suspend fun receive(): Any = receiveEnvelope().message
-
- override suspend fun receive(timeout: Duration): Any? {
- val send = prepare(Timeout, process, process, timeout).also { schedule(it) }
-
- try {
- val received = receiveEnvelope()
-
- if (received.message != Timeout) {
- send.canceled = true
- return received.message
- }
-
- return null
- } finally {
- send.canceled = true
- }
- }
-
- override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) =
- schedule(prepare(msg, this, sender, delay))
-
- override suspend fun Entity<*, *>.interrupt(interrupt: Interrupt) = send(interrupt)
-
- override suspend fun hold(duration: Duration) {
- require(duration >= 0) { "The amount of time to hold must be a positive number" }
- val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
-
- try {
- while (true) {
- if (receive() == Resume)
- return
- }
- } finally {
- envelope.canceled = true
- }
- }
-
- override suspend fun hold(duration: Duration, queue: Queue<Any>) {
- require(duration >= 0) { "The amount of time to hold must be a positive number" }
- val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
-
- try {
- while (true) {
- val msg = receive()
- if (msg == Resume)
- return
- queue.add(msg)
- }
- } finally {
- envelope.canceled = true
- }
- }
-
- /**
- * Start the process associated with this context.
- */
- internal suspend fun start() = process.run { run() }
-
- /**
- * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
- * message has been received.
- *
- * @return The envelope containing the message.
- */
- suspend fun receiveEnvelope() = suspendCoroutine<Envelope> { continuation = it }
- .also { sender = it.sender }
-
- // Completion continuation implementation
- /**
- * Resume the execution of this continuation with the given value.
- *
- * @param value The value to resume with.
- */
- override fun resume(value: Unit) {
- // Deregister process from registry in order to have the GC collect this context
- registry.remove(process)
- }
-
- /**
- * Resume the execution of this continuation with an exception.
- *
- * @param exception The exception to resume with.
- */
- override fun resumeWithException(exception: Throwable) = throw exception
- }
-}
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/ProcessTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/ProcessTest.kt
deleted file mode 100644
index 334dca41..00000000
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/ProcessTest.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2018 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.Bootstrap
-import com.atlarge.opendc.simulator.Context
-import com.atlarge.opendc.simulator.Process
-import org.junit.jupiter.api.Test
-import kotlin.coroutines.experimental.suspendCoroutine
-
-/**
- * A test suite for processes in simulation.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-internal class ProcessTest {
- object FreezeProcess : Process<Unit, Unit> {
- override val initialState = Unit
- override suspend fun Context<Unit, Unit>.run() {
- receive()
- suspendCoroutine<Unit> {}
- }
- }
-
- /**
- * Test whether the simulation will not resume an already resumed continuation
- * of a process.
- */
- @Test
- fun `simulation will not resume frozen process`() {
- val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
- ctx.register(FreezeProcess)
- ctx.schedule("Hello", destination = FreezeProcess, delay = 1)
- ctx.schedule("Hello", destination = FreezeProcess, delay = 1)
- }
- val simulation = OmegaKernel.create(bootstrap)
- simulation.run()
- }
-}
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
deleted file mode 100644
index b056837c..00000000
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.Bootstrap
-import com.atlarge.opendc.simulator.Context
-import com.atlarge.opendc.simulator.Process
-import com.atlarge.opendc.simulator.instrumentation.Instrument
-import com.atlarge.opendc.simulator.kernel.Simulation
-import kotlinx.coroutines.experimental.Unconfined
-import kotlinx.coroutines.experimental.async
-import kotlinx.coroutines.experimental.channels.consumeEach
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-
-/**
- * This test suite checks for smoke when running a large amount of simulations.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-internal class SmokeTest {
- class EchoProcess : Process<Unit, Unit> {
- override val initialState = Unit
- override suspend fun Context<Unit, Unit>.run() {
- while (true) {
- val msg = receive()
- sender?.send(msg)
- }
- }
- }
-
- /**
- * Run a large amount of simulations and test if any exceptions occur.
- */
- @Test
- fun smoke() {
- val n = 1000
- val messages = 100
- val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
- repeat(n) {
- EchoProcess().also {
- ctx.register(it)
-
- for (i in 1 until messages) {
- ctx.schedule(i, it, delay = i.toLong())
- }
- }
- }
- }
- val simulation = OmegaKernel.create(bootstrap)
- simulation.run()
- }
-
- object NullProcess : Process<Unit, Unit> {
- override val initialState = Unit
- override suspend fun Context<Unit, Unit>.run() {}
- }
-
- /**
- * Test if the kernel allows sending messages to [Context] instances that have already stopped.
- */
- @Test
- fun `sending message to process that has gracefully stopped`() {
- val process = NullProcess
- val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
- process.also {
- ctx.register(it)
- ctx.schedule(0, it)
- }
- }
-
- val simulation = OmegaKernel.create(bootstrap)
- simulation.run()
- }
-
- object CrashProcess : Process<Unit, Unit> {
- override val initialState = Unit
- override suspend fun Context<Unit, Unit>.run() {
- throw RuntimeException("This process should crash")
- }
- }
-
- /**
- * Test if the kernel allows sending messages to [Context] instances that have crashed.
- */
- @Test
- fun `sending message to process that has crashed`() {
- val process = CrashProcess
- val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
- process.also {
- ctx.register(it)
- ctx.schedule(0, it)
- }
- }
-
- val simulation = OmegaKernel.create(bootstrap)
- assertThrows<RuntimeException> { simulation.run() }
- }
-
- class ModelProcess(private val value: Int) : Process<Boolean, Int> {
- override val initialState = false
- override suspend fun Context<Boolean, Int>.run() {
- assertEquals(value, model)
- state = true
- hold(10)
- }
- }
-
- /**
- * Test if the kernel allows access to the simulation model object.
- */
- @Test
- fun `access simulation model`() {
- val value = 1
- val process = ModelProcess(value)
- val bootstrap: Bootstrap<Int> = Bootstrap.create { ctx ->
- ctx.register(process)
- value
- }
-
- val kernel = OmegaKernel.create(bootstrap)
- kernel.run(5)
- }
-
-
- @Test
- fun `instrumentation works`() {
- val instrument: Instrument<Int, Unit> = {
- var value = 0
-
- for (i in 1..10) {
- send(value)
- value += 10
- hold(20)
- }
- }
-
- val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit })
- val stream = simulation.openPort().install(instrument)
-
- val res = async(Unconfined) {
- stream.consumeEach { println(it) }
- }
- simulation.run(100)
- }
-}