diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-09-20 00:36:47 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-09-20 00:36:47 +0200 |
| commit | a67b87be9e14d6d3c23e1e6aff5051176171e6ef (patch) | |
| tree | e3c0e780a878d455796598809773309bbb9fec3f /opendc-omega | |
| parent | 62895f71b7a7479652d9b86f7036b6580b40b7c7 (diff) | |
Improve simulation time management
Diffstat (limited to 'opendc-omega')
| -rw-r--r-- | opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt (renamed from opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt) | 40 | ||||
| -rw-r--r-- | opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt | 244 | ||||
| -rw-r--r-- | opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt | 344 | ||||
| -rw-r--r-- | opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt | 8 |
4 files changed, 392 insertions, 244 deletions
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt index c84f4dbf..1554a9e6 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/MessageContainer.kt @@ -24,17 +24,45 @@ package nl.atlarge.opendc.kernel.omega -import nl.atlarge.opendc.kernel.Clock -import nl.atlarge.opendc.kernel.Tick +import nl.atlarge.opendc.kernel.messaging.Envelope +import nl.atlarge.opendc.kernel.messaging.Receipt +import nl.atlarge.opendc.kernel.time.Instant +import nl.atlarge.opendc.topology.Entity /** - * A [Clock] implementation used by the Omega simulation kernel. + * A wrapper around a message that has been scheduled for processing. * + * @property message The message to wrap. + * @property time The point in time to deliver the message. + * @property sender The sender of the message. + * @property destination The destination of the message. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class OmegaClock: Clock { +internal data class MessageContainer(override val message: Any, + val time: Instant, + override val sender: Entity<*>?, + override val destination: Entity<*>) : Envelope<Any>, Receipt { /** - * The simulation time expressed as the amount of ticks that passed. + * A flag to indicate the message has been canceled. */ - override var tick: Tick = 0 + override var canceled: Boolean = false + + /** + * A flag to indicate the message has been delivered. + */ + override var delivered: Boolean = false + + /** + * Cancel the message to prevent it from being received by an [Entity]. + * + * @throws IllegalStateException if the message has already been delivered. + */ + override fun cancel() { + if (delivered) { + throw IllegalStateException("The message has already been delivered") + } + + canceled = true + } + } diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt index 631b6d45..5367e674 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt @@ -24,14 +24,11 @@ package nl.atlarge.opendc.kernel.omega -import mu.KotlinLogging -import nl.atlarge.opendc.kernel.* -import nl.atlarge.opendc.kernel.messaging.Envelope +import nl.atlarge.opendc.kernel.Kernel +import nl.atlarge.opendc.kernel.Process +import nl.atlarge.opendc.kernel.Simulation import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.Topology -import nl.atlarge.opendc.topology.TopologyContext -import java.util.* -import kotlin.coroutines.experimental.* /** * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. @@ -42,240 +39,19 @@ import kotlin.coroutines.experimental.* * By default, [Process]s are resolved as part of the [Topology], meaning each [Entity] in the topology should also * implement its simulation behaviour by deriving from the [Process] interface. * - * @param topology The topology to run the simulation over. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class OmegaKernel(override val topology: Topology) : Kernel { +object OmegaKernel : Kernel { /** - * The logger instance to use for the simulator. + * The name of the kernel. */ - private val logger = KotlinLogging.logger {} + override val name: String = "opendc-omega" /** - * The registry of the simulation kernels used in the experiment. - */ - private val registry: MutableMap<Entity<*>, OmegaContext<*, *>> = HashMap() - - /** - * The message queue. - */ - private val queue: PriorityQueue<Envelope<*>> = PriorityQueue(Comparator.comparingLong(Envelope<*>::tick)) - - /** - * The clock of the simulator. - */ - private val clock: OmegaClock = OmegaClock() - - /** - * Initialise the simulator. - */ - init { - topology.forEach { resolve(it) } - registry.values.forEach { context -> - @Suppress("UNCHECKED_CAST") - val process = context.entity as Process<Entity<*>> - - // Start all process co-routines - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) - } - } - - /** - * Step through one event in the simulation. - */ - override fun step() { - while (true) { - val envelope = queue.peek() ?: return - val tick = envelope.tick - - if (tick > clock.tick) { - // Tick has yet to occur - // Jump in time to next event - clock.tick = tick - break - } else if (tick < clock.tick) { - // Tick has already occurred - logger.warn { "message processed out of order" } - } - queue.poll() - - val context = registry[envelope.destination] ?: continue - - if (envelope.message !is Interrupt) { - context.continuation.resume(envelope) - } else { - context.continuation.resumeWithException(envelope.message as Interrupt) - } - } - } - - /** - * Run a simulation over the specified [Topology]. - * This method will step through multiple cycles in the simulation until no more message exist in the queue. - */ - override fun run() { - while (queue.isNotEmpty()) { - step() - } - } - - /** - * Schedule a message for processing by a [Process]. + * Create a new [Simulation] of the given [Topology] that is facilitated by this simulation kernel. * - * @param message The message to schedule. - * @param destination The destination of the message. - * @param sender The sender of the message. - * @param delay The amount of ticks to wait before processing the message. + * @param topology The [Topology] to create a [Simulation] of. + * @return A [Simulation] instance. */ - override fun schedule(message: Any?, destination: Entity<*>, sender: Entity<*>?, delay: Int) { - require(delay > 0) { "The amount of ticks to delay the message must be a positive number" } - queue.add(Envelope(message, clock.tick + delay, sender, destination)) - } - - /** - * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network. - * - * @param entity The [Entity] to resolve the [Context] for. - * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated - * with it. - */ - private fun <E : Entity<*>> resolve(entity: E): Context<E>? { - if (entity !is Process<*>) - return null - - @Suppress("UNCHECKED_CAST") - return registry.computeIfAbsent(entity, { - OmegaContext(entity) - }) as Context<E> - } - - /** - * This internal class provides the default implementation for the [Context] interface for this simulator. - */ - private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>, - Continuation<Unit>, TopologyContext by topology { - /** - * The continuation to resume the execution of the process. - */ - lateinit var continuation: Continuation<Envelope<*>> - - /** - * The state of the entity. - */ - var state: S = entity.initialState - - /** - * The [Topology] over which the simulation is run. - */ - override val topology: Topology = this@OmegaKernel.topology - - /** - * The global [Clock] that keeps track of the simulation time. - */ - override val clock: Clock = this@OmegaKernel.clock - - /** - * The [CoroutineContext] for a [Process]. - */ - override val context: CoroutineContext = EmptyCoroutineContext - - /** - * The observable state of an [Entity] within the simulation is provided by the context of the simulation. - */ - @Suppress("UNCHECKED_CAST") - override val <T : Entity<S>, S> T.state: S - get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState - - /** - * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the - * message has been received. - * - * @return The envelope containing the message. - */ - suspend fun receiveEnvelope(): Envelope<*> { - return suspendCoroutine { continuation = it } - } - - /** - * Retrieves and removes a single message from this channel suspending the caller while the channel is empty. - * - * @param block The block to process the message with. - * @return The processed message. - */ - suspend override fun <T> receive(block: Envelope<*>.(Any?) -> T): T { - val envelope = receiveEnvelope() - return block(envelope, envelope.message) - } - - /** - * Send the given message downstream. - * - * @param msg The message to send. - * @param sender The sender of the message. - * @param delay The number of ticks before the message should be received. - */ - suspend override fun Entity<*>.send(msg: Any?, sender: Entity<*>?, delay: Int) { - schedule(msg, this, sender, delay) - } - - /** - * Send an interruption message to the given [Entity]. - */ - suspend override fun Entity<*>.interrupt() = send(Interrupt, this) - - /** - * Suspend the simulation kernel until the next tick occurs in the simulation. - */ - suspend override fun tick(): Boolean { - wait(1) - return true - } - - /** - * Suspend the simulation kernel for <code>n</code> ticks before resuming the execution. - * - * @param n The amount of ticks to suspend the simulation kernel, with <code>n > 0</code> - */ - suspend override fun wait(n: Int) { - require(n > 0) { "The amount of ticks to suspend must be a non-zero positive number" } - queue.add(Envelope(Resume, clock.tick + n, entity, entity)) - - while (true) { - if (receive() is Resume) - return - } - } - - /** - * Update the state of the entity being simulated. - * - * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects - * referencing the old entity having their data changed. - * - * @param next The next state of the entity. - */ - suspend override fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) { - @Suppress("UNCHECKED_CAST") - (this as OmegaContext<E, S>).state = next - } - - // Completion continuation implementation - /** - * Resume the execution of this continuation with the given value. - * - * @param value The value to resume with. - */ - override fun resume(value: Unit) {} - - /** - * Resume the execution of this continuation with an exception. - * - * @param exception The exception to resume with. - */ - override fun resumeWithException(exception: Throwable) { - val currentThread = Thread.currentThread() - currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) - } - } + override fun create(topology: Topology): Simulation = OmegaSimulation(this, topology) } diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt new file mode 100644 index 00000000..ec7701e7 --- /dev/null +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt @@ -0,0 +1,344 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.omega + +import mu.KotlinLogging +import nl.atlarge.opendc.kernel.* +import nl.atlarge.opendc.kernel.messaging.Envelope +import nl.atlarge.opendc.kernel.messaging.Receipt +import nl.atlarge.opendc.kernel.time.Clock +import nl.atlarge.opendc.kernel.time.Duration +import nl.atlarge.opendc.kernel.time.Instant +import nl.atlarge.opendc.kernel.time.TickClock +import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.Topology +import nl.atlarge.opendc.topology.TopologyContext +import java.util.* +import kotlin.coroutines.experimental.* + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * By default, [Process]s are resolved as part of the [Topology], meaning each [Entity] in the topology should also + * implement its simulation behaviour by deriving from the [Process] interface. + * + * @property kernel The kernel that facilitates the simulation. + * @property topology The topology to run the simulation over. + * @property clock The clock to use for simulation time. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class OmegaSimulation(override val kernel: OmegaKernel, override val topology: Topology, + override val clock: Clock = TickClock()) : Simulation { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the simulation kernels used in the experiment. + */ + private val registry: MutableMap<Entity<*>, OmegaContext<*, *>> = HashMap() + + /** + * The message queue. + */ + private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + + /** + * Initialise the simulator. + */ + init { + topology.forEach { resolve(it) } + registry.values.forEach { context -> + @Suppress("UNCHECKED_CAST") + val process = context.entity as Process<Entity<*>> + + // Start all process co-routines + val block: suspend () -> Unit = { process.run { context.run() } } + block.startCoroutine(context) + } + } + + /** + * Step through one event in the simulation. + */ + override fun step() { + while (true) { + val envelope = queue.peek() ?: return + val delivery = envelope.time + + if (delivery > clock.now) { + // Tick has yet to occur + // Jump in time to next event + clock.advanceTo(delivery) + break + } else if (delivery < clock.now) { + // Tick has already occurred + logger.warn { "message processed out of order" } + } + queue.poll() + + val context = registry[envelope.destination] ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message) + } + + context.last = clock.now + } + } + + /** + * Run a simulation over the specified [Topology]. + * This method will step through multiple cycles in the simulation until no more message exist in the queue. + */ + override fun run() { + while (queue.isNotEmpty()) { + step() + } + } + + /** + * Run a simulation over the specified [Topology], stepping through cycles until (exclusive) the specified clock + * tick has occurred. The control is then handed back to the user. + * + * @param until The point in simulation time at which the simulation should be paused and the control is handed + * back to the user. + */ + override fun run(until: Instant) { + require(until > 0) { "The given instant must be a non-zero positive number" } + + if (clock.now >= until) { + return + } + + while (clock.now < until && queue.isNotEmpty()) { + step() + } + + // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at + // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will + // just jump forward again. + if (clock.now > until) { + clock.rewindTo(until) + } + } + + /** + * Schedule a message for processing by a [Process]. + * + * @param message The message to schedule. + * @param destination The destination of the message. + * @param sender The sender of the message. + * @param delay The amount of time to wait before processing the message. + */ + override fun schedule(message: Any, destination: Entity<*>, sender: Entity<*>?, delay: Duration): Receipt { + require(delay > 0) { "The amount of time to delay the message must be a positive number" } + val wrapped = MessageContainer(message, clock.now + delay, sender, destination) + queue.add(wrapped) + return wrapped + } + + /** + * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network. + * + * @param entity The [Entity] to resolve the [Context] for. + * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated + * with it. + */ + private fun <E : Entity<*>> resolve(entity: E): Context<E>? { + if (entity !is Process<*>) + return null + + @Suppress("UNCHECKED_CAST") + return registry.computeIfAbsent(entity, { + OmegaContext(entity) + }) as Context<E> + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>, + Continuation<Unit>, TopologyContext by topology { + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation<Envelope<*>> + + /** + * The last point in time the process has done some work. + */ + var last: Instant = 0 + + /** + * The state of the entity. + */ + var state: S = entity.initialState + + /** + * The [Topology] over which the simulation is run. + */ + override val topology: Topology = this@OmegaSimulation.topology + + /** + * The current point in simulation time. + */ + override val time: Instant + get() = clock.now + + /** + * The duration between the current point in simulation time and the last point in simulation time where the + * [Process] has executed some work. + */ + override val delta: Duration + get() = clock.now - last + + /** + * The [CoroutineContext] for a [Process]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + @Suppress("UNCHECKED_CAST") + override val <T : Entity<S>, S> T.state: S + get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> { + return suspendCoroutine { continuation = it } + } + + /** + * Retrieves and removes a single message from this channel suspending the caller while the channel is empty. + * + * @param block The block to process the message with. + * @return The processed message. + */ + suspend override fun <T> receive(block: Envelope<*>.(Any) -> T): T { + val envelope = receiveEnvelope() + return block(envelope, envelope.message) + } + + /** + * Send the given message to the specified entity. + * + * @param msg The message to send. + * @param delay The amount of time to wait before the message should be received. + */ + suspend override fun Entity<*>.send(msg: Any, delay: Duration) = send(msg, entity, delay) + + /** + * Send the given message to the specified entity. + * + * @param msg The message to send. + * @param sender The sender of the message. + * @param delay The amount of time to wait before the message should be received. + */ + suspend override fun Entity<*>.send(msg: Any, sender: Entity<*>, delay: Duration): Receipt { + return schedule(msg, this, sender, delay) + } + + /** + * Send an interruption message to the given [Entity]. + */ + suspend override fun Entity<*>.interrupt() { + send(Interrupt) + } + + /** + * Suspend the [Process] of the [Entity] in simulation for one tick in simulation time which is defined by the + * [Clock]. + * + * @return `true` to allow usage in while statements. + */ + suspend override fun tick(): Boolean { + wait(clock.tick) + return true + } + + /** + * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming + * execution. + * + * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend + * the process until the no more messages at an earlier point in time have to be processed. + * + * @param duration The duration of simulation time to wait before resuming execution. + */ + suspend override fun wait(duration: Duration) { + require(duration > 0) { "The amount of time to suspend must be a non-zero positive number" } + schedule(Resume, entity, entity, duration) + + while (true) { + if (receive() is Resume) + return + } + } + + /** + * Update the state of the entity being simulated. + * + * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects + * referencing the old entity having their data changed. + * + * @param next The next state of the entity. + */ + suspend override fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) { + @Suppress("UNCHECKED_CAST") + (this as OmegaContext<E, S>).state = next + } + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) {} + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + val currentThread = Thread.currentThread() + currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) + } + } +} diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt index 1408d03f..4f48f20d 100644 --- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt +++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt @@ -34,11 +34,11 @@ import org.junit.jupiter.api.Test internal class SmokeTest { @Test fun smoke() { + val rack = Rack() val builder = AdjacencyList.builder() val topology = builder.construct { - val rack = Rack() add(rack) - val n = 1000 + val n = 100 // Create n machines in the rack repeat(n) { val machine = Machine() @@ -55,7 +55,7 @@ internal class SmokeTest { } } - val simulator = OmegaKernel(topology) - simulator.run() + val simulation = OmegaKernel.create(topology) + simulation.run() } } |
