diff options
18 files changed, 948 insertions, 343 deletions
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt new file mode 100644 index 00000000..83100587 --- /dev/null +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.extension + +import nl.atlarge.opendc.topology.Edge + +/** + * Filter a [Set] of [Edge]s based on the tag of the edges and return the origin nodes casted to type `T`. + * + * @param tag The tag of the edges to get. + * @return An [Iterable] of the specified type `T` with the given tag. + */ +inline fun <reified T> Set<Edge<*>>.origins(tag: String) = filter { it.tag == tag }.map { it.from as T } + +/** + * Filter a [Set] of [Edge]s based on the tag of the edges and return the destination nodes casted to type `T`. + * + * @param tag The tag of the edges to get. + * @return An [Iterable] of the specified type `T` with the given tag. + */ +inline fun <reified T> Set<Edge<*>>.destinations(tag: String) = filter { it.tag == tag }.map { it.to as T } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt index 600a9cee..46cb271e 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt @@ -26,9 +26,13 @@ package nl.atlarge.opendc.kernel import nl.atlarge.opendc.kernel.messaging.Readable import nl.atlarge.opendc.kernel.messaging.Writable +import nl.atlarge.opendc.kernel.time.Clock +import nl.atlarge.opendc.kernel.time.Duration +import nl.atlarge.opendc.kernel.time.Instant import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.Topology import nl.atlarge.opendc.topology.TopologyContext +import java.lang.Process /** * This interface provides a context for simulation [Process]es, which defines the environment in which the simulation @@ -38,19 +42,25 @@ import nl.atlarge.opendc.topology.TopologyContext */ interface Context<out E : Entity<*>> : Readable, Writable, TopologyContext { /** + * The [Entity] in simulation by the [Process]. + */ + val entity: E + + /** * The [Topology] over which the simulation is run. */ val topology: Topology /** - * The global [Clock] that keeps track of the simulation time. + * The current point in simulation time. */ - val clock: Clock + val time: Instant /** - * The [Entity] in simulation by the [Process]. + * The duration between the current point in simulation time and the last point in simulation time where the + * [Process] has executed some work. This means the `run()` co-routine has been resumed. */ - val entity: E + val delta: Duration /** * The observable state of an [Entity] in simulation, which is provided by the simulation context. @@ -58,29 +68,39 @@ interface Context<out E : Entity<*>> : Readable, Writable, TopologyContext { val <E : Entity<S>, S> E.state: S /** - * Interrupt the [Process] of an [Entity] in simulation. + * Update the observable state of the entity being simulated. + * + * Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects + * referencing the old entity having their data changed. + * + * @param next The next state of the entity. */ - suspend fun Entity<*>.interrupt() + suspend fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) /** - * Suspend the [Process] of the [Entity] in simulation until the next tick has occurred in the simulation. + * Interrupt the [Process] of an [Entity] in simulation. + * + * If a [Process] has been suspended, the suspending call will throw an [Interrupt] object as a result of this call. + * Make sure the [Process] actually has error handling in place, so it won't take down the whole [Process]. */ - suspend fun tick(): Boolean + suspend fun Entity<*>.interrupt() /** - * Suspend the [Process] of the [Entity] in simulation for <code>n</code> ticks before resuming execution. + * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming + * execution. + * + * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend + * the process until the no more messages at an earlier point in time have to be processed. * - * @param n The amount of ticks to suspend the process. + * @param duration The duration of simulation time to wait before resuming execution. */ - suspend fun wait(n: Int) + suspend fun wait(duration: Duration) /** - * Update the observable state of the entity being simulated. - * - * Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects - * referencing the old entity having their data changed. + * Suspend the [Process] of the [Entity] in simulation for one tick in simulation time which is defined by the + * [Clock]. * - * @param next The next state of the entity. + * @return `true` to allow usage in while statements. */ - suspend fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) + suspend fun tick(): Boolean } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt index 9678db41..524c9131 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt @@ -24,45 +24,26 @@ package nl.atlarge.opendc.kernel -import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.Topology -import java.lang.Process /** * A message-based discrete event simulator (DES). This interface allows running simulations over a [Topology]. * This discrete event simulator works by having entities in a [Topology] interchange messages between each other and * updating their observable state accordingly. * - * In order to run a simulation, a kernel needs to bootstrapped by an initial set of messages to be processed by - * entities in the topology of the simulation. Otherwise, the simulation will immediately exit. - * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ interface Kernel { /** - * The [Topology] over which the simulation is run. - */ - val topology: Topology - - /** - * Step through one cycle in the simulation. This method will process all events in a single tick, update the - * internal clock and then return the control to the user. - */ - fun step() - - /** - * Run a simulation over the specified [Topology]. - * This method will step through multiple cycles in the simulation until no more message exist in the queue. + * The name of the kernel. */ - fun run() + val name: String /** - * Schedule a message for processing by a [Process]. + * Create a new [Simulation] of the given [Topology] that is facilitated by this simulation kernel. * - * @param message The message to schedule. - * @param destination The destination of the message. - * @param sender The sender of the message. - * @param delay The amount of ticks to wait before processing the message. + * @param topology The [Topology] to create a [Simulation] of. + * @return A [Simulation] instance. */ - fun schedule(message: Any?, destination: Entity<*>, sender: Entity<*>? = null, delay: Int = 0) + fun create(topology: Topology): Simulation } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulation.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulation.kt new file mode 100644 index 00000000..32b45ca2 --- /dev/null +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulation.kt @@ -0,0 +1,95 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel + +import nl.atlarge.opendc.kernel.messaging.Receipt +import nl.atlarge.opendc.kernel.time.Clock +import nl.atlarge.opendc.kernel.time.Duration +import nl.atlarge.opendc.kernel.time.Instant +import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.Topology +import java.lang.Process + +/** + * A message based discrete event simulation facilitated by a simulation [Kernel]. + * + * In order for the simulation to run, the simulation kernel needs to bootstrapped by an set of messages to be processed + * initially by entities in the topology of the simulation. Otherwise, the simulation will immediately exit. + * Bootstrapping can be achieved by scheduling messages before running the simulation via [Simulation.schedule]: + * + * `val simulation = kernel.create(topology).apply { + * schedule(Boot, entity) + * }` + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Simulation { + /** + * The [Kernel] that facilitates the simulation. + */ + val kernel: Kernel + + /** + * The [Topology] over which the simulation is run. + */ + val topology: Topology + + /** + * The [Clock] instance that keeps track of simulation time. + */ + val clock: Clock + + /** + * Step through one cycle in the simulation. This method will process all events in a single tick, update the + * internal clock and then return the control to the user. + */ + fun step() + + /** + * Run a simulation over the specified [Topology]. + * This method will step through multiple cycles in the simulation until no more message exist in the queue. + */ + fun run() + + /** + * Run a simulation over the specified [Topology], stepping through cycles until the specified clock tick has + * occurred. The control is then handed back to the user. + * + * @param until The point in simulation time at which the simulation should be paused and the control is handed + * back to the user. + */ + fun run(until: Instant) + + /** + * Schedule a message for processing by a [Process]. + * + * @param message The message to schedule. + * @param destination The destination of the message. + * @param sender The sender of the message. + * @param delay The amount of time to wait before processing the message. + * @return A [Receipt] of the message that has been scheduled. + */ + fun schedule(message: Any, destination: Entity<*>, sender: Entity<*>? = null, delay: Duration = 0): Receipt +} diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt index 61d1a0cf..608d325f 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Envelope.kt @@ -24,33 +24,27 @@ package nl.atlarge.opendc.kernel.messaging -import nl.atlarge.opendc.kernel.Tick import nl.atlarge.opendc.topology.Entity /** - * The envelope of a message that is received from a [Channel], also containing the metadata of the message. + * The envelope of a message that is sent to an [Entity], also containing the metadata of the message. * * @param T The shape of the message inside the envelope. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -data class Envelope<out T>( +interface Envelope<out T: Any> { /** * The message in this envelope. */ - val message: T, - - /** - * The tick at which the message should be delivered. - */ - val tick: Tick, + val message: T /** * The sender of the message. */ - val sender: Entity<*>?, + val sender: Entity<*>? /** * The destination of the message. */ val destination: Entity<*> -) +} diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt index 422c5668..772d9013 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt @@ -25,23 +25,26 @@ package nl.atlarge.opendc.kernel.messaging /** - * A [Readable] instance allows objects to pull messages from the instance. + * A [Readable] instance has a mailbox associated with the instance to which objects can send messages, which can be + * received by the class. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ interface Readable { /** - * Retrieves and removes a single message from this channel suspending the caller while the channel is empty. + * Retrieve and removes a single message from the entity's mailbox, suspending the function if the mailbox is empty. + * The execution is resumed after the message has landed in the entity's mailbox after which the message [Envelope] + * is mapped through `block` to generate a processed message. * * @param block The block to process the message with. * @return The processed message. */ - suspend fun <T> receive(block: Envelope<*>.(Any?) -> T): T + suspend fun <T> receive(block: Envelope<*>.(Any) -> T): T /** - * Retrieve a single message from this [Channel]. + * Retrieve and removes a single message from the entity's mailbox, suspending the function if the mailbox is empty. * - * @return The message that was received from the channel + * @return The message that was received from the entity's mailbox. */ - suspend fun receive(): Any? = receive { it } + suspend fun receive(): Any = receive { it } } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Receipt.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Receipt.kt new file mode 100644 index 00000000..74433f5e --- /dev/null +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Receipt.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.messaging + +import nl.atlarge.opendc.kernel.Kernel +import nl.atlarge.opendc.topology.Entity + +/** + * A receipt of a message that has been scheduled by a simulation [Kernel]. This interface allows the cancellation of a + * message that has been scheduled for delivery and for checking the status of a delivery. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Receipt { + /** + * A flag to indicate the message has been canceled. + */ + val canceled: Boolean + + /** + * A flag to indicate the message has been delivered. + */ + val delivered: Boolean + + /** + * Cancel the message to prevent it from being received by an [Entity]. + * + * @throws IllegalStateException if the message has already been delivered. + */ + fun cancel() +} diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt index 45c81e39..0d2b2725 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Writable.kt @@ -24,6 +24,7 @@ package nl.atlarge.opendc.kernel.messaging +import nl.atlarge.opendc.kernel.time.Duration import nl.atlarge.opendc.topology.Entity /** @@ -33,11 +34,21 @@ import nl.atlarge.opendc.topology.Entity */ interface Writable { /** - * Send the given message downstream. + * Send the given message to the specified entity. + * + * @param msg The message to send. + * @param delay The amount of time to wait before the message should be received. + * @return A [Receipt] of the message that has been sent. + */ + suspend fun Entity<*>.send(msg: Any, delay: Duration = 0): Receipt + + /** + * Send the given message to the specified entity. * * @param msg The message to send. * @param sender The sender of the message. - * @param delay The number of ticks before the message should be received. + * @param delay The amount of time to wait before the message should be received. + * @return A [Receipt] of the message that has been sent. */ - suspend fun Entity<*>.send(msg: Any?, sender: Entity<*>? = null, delay: Int = 0) + suspend fun Entity<*>.send(msg: Any, sender: Entity<*>, delay: Duration = 0): Receipt } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Clock.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Clock.kt new file mode 100644 index 00000000..f03a98fa --- /dev/null +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Clock.kt @@ -0,0 +1,79 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.time + +import nl.atlarge.opendc.kernel.Simulation + +/** + * A clock controls and provides access to the simulation time of a [Simulation]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Clock { + /** + * The moment in time the clock is currently at. + */ + val now: Instant + + /** + * The duration of a tick in this clock. This is an arbitrary duration of time in which entities in simulation + * perform some defined amount of work. + */ + val tick: Duration + + /** + * Advance the clock by the given duration. + * + * @param duration The duration to advance the clock by. + */ + fun advance(duration: Duration) { + require(duration >= 0) { "The duration to advance the clock must not be a negative number" } + advanceTo(now + duration) + } + + /** + * Rewind the clock by the given duration. + * + * @param duration The duration to rewind the clock by. + */ + fun rewind(duration: Duration) { + require(duration >= 0) { "The duration to rewind the clock must not be a negative number" } + rewindTo(now - duration) + } + + /** + * Rewind the clock to the given point in time. + * + * @param instant The point in time to rewind the clock to. + */ + fun rewindTo(instant: Instant) + + /** + * Advance the clock to the given point in time. + * + * @param instant The point in time to advance the clock to. + */ + fun advanceTo(instant: Instant) +} diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/TickClock.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/TickClock.kt new file mode 100644 index 00000000..d960f454 --- /dev/null +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/TickClock.kt @@ -0,0 +1,60 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.time + +/** + * A tick based clock which divides time into a discrete interval of points. + * + * @param initial The initial point in time of the clock. + * @param tick The duration of a tick. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +class TickClock(initial: Instant = 0, override val tick: Duration = 1) : Clock { + /** + * The moment in time the clock is currently at. + */ + override var now: Instant = initial + private set + + /** + * Advance the clock to the given point in time. + * + * @param instant The moment in time to advance the clock to. + */ + override fun advanceTo(instant: Instant) { + require(instant >= now) { "The point to advance to must be at the same point or further than now" } + now = instant + } + + /** + * Rewind the clock to the given point in time. + * + * @param instant The point in time to rewind the clock to. + */ + override fun rewindTo(instant: Instant) { + require(now >= instant) { "The point to rewind to must be before the current point in time" } + now = instant + } +} diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Time.kt index c84f4dbf..af9d547b 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/time/Time.kt @@ -22,19 +22,14 @@ * SOFTWARE. */ -package nl.atlarge.opendc.kernel.omega +package nl.atlarge.opendc.kernel.time -import nl.atlarge.opendc.kernel.Clock -import nl.atlarge.opendc.kernel.Tick +/** + * An instantaneous point on the time-line, used to record event time-stamps in a simulation. + */ +typealias Instant = Long /** - * A [Clock] implementation used by the Omega simulation kernel. - * - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + * A time interval which represents the amount of elapsed time between two events. */ -class OmegaClock: Clock { - /** - * The simulation time expressed as the amount of ticks that passed. - */ - override var tick: Tick = 0 -} +typealias Duration = Long diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Clock.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/platform/Experiment.kt index 0328bbe6..225b2813 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Clock.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/platform/Experiment.kt @@ -22,21 +22,20 @@ * SOFTWARE. */ -package nl.atlarge.opendc.kernel +package nl.atlarge.opendc.platform -/** - * A tick represents a moment of time in which some work is done by an entity. - */ -typealias Tick = Long +import nl.atlarge.opendc.kernel.Kernel /** - * The clock of a simulation manages the simulation time of a simulation [Kernel]. + * A blueprint for a reproducible simulation in a pre-defined setting. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Clock { +interface Experiment<out T> { /** - * The tick the clock is currently at. + * Run the experiment on the specified simulation [Kernel]. + * + * @param kernel The simulation kernel to run the experiment. */ - val tick: Tick + fun run(kernel: Kernel): T } diff --git a/opendc-integration-jpa/build.gradle b/opendc-integration-jpa/build.gradle new file mode 100644 index 00000000..89b5740d --- /dev/null +++ b/opendc-integration-jpa/build.gradle @@ -0,0 +1,87 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +buildscript { + ext.kotlin_version = '1.1.4-3' + ext.dokka_version = '0.9.15' + + repositories { + mavenCentral() + jcenter() + } + + dependencies { + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + classpath "org.jetbrains.dokka:dokka-gradle-plugin:$dokka_version" + classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0-RC3' + } +} + +apply plugin: 'java' +apply plugin: 'kotlin' +apply plugin: 'org.jetbrains.dokka' +apply plugin: 'org.junit.platform.gradle.plugin' + +compileKotlin { + kotlinOptions { + jvmTarget = "1.8" + } +} + +compileTestKotlin { + kotlinOptions { + jvmTarget = "1.8" + } +} + +kotlin { + experimental { + coroutines 'enable' + } +} + +dokka { + outputFormat = 'html' + outputDirectory = "$buildDir/javadoc" +} + +/* Project configuration */ +group 'nl.atlarge.opendc' +version '1.0' + +repositories { + jcenter() +} + +dependencies { + compile project(':opendc-core') + compile "io.github.microutils:kotlin-logging:1.4.6" + + testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" + testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" + testCompile "org.junit.platform:junit-platform-launcher:1.0.0-RC3" + testCompile "org.slf4j:slf4j-simple:1.7.25" + testCompile project(':opendc-stdlib') +} diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt new file mode 100644 index 00000000..1554a9e6 --- /dev/null +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.omega + +import nl.atlarge.opendc.kernel.messaging.Envelope +import nl.atlarge.opendc.kernel.messaging.Receipt +import nl.atlarge.opendc.kernel.time.Instant +import nl.atlarge.opendc.topology.Entity + +/** + * A wrapper around a message that has been scheduled for processing. + * + * @property message The message to wrap. + * @property time The point in time to deliver the message. + * @property sender The sender of the message. + * @property destination The destination of the message. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal data class MessageContainer(override val message: Any, + val time: Instant, + override val sender: Entity<*>?, + override val destination: Entity<*>) : Envelope<Any>, Receipt { + /** + * A flag to indicate the message has been canceled. + */ + override var canceled: Boolean = false + + /** + * A flag to indicate the message has been delivered. + */ + override var delivered: Boolean = false + + /** + * Cancel the message to prevent it from being received by an [Entity]. + * + * @throws IllegalStateException if the message has already been delivered. + */ + override fun cancel() { + if (delivered) { + throw IllegalStateException("The message has already been delivered") + } + + canceled = true + } + +} diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt index 631b6d45..5367e674 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt @@ -24,14 +24,11 @@ package nl.atlarge.opendc.kernel.omega -import mu.KotlinLogging -import nl.atlarge.opendc.kernel.* -import nl.atlarge.opendc.kernel.messaging.Envelope +import nl.atlarge.opendc.kernel.Kernel +import nl.atlarge.opendc.kernel.Process +import nl.atlarge.opendc.kernel.Simulation import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.Topology -import nl.atlarge.opendc.topology.TopologyContext -import java.util.* -import kotlin.coroutines.experimental.* /** * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. @@ -42,240 +39,19 @@ import kotlin.coroutines.experimental.* * By default, [Process]s are resolved as part of the [Topology], meaning each [Entity] in the topology should also * implement its simulation behaviour by deriving from the [Process] interface. * - * @param topology The topology to run the simulation over. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class OmegaKernel(override val topology: Topology) : Kernel { +object OmegaKernel : Kernel { /** - * The logger instance to use for the simulator. + * The name of the kernel. */ - private val logger = KotlinLogging.logger {} + override val name: String = "opendc-omega" /** - * The registry of the simulation kernels used in the experiment. - */ - private val registry: MutableMap<Entity<*>, OmegaContext<*, *>> = HashMap() - - /** - * The message queue. - */ - private val queue: PriorityQueue<Envelope<*>> = PriorityQueue(Comparator.comparingLong(Envelope<*>::tick)) - - /** - * The clock of the simulator. - */ - private val clock: OmegaClock = OmegaClock() - - /** - * Initialise the simulator. - */ - init { - topology.forEach { resolve(it) } - registry.values.forEach { context -> - @Suppress("UNCHECKED_CAST") - val process = context.entity as Process<Entity<*>> - - // Start all process co-routines - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) - } - } - - /** - * Step through one event in the simulation. - */ - override fun step() { - while (true) { - val envelope = queue.peek() ?: return - val tick = envelope.tick - - if (tick > clock.tick) { - // Tick has yet to occur - // Jump in time to next event - clock.tick = tick - break - } else if (tick < clock.tick) { - // Tick has already occurred - logger.warn { "message processed out of order" } - } - queue.poll() - - val context = registry[envelope.destination] ?: continue - - if (envelope.message !is Interrupt) { - context.continuation.resume(envelope) - } else { - context.continuation.resumeWithException(envelope.message as Interrupt) - } - } - } - - /** - * Run a simulation over the specified [Topology]. - * This method will step through multiple cycles in the simulation until no more message exist in the queue. - */ - override fun run() { - while (queue.isNotEmpty()) { - step() - } - } - - /** - * Schedule a message for processing by a [Process]. + * Create a new [Simulation] of the given [Topology] that is facilitated by this simulation kernel. * - * @param message The message to schedule. - * @param destination The destination of the message. - * @param sender The sender of the message. - * @param delay The amount of ticks to wait before processing the message. + * @param topology The [Topology] to create a [Simulation] of. + * @return A [Simulation] instance. */ - override fun schedule(message: Any?, destination: Entity<*>, sender: Entity<*>?, delay: Int) { - require(delay > 0) { "The amount of ticks to delay the message must be a positive number" } - queue.add(Envelope(message, clock.tick + delay, sender, destination)) - } - - /** - * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network. - * - * @param entity The [Entity] to resolve the [Context] for. - * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated - * with it. - */ - private fun <E : Entity<*>> resolve(entity: E): Context<E>? { - if (entity !is Process<*>) - return null - - @Suppress("UNCHECKED_CAST") - return registry.computeIfAbsent(entity, { - OmegaContext(entity) - }) as Context<E> - } - - /** - * This internal class provides the default implementation for the [Context] interface for this simulator. - */ - private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>, - Continuation<Unit>, TopologyContext by topology { - /** - * The continuation to resume the execution of the process. - */ - lateinit var continuation: Continuation<Envelope<*>> - - /** - * The state of the entity. - */ - var state: S = entity.initialState - - /** - * The [Topology] over which the simulation is run. - */ - override val topology: Topology = this@OmegaKernel.topology - - /** - * The global [Clock] that keeps track of the simulation time. - */ - override val clock: Clock = this@OmegaKernel.clock - - /** - * The [CoroutineContext] for a [Process]. - */ - override val context: CoroutineContext = EmptyCoroutineContext - - /** - * The observable state of an [Entity] within the simulation is provided by the context of the simulation. - */ - @Suppress("UNCHECKED_CAST") - override val <T : Entity<S>, S> T.state: S - get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState - - /** - * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the - * message has been received. - * - * @return The envelope containing the message. - */ - suspend fun receiveEnvelope(): Envelope<*> { - return suspendCoroutine { continuation = it } - } - - /** - * Retrieves and removes a single message from this channel suspending the caller while the channel is empty. - * - * @param block The block to process the message with. - * @return The processed message. - */ - suspend override fun <T> receive(block: Envelope<*>.(Any?) -> T): T { - val envelope = receiveEnvelope() - return block(envelope, envelope.message) - } - - /** - * Send the given message downstream. - * - * @param msg The message to send. - * @param sender The sender of the message. - * @param delay The number of ticks before the message should be received. - */ - suspend override fun Entity<*>.send(msg: Any?, sender: Entity<*>?, delay: Int) { - schedule(msg, this, sender, delay) - } - - /** - * Send an interruption message to the given [Entity]. - */ - suspend override fun Entity<*>.interrupt() = send(Interrupt, this) - - /** - * Suspend the simulation kernel until the next tick occurs in the simulation. - */ - suspend override fun tick(): Boolean { - wait(1) - return true - } - - /** - * Suspend the simulation kernel for <code>n</code> ticks before resuming the execution. - * - * @param n The amount of ticks to suspend the simulation kernel, with <code>n > 0</code> - */ - suspend override fun wait(n: Int) { - require(n > 0) { "The amount of ticks to suspend must be a non-zero positive number" } - queue.add(Envelope(Resume, clock.tick + n, entity, entity)) - - while (true) { - if (receive() is Resume) - return - } - } - - /** - * Update the state of the entity being simulated. - * - * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects - * referencing the old entity having their data changed. - * - * @param next The next state of the entity. - */ - suspend override fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) { - @Suppress("UNCHECKED_CAST") - (this as OmegaContext<E, S>).state = next - } - - // Completion continuation implementation - /** - * Resume the execution of this continuation with the given value. - * - * @param value The value to resume with. - */ - override fun resume(value: Unit) {} - - /** - * Resume the execution of this continuation with an exception. - * - * @param exception The exception to resume with. - */ - override fun resumeWithException(exception: Throwable) { - val currentThread = Thread.currentThread() - currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) - } - } + override fun create(topology: Topology): Simulation = OmegaSimulation(this, topology) } diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt new file mode 100644 index 00000000..ec7701e7 --- /dev/null +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt @@ -0,0 +1,344 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.omega + +import mu.KotlinLogging +import nl.atlarge.opendc.kernel.* +import nl.atlarge.opendc.kernel.messaging.Envelope +import nl.atlarge.opendc.kernel.messaging.Receipt +import nl.atlarge.opendc.kernel.time.Clock +import nl.atlarge.opendc.kernel.time.Duration +import nl.atlarge.opendc.kernel.time.Instant +import nl.atlarge.opendc.kernel.time.TickClock +import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.Topology +import nl.atlarge.opendc.topology.TopologyContext +import java.util.* +import kotlin.coroutines.experimental.* + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * By default, [Process]s are resolved as part of the [Topology], meaning each [Entity] in the topology should also + * implement its simulation behaviour by deriving from the [Process] interface. + * + * @property kernel The kernel that facilitates the simulation. + * @property topology The topology to run the simulation over. + * @property clock The clock to use for simulation time. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class OmegaSimulation(override val kernel: OmegaKernel, override val topology: Topology, + override val clock: Clock = TickClock()) : Simulation { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the simulation kernels used in the experiment. + */ + private val registry: MutableMap<Entity<*>, OmegaContext<*, *>> = HashMap() + + /** + * The message queue. + */ + private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + + /** + * Initialise the simulator. + */ + init { + topology.forEach { resolve(it) } + registry.values.forEach { context -> + @Suppress("UNCHECKED_CAST") + val process = context.entity as Process<Entity<*>> + + // Start all process co-routines + val block: suspend () -> Unit = { process.run { context.run() } } + block.startCoroutine(context) + } + } + + /** + * Step through one event in the simulation. + */ + override fun step() { + while (true) { + val envelope = queue.peek() ?: return + val delivery = envelope.time + + if (delivery > clock.now) { + // Tick has yet to occur + // Jump in time to next event + clock.advanceTo(delivery) + break + } else if (delivery < clock.now) { + // Tick has already occurred + logger.warn { "message processed out of order" } + } + queue.poll() + + val context = registry[envelope.destination] ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message) + } + + context.last = clock.now + } + } + + /** + * Run a simulation over the specified [Topology]. + * This method will step through multiple cycles in the simulation until no more message exist in the queue. + */ + override fun run() { + while (queue.isNotEmpty()) { + step() + } + } + + /** + * Run a simulation over the specified [Topology], stepping through cycles until (exclusive) the specified clock + * tick has occurred. The control is then handed back to the user. + * + * @param until The point in simulation time at which the simulation should be paused and the control is handed + * back to the user. + */ + override fun run(until: Instant) { + require(until > 0) { "The given instant must be a non-zero positive number" } + + if (clock.now >= until) { + return + } + + while (clock.now < until && queue.isNotEmpty()) { + step() + } + + // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at + // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will + // just jump forward again. + if (clock.now > until) { + clock.rewindTo(until) + } + } + + /** + * Schedule a message for processing by a [Process]. + * + * @param message The message to schedule. + * @param destination The destination of the message. + * @param sender The sender of the message. + * @param delay The amount of time to wait before processing the message. + */ + override fun schedule(message: Any, destination: Entity<*>, sender: Entity<*>?, delay: Duration): Receipt { + require(delay > 0) { "The amount of time to delay the message must be a positive number" } + val wrapped = MessageContainer(message, clock.now + delay, sender, destination) + queue.add(wrapped) + return wrapped + } + + /** + * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network. + * + * @param entity The [Entity] to resolve the [Context] for. + * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated + * with it. + */ + private fun <E : Entity<*>> resolve(entity: E): Context<E>? { + if (entity !is Process<*>) + return null + + @Suppress("UNCHECKED_CAST") + return registry.computeIfAbsent(entity, { + OmegaContext(entity) + }) as Context<E> + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>, + Continuation<Unit>, TopologyContext by topology { + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation<Envelope<*>> + + /** + * The last point in time the process has done some work. + */ + var last: Instant = 0 + + /** + * The state of the entity. + */ + var state: S = entity.initialState + + /** + * The [Topology] over which the simulation is run. + */ + override val topology: Topology = this@OmegaSimulation.topology + + /** + * The current point in simulation time. + */ + override val time: Instant + get() = clock.now + + /** + * The duration between the current point in simulation time and the last point in simulation time where the + * [Process] has executed some work. + */ + override val delta: Duration + get() = clock.now - last + + /** + * The [CoroutineContext] for a [Process]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + @Suppress("UNCHECKED_CAST") + override val <T : Entity<S>, S> T.state: S + get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> { + return suspendCoroutine { continuation = it } + } + + /** + * Retrieves and removes a single message from this channel suspending the caller while the channel is empty. + * + * @param block The block to process the message with. + * @return The processed message. + */ + suspend override fun <T> receive(block: Envelope<*>.(Any) -> T): T { + val envelope = receiveEnvelope() + return block(envelope, envelope.message) + } + + /** + * Send the given message to the specified entity. + * + * @param msg The message to send. + * @param delay The amount of time to wait before the message should be received. + */ + suspend override fun Entity<*>.send(msg: Any, delay: Duration) = send(msg, entity, delay) + + /** + * Send the given message to the specified entity. + * + * @param msg The message to send. + * @param sender The sender of the message. + * @param delay The amount of time to wait before the message should be received. + */ + suspend override fun Entity<*>.send(msg: Any, sender: Entity<*>, delay: Duration): Receipt { + return schedule(msg, this, sender, delay) + } + + /** + * Send an interruption message to the given [Entity]. + */ + suspend override fun Entity<*>.interrupt() { + send(Interrupt) + } + + /** + * Suspend the [Process] of the [Entity] in simulation for one tick in simulation time which is defined by the + * [Clock]. + * + * @return `true` to allow usage in while statements. + */ + suspend override fun tick(): Boolean { + wait(clock.tick) + return true + } + + /** + * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming + * execution. + * + * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend + * the process until the no more messages at an earlier point in time have to be processed. + * + * @param duration The duration of simulation time to wait before resuming execution. + */ + suspend override fun wait(duration: Duration) { + require(duration > 0) { "The amount of time to suspend must be a non-zero positive number" } + schedule(Resume, entity, entity, duration) + + while (true) { + if (receive() is Resume) + return + } + } + + /** + * Update the state of the entity being simulated. + * + * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects + * referencing the old entity having their data changed. + * + * @param next The next state of the entity. + */ + suspend override fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) { + @Suppress("UNCHECKED_CAST") + (this as OmegaContext<E, S>).state = next + } + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) {} + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + val currentThread = Thread.currentThread() + currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) + } + } +} diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt index 1408d03f..4f48f20d 100644 --- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt +++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt @@ -34,11 +34,11 @@ import org.junit.jupiter.api.Test internal class SmokeTest { @Test fun smoke() { + val rack = Rack() val builder = AdjacencyList.builder() val topology = builder.construct { - val rack = Rack() add(rack) - val n = 1000 + val n = 100 // Create n machines in the rack repeat(n) { val machine = Machine() @@ -55,7 +55,7 @@ internal class SmokeTest { } } - val simulator = OmegaKernel(topology) - simulator.run() + val simulation = OmegaKernel.create(topology) + simulation.run() } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index dba0fe1b..0884a725 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -24,11 +24,11 @@ package nl.atlarge.opendc.topology.machine -import nl.atlarge.opendc.experiment.Task +import nl.atlarge.opendc.extension.destinations +import nl.atlarge.opendc.workload.Task import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process import nl.atlarge.opendc.topology.Entity -import java.util.* /** * A Physical Machine (PM) inside a rack of a datacenter. It has a speed, and can be given a workload on which it will @@ -47,7 +47,7 @@ class Machine : Entity<Machine.State>, Process<Machine> { /** * The shape of the state of a [Machine] entity. */ - data class State(val status: Status) + data class State(val status: Status, val task: Task? = null) /** * The initial state of a [Machine] entity. @@ -58,30 +58,27 @@ class Machine : Entity<Machine.State>, Process<Machine> { * Run the simulation kernel for this entity. */ override suspend fun Context<Machine>.run() { - update(state.copy(status = Status.IDLE)) + update(State(Status.IDLE)) - val cpus = outgoingEdges.filter { it.tag == "cpu" }.map { it.to as Cpu } - val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }) - val task: Task + val cpus = outgoingEdges.destinations<Cpu>("cpu") + val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }).toLong() + var task: Task? = null - val delay = Random().nextInt(1000) + 1 - wait(delay) - - loop@ while (true) { - val msg = receive() - when (msg) { - is Task -> { - task = msg - break@loop + while (true) { + if (task != null) { + if (task.finished) { + task = null + update(State(Status.IDLE)) + } else { + task.consume(speed * delta) } - else -> println("warning: unhandled message $msg") } - } - update(state.copy(status = Status.RUNNING)) - while (tick()) { - task.consume(speed.toLong()) + val msg = receive() + if (msg is Task) { + task = msg + update(State(Status.RUNNING, task)) + } } - update(state.copy(status = Status.HALT)) } } |
