summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt43
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt54
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt31
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulation.kt95
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt16
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt15
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Receipt.kt53
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt17
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Clock.kt79
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/TickClock.kt60
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Time.kt (renamed from opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt)19
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/platform/Experiment.kt (renamed from opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Clock.kt)17
-rw-r--r--opendc-integration-jpa/build.gradle87
-rw-r--r--opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt68
-rw-r--r--opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt244
-rw-r--r--opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt344
-rw-r--r--opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt8
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt41
18 files changed, 948 insertions, 343 deletions
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt
new file mode 100644
index 00000000..83100587
--- /dev/null
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt
@@ -0,0 +1,43 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package nl.atlarge.opendc.extension
+
+import nl.atlarge.opendc.topology.Edge
+
+/**
+ * Filter a [Set] of [Edge]s based on the tag of the edges and return the origin nodes casted to type `T`.
+ *
+ * @param tag The tag of the edges to get.
+ * @return An [Iterable] of the specified type `T` with the given tag.
+ */
+inline fun <reified T> Set<Edge<*>>.origins(tag: String) = filter { it.tag == tag }.map { it.from as T }
+
+/**
+ * Filter a [Set] of [Edge]s based on the tag of the edges and return the destination nodes casted to type `T`.
+ *
+ * @param tag The tag of the edges to get.
+ * @return An [Iterable] of the specified type `T` with the given tag.
+ */
+inline fun <reified T> Set<Edge<*>>.destinations(tag: String) = filter { it.tag == tag }.map { it.to as T }
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
index 600a9cee..46cb271e 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
@@ -26,9 +26,13 @@ package nl.atlarge.opendc.kernel
import nl.atlarge.opendc.kernel.messaging.Readable
import nl.atlarge.opendc.kernel.messaging.Writable
+import nl.atlarge.opendc.kernel.time.Clock
+import nl.atlarge.opendc.kernel.time.Duration
+import nl.atlarge.opendc.kernel.time.Instant
import nl.atlarge.opendc.topology.Entity
import nl.atlarge.opendc.topology.Topology
import nl.atlarge.opendc.topology.TopologyContext
+import java.lang.Process
/**
* This interface provides a context for simulation [Process]es, which defines the environment in which the simulation
@@ -38,19 +42,25 @@ import nl.atlarge.opendc.topology.TopologyContext
*/
interface Context<out E : Entity<*>> : Readable, Writable, TopologyContext {
/**
+ * The [Entity] in simulation by the [Process].
+ */
+ val entity: E
+
+ /**
* The [Topology] over which the simulation is run.
*/
val topology: Topology
/**
- * The global [Clock] that keeps track of the simulation time.
+ * The current point in simulation time.
*/
- val clock: Clock
+ val time: Instant
/**
- * The [Entity] in simulation by the [Process].
+ * The duration between the current point in simulation time and the last point in simulation time where the
+ * [Process] has executed some work. This means the `run()` co-routine has been resumed.
*/
- val entity: E
+ val delta: Duration
/**
* The observable state of an [Entity] in simulation, which is provided by the simulation context.
@@ -58,29 +68,39 @@ interface Context<out E : Entity<*>> : Readable, Writable, TopologyContext {
val <E : Entity<S>, S> E.state: S
/**
- * Interrupt the [Process] of an [Entity] in simulation.
+ * Update the observable state of the entity being simulated.
+ *
+ * Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects
+ * referencing the old entity having their data changed.
+ *
+ * @param next The next state of the entity.
*/
- suspend fun Entity<*>.interrupt()
+ suspend fun <C : Context<E>, E : Entity<S>, S> C.update(next: S)
/**
- * Suspend the [Process] of the [Entity] in simulation until the next tick has occurred in the simulation.
+ * Interrupt the [Process] of an [Entity] in simulation.
+ *
+ * If a [Process] has been suspended, the suspending call will throw an [Interrupt] object as a result of this call.
+ * Make sure the [Process] actually has error handling in place, so it won't take down the whole [Process].
*/
- suspend fun tick(): Boolean
+ suspend fun Entity<*>.interrupt()
/**
- * Suspend the [Process] of the [Entity] in simulation for <code>n</code> ticks before resuming execution.
+ * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming
+ * execution.
+ *
+ * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend
+ * the process until the no more messages at an earlier point in time have to be processed.
*
- * @param n The amount of ticks to suspend the process.
+ * @param duration The duration of simulation time to wait before resuming execution.
*/
- suspend fun wait(n: Int)
+ suspend fun wait(duration: Duration)
/**
- * Update the observable state of the entity being simulated.
- *
- * Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects
- * referencing the old entity having their data changed.
+ * Suspend the [Process] of the [Entity] in simulation for one tick in simulation time which is defined by the
+ * [Clock].
*
- * @param next The next state of the entity.
+ * @return `true` to allow usage in while statements.
*/
- suspend fun <C : Context<E>, E : Entity<S>, S> C.update(next: S)
+ suspend fun tick(): Boolean
}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
index 9678db41..524c9131 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
@@ -24,45 +24,26 @@
package nl.atlarge.opendc.kernel
-import nl.atlarge.opendc.topology.Entity
import nl.atlarge.opendc.topology.Topology
-import java.lang.Process
/**
* A message-based discrete event simulator (DES). This interface allows running simulations over a [Topology].
* This discrete event simulator works by having entities in a [Topology] interchange messages between each other and
* updating their observable state accordingly.
*
- * In order to run a simulation, a kernel needs to bootstrapped by an initial set of messages to be processed by
- * entities in the topology of the simulation. Otherwise, the simulation will immediately exit.
- *
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
interface Kernel {
/**
- * The [Topology] over which the simulation is run.
- */
- val topology: Topology
-
- /**
- * Step through one cycle in the simulation. This method will process all events in a single tick, update the
- * internal clock and then return the control to the user.
- */
- fun step()
-
- /**
- * Run a simulation over the specified [Topology].
- * This method will step through multiple cycles in the simulation until no more message exist in the queue.
+ * The name of the kernel.
*/
- fun run()
+ val name: String
/**
- * Schedule a message for processing by a [Process].
+ * Create a new [Simulation] of the given [Topology] that is facilitated by this simulation kernel.
*
- * @param message The message to schedule.
- * @param destination The destination of the message.
- * @param sender The sender of the message.
- * @param delay The amount of ticks to wait before processing the message.
+ * @param topology The [Topology] to create a [Simulation] of.
+ * @return A [Simulation] instance.
*/
- fun schedule(message: Any?, destination: Entity<*>, sender: Entity<*>? = null, delay: Int = 0)
+ fun create(topology: Topology): Simulation
}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulation.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulation.kt
new file mode 100644
index 00000000..32b45ca2
--- /dev/null
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulation.kt
@@ -0,0 +1,95 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package nl.atlarge.opendc.kernel
+
+import nl.atlarge.opendc.kernel.messaging.Receipt
+import nl.atlarge.opendc.kernel.time.Clock
+import nl.atlarge.opendc.kernel.time.Duration
+import nl.atlarge.opendc.kernel.time.Instant
+import nl.atlarge.opendc.topology.Entity
+import nl.atlarge.opendc.topology.Topology
+import java.lang.Process
+
+/**
+ * A message based discrete event simulation facilitated by a simulation [Kernel].
+ *
+ * In order for the simulation to run, the simulation kernel needs to bootstrapped by an set of messages to be processed
+ * initially by entities in the topology of the simulation. Otherwise, the simulation will immediately exit.
+ * Bootstrapping can be achieved by scheduling messages before running the simulation via [Simulation.schedule]:
+ *
+ * `val simulation = kernel.create(topology).apply {
+ * schedule(Boot, entity)
+ * }`
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+interface Simulation {
+ /**
+ * The [Kernel] that facilitates the simulation.
+ */
+ val kernel: Kernel
+
+ /**
+ * The [Topology] over which the simulation is run.
+ */
+ val topology: Topology
+
+ /**
+ * The [Clock] instance that keeps track of simulation time.
+ */
+ val clock: Clock
+
+ /**
+ * Step through one cycle in the simulation. This method will process all events in a single tick, update the
+ * internal clock and then return the control to the user.
+ */
+ fun step()
+
+ /**
+ * Run a simulation over the specified [Topology].
+ * This method will step through multiple cycles in the simulation until no more message exist in the queue.
+ */
+ fun run()
+
+ /**
+ * Run a simulation over the specified [Topology], stepping through cycles until the specified clock tick has
+ * occurred. The control is then handed back to the user.
+ *
+ * @param until The point in simulation time at which the simulation should be paused and the control is handed
+ * back to the user.
+ */
+ fun run(until: Instant)
+
+ /**
+ * Schedule a message for processing by a [Process].
+ *
+ * @param message The message to schedule.
+ * @param destination The destination of the message.
+ * @param sender The sender of the message.
+ * @param delay The amount of time to wait before processing the message.
+ * @return A [Receipt] of the message that has been scheduled.
+ */
+ fun schedule(message: Any, destination: Entity<*>, sender: Entity<*>? = null, delay: Duration = 0): Receipt
+}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt
index 61d1a0cf..608d325f 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt
@@ -24,33 +24,27 @@
package nl.atlarge.opendc.kernel.messaging
-import nl.atlarge.opendc.kernel.Tick
import nl.atlarge.opendc.topology.Entity
/**
- * The envelope of a message that is received from a [Channel], also containing the metadata of the message.
+ * The envelope of a message that is sent to an [Entity], also containing the metadata of the message.
*
* @param T The shape of the message inside the envelope.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-data class Envelope<out T>(
+interface Envelope<out T: Any> {
/**
* The message in this envelope.
*/
- val message: T,
-
- /**
- * The tick at which the message should be delivered.
- */
- val tick: Tick,
+ val message: T
/**
* The sender of the message.
*/
- val sender: Entity<*>?,
+ val sender: Entity<*>?
/**
* The destination of the message.
*/
val destination: Entity<*>
-)
+}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt
index 422c5668..772d9013 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt
@@ -25,23 +25,26 @@
package nl.atlarge.opendc.kernel.messaging
/**
- * A [Readable] instance allows objects to pull messages from the instance.
+ * A [Readable] instance has a mailbox associated with the instance to which objects can send messages, which can be
+ * received by the class.
*
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
interface Readable {
/**
- * Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
+ * Retrieve and removes a single message from the entity's mailbox, suspending the function if the mailbox is empty.
+ * The execution is resumed after the message has landed in the entity's mailbox after which the message [Envelope]
+ * is mapped through `block` to generate a processed message.
*
* @param block The block to process the message with.
* @return The processed message.
*/
- suspend fun <T> receive(block: Envelope<*>.(Any?) -> T): T
+ suspend fun <T> receive(block: Envelope<*>.(Any) -> T): T
/**
- * Retrieve a single message from this [Channel].
+ * Retrieve and removes a single message from the entity's mailbox, suspending the function if the mailbox is empty.
*
- * @return The message that was received from the channel
+ * @return The message that was received from the entity's mailbox.
*/
- suspend fun receive(): Any? = receive { it }
+ suspend fun receive(): Any = receive { it }
}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Receipt.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Receipt.kt
new file mode 100644
index 00000000..74433f5e
--- /dev/null
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Receipt.kt
@@ -0,0 +1,53 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package nl.atlarge.opendc.kernel.messaging
+
+import nl.atlarge.opendc.kernel.Kernel
+import nl.atlarge.opendc.topology.Entity
+
+/**
+ * A receipt of a message that has been scheduled by a simulation [Kernel]. This interface allows the cancellation of a
+ * message that has been scheduled for delivery and for checking the status of a delivery.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+interface Receipt {
+ /**
+ * A flag to indicate the message has been canceled.
+ */
+ val canceled: Boolean
+
+ /**
+ * A flag to indicate the message has been delivered.
+ */
+ val delivered: Boolean
+
+ /**
+ * Cancel the message to prevent it from being received by an [Entity].
+ *
+ * @throws IllegalStateException if the message has already been delivered.
+ */
+ fun cancel()
+}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt
index 45c81e39..0d2b2725 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt
@@ -24,6 +24,7 @@
package nl.atlarge.opendc.kernel.messaging
+import nl.atlarge.opendc.kernel.time.Duration
import nl.atlarge.opendc.topology.Entity
/**
@@ -33,11 +34,21 @@ import nl.atlarge.opendc.topology.Entity
*/
interface Writable {
/**
- * Send the given message downstream.
+ * Send the given message to the specified entity.
+ *
+ * @param msg The message to send.
+ * @param delay The amount of time to wait before the message should be received.
+ * @return A [Receipt] of the message that has been sent.
+ */
+ suspend fun Entity<*>.send(msg: Any, delay: Duration = 0): Receipt
+
+ /**
+ * Send the given message to the specified entity.
*
* @param msg The message to send.
* @param sender The sender of the message.
- * @param delay The number of ticks before the message should be received.
+ * @param delay The amount of time to wait before the message should be received.
+ * @return A [Receipt] of the message that has been sent.
*/
- suspend fun Entity<*>.send(msg: Any?, sender: Entity<*>? = null, delay: Int = 0)
+ suspend fun Entity<*>.send(msg: Any, sender: Entity<*>, delay: Duration = 0): Receipt
}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Clock.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Clock.kt
new file mode 100644
index 00000000..f03a98fa
--- /dev/null
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Clock.kt
@@ -0,0 +1,79 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package nl.atlarge.opendc.kernel.time
+
+import nl.atlarge.opendc.kernel.Simulation
+
+/**
+ * A clock controls and provides access to the simulation time of a [Simulation].
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+interface Clock {
+ /**
+ * The moment in time the clock is currently at.
+ */
+ val now: Instant
+
+ /**
+ * The duration of a tick in this clock. This is an arbitrary duration of time in which entities in simulation
+ * perform some defined amount of work.
+ */
+ val tick: Duration
+
+ /**
+ * Advance the clock by the given duration.
+ *
+ * @param duration The duration to advance the clock by.
+ */
+ fun advance(duration: Duration) {
+ require(duration >= 0) { "The duration to advance the clock must not be a negative number" }
+ advanceTo(now + duration)
+ }
+
+ /**
+ * Rewind the clock by the given duration.
+ *
+ * @param duration The duration to rewind the clock by.
+ */
+ fun rewind(duration: Duration) {
+ require(duration >= 0) { "The duration to rewind the clock must not be a negative number" }
+ rewindTo(now - duration)
+ }
+
+ /**
+ * Rewind the clock to the given point in time.
+ *
+ * @param instant The point in time to rewind the clock to.
+ */
+ fun rewindTo(instant: Instant)
+
+ /**
+ * Advance the clock to the given point in time.
+ *
+ * @param instant The point in time to advance the clock to.
+ */
+ fun advanceTo(instant: Instant)
+}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/TickClock.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/TickClock.kt
new file mode 100644
index 00000000..d960f454
--- /dev/null
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/TickClock.kt
@@ -0,0 +1,60 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package nl.atlarge.opendc.kernel.time
+
+/**
+ * A tick based clock which divides time into a discrete interval of points.
+ *
+ * @param initial The initial point in time of the clock.
+ * @param tick The duration of a tick.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+class TickClock(initial: Instant = 0, override val tick: Duration = 1) : Clock {
+ /**
+ * The moment in time the clock is currently at.
+ */
+ override var now: Instant = initial
+ private set
+
+ /**
+ * Advance the clock to the given point in time.
+ *
+ * @param instant The moment in time to advance the clock to.
+ */
+ override fun advanceTo(instant: Instant) {
+ require(instant >= now) { "The point to advance to must be at the same point or further than now" }
+ now = instant
+ }
+
+ /**
+ * Rewind the clock to the given point in time.
+ *
+ * @param instant The point in time to rewind the clock to.
+ */
+ override fun rewindTo(instant: Instant) {
+ require(now >= instant) { "The point to rewind to must be before the current point in time" }
+ now = instant
+ }
+}
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Time.kt
index c84f4dbf..af9d547b 100644
--- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Time.kt
@@ -22,19 +22,14 @@
* SOFTWARE.
*/
-package nl.atlarge.opendc.kernel.omega
+package nl.atlarge.opendc.kernel.time
-import nl.atlarge.opendc.kernel.Clock
-import nl.atlarge.opendc.kernel.Tick
+/**
+ * An instantaneous point on the time-line, used to record event time-stamps in a simulation.
+ */
+typealias Instant = Long
/**
- * A [Clock] implementation used by the Omega simulation kernel.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ * A time interval which represents the amount of elapsed time between two events.
*/
-class OmegaClock: Clock {
- /**
- * The simulation time expressed as the amount of ticks that passed.
- */
- override var tick: Tick = 0
-}
+typealias Duration = Long
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Clock.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/platform/Experiment.kt
index 0328bbe6..225b2813 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Clock.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/platform/Experiment.kt
@@ -22,21 +22,20 @@
* SOFTWARE.
*/
-package nl.atlarge.opendc.kernel
+package nl.atlarge.opendc.platform
-/**
- * A tick represents a moment of time in which some work is done by an entity.
- */
-typealias Tick = Long
+import nl.atlarge.opendc.kernel.Kernel
/**
- * The clock of a simulation manages the simulation time of a simulation [Kernel].
+ * A blueprint for a reproducible simulation in a pre-defined setting.
*
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-interface Clock {
+interface Experiment<out T> {
/**
- * The tick the clock is currently at.
+ * Run the experiment on the specified simulation [Kernel].
+ *
+ * @param kernel The simulation kernel to run the experiment.
*/
- val tick: Tick
+ fun run(kernel: Kernel): T
}
diff --git a/opendc-integration-jpa/build.gradle b/opendc-integration-jpa/build.gradle
new file mode 100644
index 00000000..89b5740d
--- /dev/null
+++ b/opendc-integration-jpa/build.gradle
@@ -0,0 +1,87 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+/* Build configuration */
+buildscript {
+ ext.kotlin_version = '1.1.4-3'
+ ext.dokka_version = '0.9.15'
+
+ repositories {
+ mavenCentral()
+ jcenter()
+ }
+
+ dependencies {
+ classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+ classpath "org.jetbrains.dokka:dokka-gradle-plugin:$dokka_version"
+ classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0-RC3'
+ }
+}
+
+apply plugin: 'java'
+apply plugin: 'kotlin'
+apply plugin: 'org.jetbrains.dokka'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+compileKotlin {
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
+}
+
+compileTestKotlin {
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
+}
+
+kotlin {
+ experimental {
+ coroutines 'enable'
+ }
+}
+
+dokka {
+ outputFormat = 'html'
+ outputDirectory = "$buildDir/javadoc"
+}
+
+/* Project configuration */
+group 'nl.atlarge.opendc'
+version '1.0'
+
+repositories {
+ jcenter()
+}
+
+dependencies {
+ compile project(':opendc-core')
+ compile "io.github.microutils:kotlin-logging:1.4.6"
+
+ testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3"
+ testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3"
+ testCompile "org.junit.platform:junit-platform-launcher:1.0.0-RC3"
+ testCompile "org.slf4j:slf4j-simple:1.7.25"
+ testCompile project(':opendc-stdlib')
+}
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt
new file mode 100644
index 00000000..1554a9e6
--- /dev/null
+++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt
@@ -0,0 +1,68 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package nl.atlarge.opendc.kernel.omega
+
+import nl.atlarge.opendc.kernel.messaging.Envelope
+import nl.atlarge.opendc.kernel.messaging.Receipt
+import nl.atlarge.opendc.kernel.time.Instant
+import nl.atlarge.opendc.topology.Entity
+
+/**
+ * A wrapper around a message that has been scheduled for processing.
+ *
+ * @property message The message to wrap.
+ * @property time The point in time to deliver the message.
+ * @property sender The sender of the message.
+ * @property destination The destination of the message.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal data class MessageContainer(override val message: Any,
+ val time: Instant,
+ override val sender: Entity<*>?,
+ override val destination: Entity<*>) : Envelope<Any>, Receipt {
+ /**
+ * A flag to indicate the message has been canceled.
+ */
+ override var canceled: Boolean = false
+
+ /**
+ * A flag to indicate the message has been delivered.
+ */
+ override var delivered: Boolean = false
+
+ /**
+ * Cancel the message to prevent it from being received by an [Entity].
+ *
+ * @throws IllegalStateException if the message has already been delivered.
+ */
+ override fun cancel() {
+ if (delivered) {
+ throw IllegalStateException("The message has already been delivered")
+ }
+
+ canceled = true
+ }
+
+}
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt
index 631b6d45..5367e674 100644
--- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt
+++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt
@@ -24,14 +24,11 @@
package nl.atlarge.opendc.kernel.omega
-import mu.KotlinLogging
-import nl.atlarge.opendc.kernel.*
-import nl.atlarge.opendc.kernel.messaging.Envelope
+import nl.atlarge.opendc.kernel.Kernel
+import nl.atlarge.opendc.kernel.Process
+import nl.atlarge.opendc.kernel.Simulation
import nl.atlarge.opendc.topology.Entity
import nl.atlarge.opendc.topology.Topology
-import nl.atlarge.opendc.topology.TopologyContext
-import java.util.*
-import kotlin.coroutines.experimental.*
/**
* The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
@@ -42,240 +39,19 @@ import kotlin.coroutines.experimental.*
* By default, [Process]s are resolved as part of the [Topology], meaning each [Entity] in the topology should also
* implement its simulation behaviour by deriving from the [Process] interface.
*
- * @param topology The topology to run the simulation over.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class OmegaKernel(override val topology: Topology) : Kernel {
+object OmegaKernel : Kernel {
/**
- * The logger instance to use for the simulator.
+ * The name of the kernel.
*/
- private val logger = KotlinLogging.logger {}
+ override val name: String = "opendc-omega"
/**
- * The registry of the simulation kernels used in the experiment.
- */
- private val registry: MutableMap<Entity<*>, OmegaContext<*, *>> = HashMap()
-
- /**
- * The message queue.
- */
- private val queue: PriorityQueue<Envelope<*>> = PriorityQueue(Comparator.comparingLong(Envelope<*>::tick))
-
- /**
- * The clock of the simulator.
- */
- private val clock: OmegaClock = OmegaClock()
-
- /**
- * Initialise the simulator.
- */
- init {
- topology.forEach { resolve(it) }
- registry.values.forEach { context ->
- @Suppress("UNCHECKED_CAST")
- val process = context.entity as Process<Entity<*>>
-
- // Start all process co-routines
- val block: suspend () -> Unit = { process.run { context.run() } }
- block.startCoroutine(context)
- }
- }
-
- /**
- * Step through one event in the simulation.
- */
- override fun step() {
- while (true) {
- val envelope = queue.peek() ?: return
- val tick = envelope.tick
-
- if (tick > clock.tick) {
- // Tick has yet to occur
- // Jump in time to next event
- clock.tick = tick
- break
- } else if (tick < clock.tick) {
- // Tick has already occurred
- logger.warn { "message processed out of order" }
- }
- queue.poll()
-
- val context = registry[envelope.destination] ?: continue
-
- if (envelope.message !is Interrupt) {
- context.continuation.resume(envelope)
- } else {
- context.continuation.resumeWithException(envelope.message as Interrupt)
- }
- }
- }
-
- /**
- * Run a simulation over the specified [Topology].
- * This method will step through multiple cycles in the simulation until no more message exist in the queue.
- */
- override fun run() {
- while (queue.isNotEmpty()) {
- step()
- }
- }
-
- /**
- * Schedule a message for processing by a [Process].
+ * Create a new [Simulation] of the given [Topology] that is facilitated by this simulation kernel.
*
- * @param message The message to schedule.
- * @param destination The destination of the message.
- * @param sender The sender of the message.
- * @param delay The amount of ticks to wait before processing the message.
+ * @param topology The [Topology] to create a [Simulation] of.
+ * @return A [Simulation] instance.
*/
- override fun schedule(message: Any?, destination: Entity<*>, sender: Entity<*>?, delay: Int) {
- require(delay > 0) { "The amount of ticks to delay the message must be a positive number" }
- queue.add(Envelope(message, clock.tick + delay, sender, destination))
- }
-
- /**
- * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network.
- *
- * @param entity The [Entity] to resolve the [Context] for.
- * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated
- * with it.
- */
- private fun <E : Entity<*>> resolve(entity: E): Context<E>? {
- if (entity !is Process<*>)
- return null
-
- @Suppress("UNCHECKED_CAST")
- return registry.computeIfAbsent(entity, {
- OmegaContext(entity)
- }) as Context<E>
- }
-
- /**
- * This internal class provides the default implementation for the [Context] interface for this simulator.
- */
- private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>,
- Continuation<Unit>, TopologyContext by topology {
- /**
- * The continuation to resume the execution of the process.
- */
- lateinit var continuation: Continuation<Envelope<*>>
-
- /**
- * The state of the entity.
- */
- var state: S = entity.initialState
-
- /**
- * The [Topology] over which the simulation is run.
- */
- override val topology: Topology = this@OmegaKernel.topology
-
- /**
- * The global [Clock] that keeps track of the simulation time.
- */
- override val clock: Clock = this@OmegaKernel.clock
-
- /**
- * The [CoroutineContext] for a [Process].
- */
- override val context: CoroutineContext = EmptyCoroutineContext
-
- /**
- * The observable state of an [Entity] within the simulation is provided by the context of the simulation.
- */
- @Suppress("UNCHECKED_CAST")
- override val <T : Entity<S>, S> T.state: S
- get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState
-
- /**
- * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the
- * message has been received.
- *
- * @return The envelope containing the message.
- */
- suspend fun receiveEnvelope(): Envelope<*> {
- return suspendCoroutine { continuation = it }
- }
-
- /**
- * Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
- *
- * @param block The block to process the message with.
- * @return The processed message.
- */
- suspend override fun <T> receive(block: Envelope<*>.(Any?) -> T): T {
- val envelope = receiveEnvelope()
- return block(envelope, envelope.message)
- }
-
- /**
- * Send the given message downstream.
- *
- * @param msg The message to send.
- * @param sender The sender of the message.
- * @param delay The number of ticks before the message should be received.
- */
- suspend override fun Entity<*>.send(msg: Any?, sender: Entity<*>?, delay: Int) {
- schedule(msg, this, sender, delay)
- }
-
- /**
- * Send an interruption message to the given [Entity].
- */
- suspend override fun Entity<*>.interrupt() = send(Interrupt, this)
-
- /**
- * Suspend the simulation kernel until the next tick occurs in the simulation.
- */
- suspend override fun tick(): Boolean {
- wait(1)
- return true
- }
-
- /**
- * Suspend the simulation kernel for <code>n</code> ticks before resuming the execution.
- *
- * @param n The amount of ticks to suspend the simulation kernel, with <code>n > 0</code>
- */
- suspend override fun wait(n: Int) {
- require(n > 0) { "The amount of ticks to suspend must be a non-zero positive number" }
- queue.add(Envelope(Resume, clock.tick + n, entity, entity))
-
- while (true) {
- if (receive() is Resume)
- return
- }
- }
-
- /**
- * Update the state of the entity being simulated.
- *
- * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects
- * referencing the old entity having their data changed.
- *
- * @param next The next state of the entity.
- */
- suspend override fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) {
- @Suppress("UNCHECKED_CAST")
- (this as OmegaContext<E, S>).state = next
- }
-
- // Completion continuation implementation
- /**
- * Resume the execution of this continuation with the given value.
- *
- * @param value The value to resume with.
- */
- override fun resume(value: Unit) {}
-
- /**
- * Resume the execution of this continuation with an exception.
- *
- * @param exception The exception to resume with.
- */
- override fun resumeWithException(exception: Throwable) {
- val currentThread = Thread.currentThread()
- currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
- }
- }
+ override fun create(topology: Topology): Simulation = OmegaSimulation(this, topology)
}
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
new file mode 100644
index 00000000..ec7701e7
--- /dev/null
+++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
@@ -0,0 +1,344 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package nl.atlarge.opendc.kernel.omega
+
+import mu.KotlinLogging
+import nl.atlarge.opendc.kernel.*
+import nl.atlarge.opendc.kernel.messaging.Envelope
+import nl.atlarge.opendc.kernel.messaging.Receipt
+import nl.atlarge.opendc.kernel.time.Clock
+import nl.atlarge.opendc.kernel.time.Duration
+import nl.atlarge.opendc.kernel.time.Instant
+import nl.atlarge.opendc.kernel.time.TickClock
+import nl.atlarge.opendc.topology.Entity
+import nl.atlarge.opendc.topology.Topology
+import nl.atlarge.opendc.topology.TopologyContext
+import java.util.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
+ *
+ * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and
+ * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities.
+ *
+ * By default, [Process]s are resolved as part of the [Topology], meaning each [Entity] in the topology should also
+ * implement its simulation behaviour by deriving from the [Process] interface.
+ *
+ * @property kernel The kernel that facilitates the simulation.
+ * @property topology The topology to run the simulation over.
+ * @property clock The clock to use for simulation time.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal class OmegaSimulation(override val kernel: OmegaKernel, override val topology: Topology,
+ override val clock: Clock = TickClock()) : Simulation {
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The registry of the simulation kernels used in the experiment.
+ */
+ private val registry: MutableMap<Entity<*>, OmegaContext<*, *>> = HashMap()
+
+ /**
+ * The message queue.
+ */
+ private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
+
+ /**
+ * Initialise the simulator.
+ */
+ init {
+ topology.forEach { resolve(it) }
+ registry.values.forEach { context ->
+ @Suppress("UNCHECKED_CAST")
+ val process = context.entity as Process<Entity<*>>
+
+ // Start all process co-routines
+ val block: suspend () -> Unit = { process.run { context.run() } }
+ block.startCoroutine(context)
+ }
+ }
+
+ /**
+ * Step through one event in the simulation.
+ */
+ override fun step() {
+ while (true) {
+ val envelope = queue.peek() ?: return
+ val delivery = envelope.time
+
+ if (delivery > clock.now) {
+ // Tick has yet to occur
+ // Jump in time to next event
+ clock.advanceTo(delivery)
+ break
+ } else if (delivery < clock.now) {
+ // Tick has already occurred
+ logger.warn { "message processed out of order" }
+ }
+ queue.poll()
+
+ val context = registry[envelope.destination] ?: continue
+
+ if (envelope.message !is Interrupt) {
+ context.continuation.resume(envelope)
+ } else {
+ context.continuation.resumeWithException(envelope.message)
+ }
+
+ context.last = clock.now
+ }
+ }
+
+ /**
+ * Run a simulation over the specified [Topology].
+ * This method will step through multiple cycles in the simulation until no more message exist in the queue.
+ */
+ override fun run() {
+ while (queue.isNotEmpty()) {
+ step()
+ }
+ }
+
+ /**
+ * Run a simulation over the specified [Topology], stepping through cycles until (exclusive) the specified clock
+ * tick has occurred. The control is then handed back to the user.
+ *
+ * @param until The point in simulation time at which the simulation should be paused and the control is handed
+ * back to the user.
+ */
+ override fun run(until: Instant) {
+ require(until > 0) { "The given instant must be a non-zero positive number" }
+
+ if (clock.now >= until) {
+ return
+ }
+
+ while (clock.now < until && queue.isNotEmpty()) {
+ step()
+ }
+
+ // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at
+ // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will
+ // just jump forward again.
+ if (clock.now > until) {
+ clock.rewindTo(until)
+ }
+ }
+
+ /**
+ * Schedule a message for processing by a [Process].
+ *
+ * @param message The message to schedule.
+ * @param destination The destination of the message.
+ * @param sender The sender of the message.
+ * @param delay The amount of time to wait before processing the message.
+ */
+ override fun schedule(message: Any, destination: Entity<*>, sender: Entity<*>?, delay: Duration): Receipt {
+ require(delay > 0) { "The amount of time to delay the message must be a positive number" }
+ val wrapped = MessageContainer(message, clock.now + delay, sender, destination)
+ queue.add(wrapped)
+ return wrapped
+ }
+
+ /**
+ * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network.
+ *
+ * @param entity The [Entity] to resolve the [Context] for.
+ * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated
+ * with it.
+ */
+ private fun <E : Entity<*>> resolve(entity: E): Context<E>? {
+ if (entity !is Process<*>)
+ return null
+
+ @Suppress("UNCHECKED_CAST")
+ return registry.computeIfAbsent(entity, {
+ OmegaContext(entity)
+ }) as Context<E>
+ }
+
+ /**
+ * This internal class provides the default implementation for the [Context] interface for this simulator.
+ */
+ private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>,
+ Continuation<Unit>, TopologyContext by topology {
+ /**
+ * The continuation to resume the execution of the process.
+ */
+ lateinit var continuation: Continuation<Envelope<*>>
+
+ /**
+ * The last point in time the process has done some work.
+ */
+ var last: Instant = 0
+
+ /**
+ * The state of the entity.
+ */
+ var state: S = entity.initialState
+
+ /**
+ * The [Topology] over which the simulation is run.
+ */
+ override val topology: Topology = this@OmegaSimulation.topology
+
+ /**
+ * The current point in simulation time.
+ */
+ override val time: Instant
+ get() = clock.now
+
+ /**
+ * The duration between the current point in simulation time and the last point in simulation time where the
+ * [Process] has executed some work.
+ */
+ override val delta: Duration
+ get() = clock.now - last
+
+ /**
+ * The [CoroutineContext] for a [Process].
+ */
+ override val context: CoroutineContext = EmptyCoroutineContext
+
+ /**
+ * The observable state of an [Entity] within the simulation is provided by the context of the simulation.
+ */
+ @Suppress("UNCHECKED_CAST")
+ override val <T : Entity<S>, S> T.state: S
+ get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState
+
+ /**
+ * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the
+ * message has been received.
+ *
+ * @return The envelope containing the message.
+ */
+ suspend fun receiveEnvelope(): Envelope<*> {
+ return suspendCoroutine { continuation = it }
+ }
+
+ /**
+ * Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
+ *
+ * @param block The block to process the message with.
+ * @return The processed message.
+ */
+ suspend override fun <T> receive(block: Envelope<*>.(Any) -> T): T {
+ val envelope = receiveEnvelope()
+ return block(envelope, envelope.message)
+ }
+
+ /**
+ * Send the given message to the specified entity.
+ *
+ * @param msg The message to send.
+ * @param delay The amount of time to wait before the message should be received.
+ */
+ suspend override fun Entity<*>.send(msg: Any, delay: Duration) = send(msg, entity, delay)
+
+ /**
+ * Send the given message to the specified entity.
+ *
+ * @param msg The message to send.
+ * @param sender The sender of the message.
+ * @param delay The amount of time to wait before the message should be received.
+ */
+ suspend override fun Entity<*>.send(msg: Any, sender: Entity<*>, delay: Duration): Receipt {
+ return schedule(msg, this, sender, delay)
+ }
+
+ /**
+ * Send an interruption message to the given [Entity].
+ */
+ suspend override fun Entity<*>.interrupt() {
+ send(Interrupt)
+ }
+
+ /**
+ * Suspend the [Process] of the [Entity] in simulation for one tick in simulation time which is defined by the
+ * [Clock].
+ *
+ * @return `true` to allow usage in while statements.
+ */
+ suspend override fun tick(): Boolean {
+ wait(clock.tick)
+ return true
+ }
+
+ /**
+ * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming
+ * execution.
+ *
+ * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend
+ * the process until the no more messages at an earlier point in time have to be processed.
+ *
+ * @param duration The duration of simulation time to wait before resuming execution.
+ */
+ suspend override fun wait(duration: Duration) {
+ require(duration > 0) { "The amount of time to suspend must be a non-zero positive number" }
+ schedule(Resume, entity, entity, duration)
+
+ while (true) {
+ if (receive() is Resume)
+ return
+ }
+ }
+
+ /**
+ * Update the state of the entity being simulated.
+ *
+ * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects
+ * referencing the old entity having their data changed.
+ *
+ * @param next The next state of the entity.
+ */
+ suspend override fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) {
+ @Suppress("UNCHECKED_CAST")
+ (this as OmegaContext<E, S>).state = next
+ }
+
+ // Completion continuation implementation
+ /**
+ * Resume the execution of this continuation with the given value.
+ *
+ * @param value The value to resume with.
+ */
+ override fun resume(value: Unit) {}
+
+ /**
+ * Resume the execution of this continuation with an exception.
+ *
+ * @param exception The exception to resume with.
+ */
+ override fun resumeWithException(exception: Throwable) {
+ val currentThread = Thread.currentThread()
+ currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
+ }
+ }
+}
diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
index 1408d03f..4f48f20d 100644
--- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
+++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
@@ -34,11 +34,11 @@ import org.junit.jupiter.api.Test
internal class SmokeTest {
@Test
fun smoke() {
+ val rack = Rack()
val builder = AdjacencyList.builder()
val topology = builder.construct {
- val rack = Rack()
add(rack)
- val n = 1000
+ val n = 100
// Create n machines in the rack
repeat(n) {
val machine = Machine()
@@ -55,7 +55,7 @@ internal class SmokeTest {
}
}
- val simulator = OmegaKernel(topology)
- simulator.run()
+ val simulation = OmegaKernel.create(topology)
+ simulation.run()
}
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
index dba0fe1b..0884a725 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
@@ -24,11 +24,11 @@
package nl.atlarge.opendc.topology.machine
-import nl.atlarge.opendc.experiment.Task
+import nl.atlarge.opendc.extension.destinations
+import nl.atlarge.opendc.workload.Task
import nl.atlarge.opendc.kernel.Context
import nl.atlarge.opendc.kernel.Process
import nl.atlarge.opendc.topology.Entity
-import java.util.*
/**
* A Physical Machine (PM) inside a rack of a datacenter. It has a speed, and can be given a workload on which it will
@@ -47,7 +47,7 @@ class Machine : Entity<Machine.State>, Process<Machine> {
/**
* The shape of the state of a [Machine] entity.
*/
- data class State(val status: Status)
+ data class State(val status: Status, val task: Task? = null)
/**
* The initial state of a [Machine] entity.
@@ -58,30 +58,27 @@ class Machine : Entity<Machine.State>, Process<Machine> {
* Run the simulation kernel for this entity.
*/
override suspend fun Context<Machine>.run() {
- update(state.copy(status = Status.IDLE))
+ update(State(Status.IDLE))
- val cpus = outgoingEdges.filter { it.tag == "cpu" }.map { it.to as Cpu }
- val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores })
- val task: Task
+ val cpus = outgoingEdges.destinations<Cpu>("cpu")
+ val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }).toLong()
+ var task: Task? = null
- val delay = Random().nextInt(1000) + 1
- wait(delay)
-
- loop@ while (true) {
- val msg = receive()
- when (msg) {
- is Task -> {
- task = msg
- break@loop
+ while (true) {
+ if (task != null) {
+ if (task.finished) {
+ task = null
+ update(State(Status.IDLE))
+ } else {
+ task.consume(speed * delta)
}
- else -> println("warning: unhandled message $msg")
}
- }
- update(state.copy(status = Status.RUNNING))
- while (tick()) {
- task.consume(speed.toLong())
+ val msg = receive()
+ if (msg is Task) {
+ task = msg
+ update(State(Status.RUNNING, task))
+ }
}
- update(state.copy(status = Status.HALT))
}
}