summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt20
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt50
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/KernelFactory.kt42
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt74
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt6
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt300
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt47
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt331
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt18
-rw-r--r--opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt8
-rw-r--r--opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt8
-rw-r--r--opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt5
-rw-r--r--opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt2
13 files changed, 453 insertions, 458 deletions
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt
index 10a89704..5f41c727 100644
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt
@@ -9,15 +9,15 @@ package com.atlarge.opendc.simulator
*/
interface Bootstrap<M> {
/**
- * Bootstrap a model `M` for a kernel in the given context.
+ * Apply the apply procedure for model `M` for a simulation in the given context.
*
- * @param context The context to bootstrap to model in.
- * @return The initialised model for the simulation.
+ * @param context The context to apply to model in.
+ * @return The initialised, resulting model for the simulation.
*/
- fun bootstrap(context: Context<M>): M
+ fun apply(context: Context<M>): M
/**
- * A context for the bootstrap of some model type `M` that allows the model to register the entities of the model to
+ * A context for the apply of some model type `M` that allows the model to register the entities of the model to
* the simulation kernel.
*
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
@@ -40,7 +40,7 @@ interface Bootstrap<M> {
fun deregister(entity: Entity<*, M>): Boolean
/**
- * Schedule a message for processing by a [Context].
+ * Schedule a message to be received by the given [Entity].
*
* @param message The message to schedule.
* @param destination The destination of the message.
@@ -52,13 +52,13 @@ interface Bootstrap<M> {
companion object {
/**
- * Create a [Bootstrap] procedure using the given block to produce a bootstrap for a model of type `M`.
+ * Create a [Bootstrap] procedure using the given block to produce a apply for a model of type `M`.
*
- * @param block The block to produce the bootstrap.
- * @return The bootstrap procedure that has been built.
+ * @param block The block to produce the apply.
+ * @return The apply procedure that has been built.
*/
fun <M> create(block: (Context<M>) -> M): Bootstrap<M> = object : Bootstrap<M> {
- override fun bootstrap(context: Context<M>) = block(context)
+ override fun apply(context: Context<M>) = block(context)
}
}
}
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt
index 29b3bdee..d4995283 100644
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt
@@ -25,55 +25,19 @@
package com.atlarge.opendc.simulator.kernel
import com.atlarge.opendc.simulator.Bootstrap
-import com.atlarge.opendc.simulator.Entity
-import com.atlarge.opendc.simulator.Instant
/**
- * A message based discrete event simulation kernel.
+ * A message-based discrete event simulator (DES). This interface is a factory for creating [Simulation]s using the
+ * provided [Bootstrap] for the model.
*
- * The kernel is created by bootstrapping some model `M` (see [Bootstrap]) to simulate and controls the simulation by
- * allowing the user to step over cycles in the simulation and inspect the internal state using [Entity.state].
- *
- * A kernel should provide additionally a [KernelFactory] to create new kernel instances given a certain model
- * [Bootstrap].
- *
- * @param M The shape of the model over which the simulation runs.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-interface Kernel<out M> {
- /**
- * The model in which the simulation runs.
- */
- val model: M
-
- /**
- * The simulation time.
- */
- var time: Instant
-
- /**
- * The observable state of an [Entity] in simulation, which is provided by the simulation context.
- */
- val <E : Entity<S, *>, S> E.state: S
-
- /**
- * Step through one cycle in the simulation. This method will process all events in a single tick, update the
- * internal clock and then return the control to the user.
- */
- fun step()
-
- /**
- * Run a simulation over the specified model.
- * This method will step through multiple cycles in the simulation until no more message exist in the queue.
- */
- fun run()
-
+interface Kernel {
/**
- * Run a simulation over the specified model, stepping through cycles until the specified clock tick has
- * occurred. The control is then handed back to the user.
+ * Create a simulation over the given model facilitated by this simulation kernel.
*
- * @param until The point in simulation time at which the simulation should be paused and the control is handed
- * back to the user.
+ * @param bootstrap The apply procedure to apply the simulation with.
+ * @return A [Simulation] instance representing the simulation.
*/
- fun run(until: Instant)
+ fun <M> create(bootstrap: Bootstrap<M>): Simulation<M>
}
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/KernelFactory.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/KernelFactory.kt
deleted file mode 100644
index 30abb7ca..00000000
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/KernelFactory.kt
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.simulator.kernel
-
-import com.atlarge.opendc.simulator.Bootstrap
-
-/**
- * A factory for bootstrapping simulation [Kernel] instances.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-interface KernelFactory {
- /**
- * Create a simulation over the given model facilitated by this simulation kernel.
- *
- * @param bootstrap The bootstrap procedure to bootstrap the simulation with.
- * @return A [Kernel] instance to control the simulation.
- */
- fun <M> create(bootstrap: Bootstrap<M>): Kernel<M>
-}
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
new file mode 100644
index 00000000..bb2ef818
--- /dev/null
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
@@ -0,0 +1,74 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.simulator.kernel
+
+import com.atlarge.opendc.simulator.Entity
+import com.atlarge.opendc.simulator.Instant
+
+/**
+ * A message based discrete event simulation over some model `M`. This interface provides direct control over the
+ * simulation, allowing the user to step over cycles of the simulation and inspecting the state of the simulation via
+ * [Entity.state].
+ *
+ * @param M The shape of the model over which the simulation runs.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+interface Simulation<out M> {
+ /**
+ * The model in which the simulation runs.
+ */
+ val model: M
+
+ /**
+ * The simulation time.
+ */
+ var time: Instant
+
+ /**
+ * The observable state of an [Entity] in simulation, which is provided by the simulation context.
+ */
+ val <E : Entity<S, *>, S> E.state: S
+
+ /**
+ * Step through one cycle in the simulation. This method will process all events in a single tick, update the
+ * internal clock and then return the control to the user.
+ */
+ fun step()
+
+ /**
+ * Run a simulation over the specified model.
+ * This method will step through multiple cycles in the simulation until no more message exist in the queue.
+ */
+ fun run()
+
+ /**
+ * Run a simulation over the specified model, stepping through cycles until the specified clock tick has
+ * occurred. The control is then handed back to the user.
+ *
+ * @param until The point in simulation time at which the simulation should be paused and the control is handed
+ * back to the user.
+ */
+ fun run(until: Instant)
+}
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt
index 92d56be1..eb959ded 100644
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt
@@ -25,7 +25,7 @@
package com.atlarge.opendc.simulator.platform
import com.atlarge.opendc.simulator.Duration
-import com.atlarge.opendc.simulator.kernel.KernelFactory
+import com.atlarge.opendc.simulator.kernel.Kernel
/**
* A blueprint for a reproducible simulation in a pre-defined setting.
@@ -39,7 +39,7 @@ interface Experiment<out T> {
* @param factory The factory to create the simulation kernel with.
* @return The result of the experiment.
*/
- fun run(factory: KernelFactory): T
+ fun run(factory: Kernel): T
/**
* Run the experiment on the specified kernel implementation.
@@ -48,5 +48,5 @@ interface Experiment<out T> {
* @param timeout The maximum duration of the experiment before returning to the caller.
* @return The result of the experiment or `null`.
*/
- fun run(factory: KernelFactory, timeout: Duration): T?
+ fun run(factory: Kernel, timeout: Duration): T?
}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
index 6ece73aa..e0d70eed 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
@@ -24,11 +24,9 @@
package com.atlarge.opendc.omega
-import com.atlarge.opendc.simulator.*
+import com.atlarge.opendc.simulator.Bootstrap
+import com.atlarge.opendc.simulator.kernel.Simulation
import com.atlarge.opendc.simulator.kernel.Kernel
-import mu.KotlinLogging
-import java.util.*
-import kotlin.coroutines.experimental.*
/**
* The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
@@ -36,296 +34,14 @@ import kotlin.coroutines.experimental.*
* This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and
* provides a single priority queue for all events (messages, ticks, etc) that occur in the entities.
*
- * @property model The model that is simulated.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Context<M> {
+object OmegaKernel : Kernel {
/**
- * The logger instance to use for the simulator.
+ * Create a simulation over the given model facilitated by this simulation kernel.
+ *
+ * @param bootstrap The apply procedure to apply the simulation with.
+ * @return A [Simulation] instance to control the simulation.
*/
- private val logger = KotlinLogging.logger {}
-
- /**
- * The registry of the simulation kernels used in the experiment.
- */
- private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap()
-
- /**
- * The message queue.
- */
- private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
-
- /**
- * The processes to be spawned.
- */
- private val spawnings: Queue<Process<*, M>> = ArrayDeque()
-
- /**
- * The simulation time.
- */
- override var time: Instant = 0
-
- /**
- * The model of simulation.
- */
- override val model: M = bootstrap.bootstrap(this)
-
- override val <E : Entity<S, *>, S> E.state: S
- get() = context?.state ?: initialState
-
- /**
- * The context associated with an [Entity].
- */
- @Suppress("UNCHECKED_CAST")
- private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>?
- get() = registry[this] as? OmegaContext<S>
-
- override fun register(entity: Entity<*, M>): Boolean {
- if (!registry.containsKey(entity) && entity !is Process) {
- return false
- }
-
- val process = entity as Process<*, M>
- spawnings.add(process)
- return true
- }
-
- override fun deregister(entity: Entity<*, M>): Boolean {
- val context = entity.context ?: return false
- context.resume(Unit)
- return true
- }
-
- override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) =
- schedule(prepare(message, destination, sender, delay))
-
- override fun step() {
- while (true) {
- // Initialise all spawned processes
- while (spawnings.isNotEmpty()) {
- val process = spawnings.poll()
- val context = OmegaContext(process).also { registry[process] = it }
-
- // Bootstrap the process coroutine
- val block: suspend () -> Unit = { context.start() }
- block.startCoroutine(context)
-
- }
-
- val envelope = queue.peek() ?: return
- val delivery = envelope.time
-
- if (delivery > time) {
- // Tick has yet to occur
- // Jump in time to next event
- time = delivery
- break
- } else if (delivery < time) {
- // Tick has already occurred
- logger.warn { "Message processed out of order" }
- }
-
- queue.poll()
-
- // If the sender has canceled the message, we move on to the next message
- if (envelope.canceled) {
- continue
- }
-
- val context = envelope.destination.context ?: continue
-
- if (envelope.message !is Interrupt) {
- context.continuation.resume(envelope)
- } else {
- context.continuation.resumeWithException(envelope.message)
- }
-
- context.last = time
- }
- }
-
-
- override fun run() {
- while (queue.isNotEmpty() || spawnings.isNotEmpty()) {
- step()
- }
- }
-
- override fun run(until: Instant) {
- require(until > 0) { "The given instant must be a non-zero positive number" }
-
- if (time >= until) {
- return
- }
-
- while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) {
- step()
- }
-
- // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at
- // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will
- // just jump forward again.
- if (time > until) {
- time = until
- }
- }
-
- private fun schedule(envelope: MessageContainer) {
- queue.add(envelope)
- }
-
- private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null,
- delay: Duration): MessageContainer {
- require(delay >= 0) { "The amount of time to delay the message must be a positive number" }
- return MessageContainer(message, time + delay, sender, destination)
- }
-
- /**
- * This internal class provides the default implementation for the [Context] interface for this simulator.
- */
- private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> {
- /**
- * The continuation to resume the execution of the process.
- */
- lateinit var continuation: Continuation<Envelope<*>>
-
- /**
- * The last point in time the process has done some work.
- */
- var last: Instant = -1
-
- /**
- * The model in which the process exists.
- */
- override val model: M
- get() = this@OmegaKernel.model
-
- /**
- * The state of the entity.
- */
- override var state: S = process.initialState
-
- /**
- * The current point in simulation time.
- */
- override val time: Instant
- get() = this@OmegaKernel.time
-
- /**
- * The duration between the current point in simulation time and the last point in simulation time where the
- * [Context] has executed some work.
- */
- override val delta: Duration
- get() = maxOf(time - last, 0)
-
- /**
- * The [CoroutineContext] for a [Context].
- */
- override val context: CoroutineContext = EmptyCoroutineContext
-
- /**
- * The observable state of an [Entity] within the simulation is provided by the context of the simulation.
- */
- override val <T : Entity<S, *>, S> T.state: S
- get() = context?.state ?: initialState
-
- /**
- * Start the process associated with this context.
- */
- internal suspend fun start() = process.run {
- run()
- }
-
- /**
- * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
- * message has been received.
- *
- * @return The envelope containing the message.
- */
- suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it }
-
- override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T {
- val envelope = receiveEnvelope()
- return transform(envelope, envelope.message)
- }
-
-
- override suspend fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? {
- val send = prepare(Timeout, process, process, timeout).also { schedule(it) }
-
- try {
- val received = receiveEnvelope()
-
- if (received.message == Timeout) {
- send.canceled = true
- return transform(received, received.message)
- }
-
- return null
- } finally {
- send.canceled = true
- }
- }
-
- override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay)
-
- override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) =
- schedule(prepare(msg, this, sender, delay))
-
- override suspend fun Entity<*, *>.interrupt() = send(Interrupt)
-
- override suspend fun hold(duration: Duration) {
- require(duration >= 0) { "The amount of time to hold must be a positive number" }
- val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
-
- try {
- while (true) {
- if (receive() == Resume)
- return
- }
- } finally {
- envelope.canceled = true
- }
- }
-
- override suspend fun hold(duration: Duration, queue: Queue<Any>) {
- require(duration >= 0) { "The amount of time to hold must be a positive number" }
- val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
-
- try {
- while (true) {
- val msg = receive()
- if (msg == Resume)
- return
- queue.add(msg)
- }
- } finally {
- envelope.canceled = true
- }
- }
-
-
- // Completion continuation implementation
- /**
- * Resume the execution of this continuation with the given value.
- *
- * @param value The value to resume with.
- */
- override fun resume(value: Unit) {
- // Deregister process from registry in order to have the GC collect this context
- registry.remove(process)
- }
-
- /**
- * Resume the execution of this continuation with an exception.
- *
- * @param exception The exception to resume with.
- */
- override fun resumeWithException(exception: Throwable) {
- // Deregister process from registry in order to have the GC collect this context:w
- registry.remove(process)
-
- logger.error(exception) { "An exception occurred during the execution of a process" }
- }
- }
+ override fun <M> create(bootstrap: Bootstrap<M>): Simulation<M> = OmegaSimulation(bootstrap)
}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt
deleted file mode 100644
index 139cbd19..00000000
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.Bootstrap
-import com.atlarge.opendc.simulator.kernel.Kernel
-import com.atlarge.opendc.simulator.kernel.KernelFactory
-
-/**
- * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
- *
- * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and
- * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-object OmegaKernelFactory : KernelFactory {
- /**
- * Create a simulation over the given model facilitated by this simulation kernel.
- *
- * @param bootstrap The bootstrap procedure to bootstrap the simulation with.
- * @return A [Kernel] instance to control the simulation.
- */
- override fun <M> create(bootstrap: Bootstrap<M>): Kernel<M> = OmegaKernel(bootstrap)
-}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
new file mode 100644
index 00000000..80ac4600
--- /dev/null
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -0,0 +1,331 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.omega
+
+import com.atlarge.opendc.simulator.*
+import com.atlarge.opendc.simulator.kernel.Simulation
+import mu.KotlinLogging
+import java.util.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
+ *
+ * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and
+ * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities.
+ *
+ * @property model The model that is simulated.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Bootstrap.Context<M> {
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The registry of the simulation kernels used in the experiment.
+ */
+ private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap()
+
+ /**
+ * The message queue.
+ */
+ private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
+
+ /**
+ * The processes to be spawned.
+ */
+ private val spawnings: Queue<Process<*, M>> = ArrayDeque()
+
+ /**
+ * The simulation time.
+ */
+ override var time: Instant = 0
+
+ /**
+ * The model of simulation.
+ */
+ override val model: M = bootstrap.apply(this)
+
+ override val <E : Entity<S, *>, S> E.state: S
+ get() = context?.state ?: initialState
+
+ /**
+ * The context associated with an [Entity].
+ */
+ @Suppress("UNCHECKED_CAST")
+ private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>?
+ get() = registry[this] as? OmegaContext<S>
+
+ override fun register(entity: Entity<*, M>): Boolean {
+ if (!registry.containsKey(entity) && entity !is Process) {
+ return false
+ }
+
+ val process = entity as Process<*, M>
+ spawnings.add(process)
+ return true
+ }
+
+ override fun deregister(entity: Entity<*, M>): Boolean {
+ val context = entity.context ?: return false
+ context.resume(Unit)
+ return true
+ }
+
+ override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) =
+ schedule(prepare(message, destination, sender, delay))
+
+ override fun step() {
+ while (true) {
+ // Initialise all spawned processes
+ while (spawnings.isNotEmpty()) {
+ val process = spawnings.poll()
+ val context = OmegaContext(process).also { registry[process] = it }
+
+ // Bootstrap the process coroutine
+ val block: suspend () -> Unit = { context.start() }
+ block.startCoroutine(context)
+
+ }
+
+ val envelope = queue.peek() ?: return
+ val delivery = envelope.time
+
+ if (delivery > time) {
+ // Tick has yet to occur
+ // Jump in time to next event
+ time = delivery
+ break
+ } else if (delivery < time) {
+ // Tick has already occurred
+ logger.warn { "Message processed out of order" }
+ }
+
+ queue.poll()
+
+ // If the sender has canceled the message, we move on to the next message
+ if (envelope.canceled) {
+ continue
+ }
+
+ val context = envelope.destination.context ?: continue
+
+ if (envelope.message !is Interrupt) {
+ context.continuation.resume(envelope)
+ } else {
+ context.continuation.resumeWithException(envelope.message)
+ }
+
+ context.last = time
+ }
+ }
+
+
+ override fun run() {
+ while (queue.isNotEmpty() || spawnings.isNotEmpty()) {
+ step()
+ }
+ }
+
+ override fun run(until: Instant) {
+ require(until > 0) { "The given instant must be a non-zero positive number" }
+
+ if (time >= until) {
+ return
+ }
+
+ while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) {
+ step()
+ }
+
+ // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at
+ // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will
+ // just jump forward again.
+ if (time > until) {
+ time = until
+ }
+ }
+
+ private fun schedule(envelope: MessageContainer) {
+ queue.add(envelope)
+ }
+
+ private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null,
+ delay: Duration): MessageContainer {
+ require(delay >= 0) { "The amount of time to delay the message must be a positive number" }
+ return MessageContainer(message, time + delay, sender, destination)
+ }
+
+ /**
+ * This internal class provides the default implementation for the [Context] interface for this simulator.
+ */
+ private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> {
+ /**
+ * The continuation to resume the execution of the process.
+ */
+ lateinit var continuation: Continuation<Envelope<*>>
+
+ /**
+ * The last point in time the process has done some work.
+ */
+ var last: Instant = -1
+
+ /**
+ * The model in which the process exists.
+ */
+ override val model: M
+ get() = this@OmegaSimulation.model
+
+ /**
+ * The state of the entity.
+ */
+ override var state: S = process.initialState
+
+ /**
+ * The current point in simulation time.
+ */
+ override val time: Instant
+ get() = this@OmegaSimulation.time
+
+ /**
+ * The duration between the current point in simulation time and the last point in simulation time where the
+ * [Context] has executed some work.
+ */
+ override val delta: Duration
+ get() = maxOf(time - last, 0)
+
+ /**
+ * The [CoroutineContext] for a [Context].
+ */
+ override val context: CoroutineContext = EmptyCoroutineContext
+
+ /**
+ * The observable state of an [Entity] within the simulation is provided by the context of the simulation.
+ */
+ override val <T : Entity<S, *>, S> T.state: S
+ get() = context?.state ?: initialState
+
+ /**
+ * Start the process associated with this context.
+ */
+ internal suspend fun start() = process.run {
+ run()
+ }
+
+ /**
+ * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
+ * message has been received.
+ *
+ * @return The envelope containing the message.
+ */
+ suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it }
+
+ override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T {
+ val envelope = receiveEnvelope()
+ return transform(envelope, envelope.message)
+ }
+
+
+ override suspend fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? {
+ val send = prepare(Timeout, process, process, timeout).also { schedule(it) }
+
+ try {
+ val received = receiveEnvelope()
+
+ if (received.message == Timeout) {
+ send.canceled = true
+ return transform(received, received.message)
+ }
+
+ return null
+ } finally {
+ send.canceled = true
+ }
+ }
+
+ override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay)
+
+ override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) =
+ schedule(prepare(msg, this, sender, delay))
+
+ override suspend fun Entity<*, *>.interrupt() = send(Interrupt)
+
+ override suspend fun hold(duration: Duration) {
+ require(duration >= 0) { "The amount of time to hold must be a positive number" }
+ val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
+
+ try {
+ while (true) {
+ if (receive() == Resume)
+ return
+ }
+ } finally {
+ envelope.canceled = true
+ }
+ }
+
+ override suspend fun hold(duration: Duration, queue: Queue<Any>) {
+ require(duration >= 0) { "The amount of time to hold must be a positive number" }
+ val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
+
+ try {
+ while (true) {
+ val msg = receive()
+ if (msg == Resume)
+ return
+ queue.add(msg)
+ }
+ } finally {
+ envelope.canceled = true
+ }
+ }
+
+
+ // Completion continuation implementation
+ /**
+ * Resume the execution of this continuation with the given value.
+ *
+ * @param value The value to resume with.
+ */
+ override fun resume(value: Unit) {
+ // Deregister process from registry in order to have the GC collect this context
+ registry.remove(process)
+ }
+
+ /**
+ * Resume the execution of this continuation with an exception.
+ *
+ * @param exception The exception to resume with.
+ */
+ override fun resumeWithException(exception: Throwable) {
+ // Deregister process from registry in order to have the GC collect this context:w
+ registry.remove(process)
+
+ logger.error(exception) { "An exception occurred during the execution of a process" }
+ }
+ }
+}
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index 48b4f02a..74fa686b 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -66,8 +66,8 @@ internal class SmokeTest {
}
}
}
- val kernel = OmegaKernelFactory.create(bootstrap)
- kernel.run()
+ val simulation = OmegaKernel.create(bootstrap)
+ simulation.run()
}
object NullProcess : Process<Unit, Unit> {
@@ -88,8 +88,8 @@ internal class SmokeTest {
}
}
- val kernel = OmegaKernelFactory.create(bootstrap)
- kernel.run()
+ val simulation = OmegaKernel.create(bootstrap)
+ simulation.run()
}
object CrashProcess : Process<Unit, Unit> {
@@ -112,8 +112,8 @@ internal class SmokeTest {
}
}
- val kernel = OmegaKernelFactory.create(bootstrap)
- kernel.run()
+ val simulation = OmegaKernel.create(bootstrap)
+ simulation.run()
}
class ModelProcess(private val value: Int) : Process<Boolean, Int> {
@@ -136,9 +136,9 @@ internal class SmokeTest {
value
}
- val kernel = OmegaKernelFactory.create(bootstrap)
- kernel.run(5)
+ val simulation = OmegaKernel.create(bootstrap)
+ simulation.run(5)
- assertTrue(kernel.run { process.state })
+ assertTrue(simulation.run { process.state })
}
}
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
index 8c32c54d..0a0c792c 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt
@@ -21,20 +21,20 @@ class JpaBootstrap(val experiment: Experiment) : Bootstrap<JpaModel> {
/**
* Bootstrap a model `M` for a kernel in the given context.
*
- * @param context The context to bootstrap to model in.
+ * @param context The context to apply to model in.
* @return The initialised model for the simulation.
*/
- override fun bootstrap(context: Bootstrap.Context<JpaModel>): JpaModel {
+ override fun apply(context: Bootstrap.Context<JpaModel>): JpaModel {
val section = experiment.path.sections.first()
- // TODO We should not modify parts of the experiment in a bootstrap as the bootstrap should be reproducible.
+ // TODO We should not modify parts of the experiment in a apply as the apply should be reproducible.
// Important: initialise the scheduler of the datacenter
section.datacenter.scheduler = experiment.scheduler
val topology = JpaTopologyFactory(section)
.create()
.bootstrap()
- .bootstrap(context)
+ .apply(context)
val trace = experiment.trace
val tasks = trace.jobs.flatMap { it.tasks }
diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
index 74f96ccb..4e88b3d4 100644
--- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
+++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt
@@ -34,7 +34,7 @@ import com.atlarge.opendc.model.odc.topology.container.Room
import com.atlarge.opendc.model.odc.topology.machine.Machine
import com.atlarge.opendc.model.topology.destinations
import com.atlarge.opendc.simulator.Duration
-import com.atlarge.opendc.simulator.kernel.KernelFactory
+import com.atlarge.opendc.simulator.kernel.Kernel
import com.atlarge.opendc.simulator.platform.Experiment
import mu.KotlinLogging
import java.io.Closeable
@@ -65,7 +65,7 @@ class JpaExperiment(private val manager: EntityManager,
* @param timeout The maximum duration of the experiment before returning to the caller.
* @return The result of the experiment or `null`.
*/
- override fun run(factory: KernelFactory, timeout: Duration): Unit? {
+ override fun run(factory: Kernel, timeout: Duration): Unit? {
if (experiment.state != ExperimentState.CLAIMED) {
throw IllegalStateException("The experiment is in illegal state ${experiment.state}")
}
@@ -136,7 +136,7 @@ class JpaExperiment(private val manager: EntityManager,
experiment.state = ExperimentState.FINISHED
}
- logger.info { "Kernel done" }
+ logger.info { "Simulation done" }
val waiting: Long = tasks.fold(0.toLong()) { acc, task ->
val finished = task.state as TaskState.Finished
acc + (finished.previous.at - finished.previous.previous.at)
@@ -165,7 +165,7 @@ class JpaExperiment(private val manager: EntityManager,
* @param factory The factory to create the simulation kernel with.
* @throws IllegalStateException if the simulation is already running or finished.
*/
- override fun run(factory: KernelFactory) = run(factory, -1)!!
+ override fun run(factory: Kernel) = run(factory, -1)!!
/**
* Closes this resource, relinquishing any underlying resources.
diff --git a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
index eb819a5b..13f322bc 100644
--- a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
+++ b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
@@ -24,8 +24,7 @@
package com.atlarge.opendc.model.odc.platform
-import com.atlarge.opendc.model.odc.platform.JpaExperimentManager
-import com.atlarge.opendc.omega.OmegaKernelFactory
+import com.atlarge.opendc.omega.OmegaKernel
import mu.KotlinLogging
import java.util.concurrent.Executors
import javax.persistence.Persistence
@@ -50,7 +49,7 @@ fun main(args: Array<String>) {
val threads = 4
val executorService = Executors.newFixedThreadPool(threads)
val experiments = JpaExperimentManager(factory)
- val kernel = OmegaKernelFactory
+ val kernel = OmegaKernel
logger.info { "Waiting for enqueued experiments..." }
while (true) {
diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt
index 1a5bbcaf..e0b54a28 100644
--- a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt
+++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt
@@ -6,7 +6,7 @@ import com.atlarge.opendc.simulator.Entity
/**
* Create a [Bootstrap] procedure for the given [Topology].
*
- * @return A bootstrap procedure for the topology.
+ * @return A apply procedure for the topology.
*/
fun <T : Topology> T.bootstrap(): Bootstrap<T> = Bootstrap.create { ctx ->
forEach { ctx.register(it) }