diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-09-18 00:12:24 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2017-09-18 00:12:24 +0200 |
| commit | 62895f71b7a7479652d9b86f7036b6580b40b7c7 (patch) | |
| tree | 780dbb3ea34c957acd92453af6679ea47d5ce82a /opendc-omega | |
| parent | c4816f18fa1ab4528a6966d636c3bfd7eac7b82a (diff) | |
Refactor and split up code base
This change splits up the current code base into three different
module:
- opendc-core - This module defines the API which you can use to write
simulatable entities in a topology.
- opendc-omega - This module is the reference implementation of the API
defined the `opendc-core` module.
- opendc-stdlib - This module provides a standard library of entities
which can be used for datacenter simulation.
Diffstat (limited to 'opendc-omega')
5 files changed, 507 insertions, 0 deletions
diff --git a/opendc-omega/build.gradle b/opendc-omega/build.gradle new file mode 100644 index 00000000..89b5740d --- /dev/null +++ b/opendc-omega/build.gradle @@ -0,0 +1,87 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +buildscript { + ext.kotlin_version = '1.1.4-3' + ext.dokka_version = '0.9.15' + + repositories { + mavenCentral() + jcenter() + } + + dependencies { + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + classpath "org.jetbrains.dokka:dokka-gradle-plugin:$dokka_version" + classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0-RC3' + } +} + +apply plugin: 'java' +apply plugin: 'kotlin' +apply plugin: 'org.jetbrains.dokka' +apply plugin: 'org.junit.platform.gradle.plugin' + +compileKotlin { + kotlinOptions { + jvmTarget = "1.8" + } +} + +compileTestKotlin { + kotlinOptions { + jvmTarget = "1.8" + } +} + +kotlin { + experimental { + coroutines 'enable' + } +} + +dokka { + outputFormat = 'html' + outputDirectory = "$buildDir/javadoc" +} + +/* Project configuration */ +group 'nl.atlarge.opendc' +version '1.0' + +repositories { + jcenter() +} + +dependencies { + compile project(':opendc-core') + compile "io.github.microutils:kotlin-logging:1.4.6" + + testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" + testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" + testCompile "org.junit.platform:junit-platform-launcher:1.0.0-RC3" + testCompile "org.slf4j:slf4j-simple:1.7.25" + testCompile project(':opendc-stdlib') +} diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt new file mode 100644 index 00000000..c84f4dbf --- /dev/null +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaClock.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.omega + +import nl.atlarge.opendc.kernel.Clock +import nl.atlarge.opendc.kernel.Tick + +/** + * A [Clock] implementation used by the Omega simulation kernel. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +class OmegaClock: Clock { + /** + * The simulation time expressed as the amount of ticks that passed. + */ + override var tick: Tick = 0 +} diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt new file mode 100644 index 00000000..631b6d45 --- /dev/null +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaKernel.kt @@ -0,0 +1,281 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.omega + +import mu.KotlinLogging +import nl.atlarge.opendc.kernel.* +import nl.atlarge.opendc.kernel.messaging.Envelope +import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.Topology +import nl.atlarge.opendc.topology.TopologyContext +import java.util.* +import kotlin.coroutines.experimental.* + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * By default, [Process]s are resolved as part of the [Topology], meaning each [Entity] in the topology should also + * implement its simulation behaviour by deriving from the [Process] interface. + * + * @param topology The topology to run the simulation over. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +class OmegaKernel(override val topology: Topology) : Kernel { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the simulation kernels used in the experiment. + */ + private val registry: MutableMap<Entity<*>, OmegaContext<*, *>> = HashMap() + + /** + * The message queue. + */ + private val queue: PriorityQueue<Envelope<*>> = PriorityQueue(Comparator.comparingLong(Envelope<*>::tick)) + + /** + * The clock of the simulator. + */ + private val clock: OmegaClock = OmegaClock() + + /** + * Initialise the simulator. + */ + init { + topology.forEach { resolve(it) } + registry.values.forEach { context -> + @Suppress("UNCHECKED_CAST") + val process = context.entity as Process<Entity<*>> + + // Start all process co-routines + val block: suspend () -> Unit = { process.run { context.run() } } + block.startCoroutine(context) + } + } + + /** + * Step through one event in the simulation. + */ + override fun step() { + while (true) { + val envelope = queue.peek() ?: return + val tick = envelope.tick + + if (tick > clock.tick) { + // Tick has yet to occur + // Jump in time to next event + clock.tick = tick + break + } else if (tick < clock.tick) { + // Tick has already occurred + logger.warn { "message processed out of order" } + } + queue.poll() + + val context = registry[envelope.destination] ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message as Interrupt) + } + } + } + + /** + * Run a simulation over the specified [Topology]. + * This method will step through multiple cycles in the simulation until no more message exist in the queue. + */ + override fun run() { + while (queue.isNotEmpty()) { + step() + } + } + + /** + * Schedule a message for processing by a [Process]. + * + * @param message The message to schedule. + * @param destination The destination of the message. + * @param sender The sender of the message. + * @param delay The amount of ticks to wait before processing the message. + */ + override fun schedule(message: Any?, destination: Entity<*>, sender: Entity<*>?, delay: Int) { + require(delay > 0) { "The amount of ticks to delay the message must be a positive number" } + queue.add(Envelope(message, clock.tick + delay, sender, destination)) + } + + /** + * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network. + * + * @param entity The [Entity] to resolve the [Context] for. + * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated + * with it. + */ + private fun <E : Entity<*>> resolve(entity: E): Context<E>? { + if (entity !is Process<*>) + return null + + @Suppress("UNCHECKED_CAST") + return registry.computeIfAbsent(entity, { + OmegaContext(entity) + }) as Context<E> + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>, + Continuation<Unit>, TopologyContext by topology { + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation<Envelope<*>> + + /** + * The state of the entity. + */ + var state: S = entity.initialState + + /** + * The [Topology] over which the simulation is run. + */ + override val topology: Topology = this@OmegaKernel.topology + + /** + * The global [Clock] that keeps track of the simulation time. + */ + override val clock: Clock = this@OmegaKernel.clock + + /** + * The [CoroutineContext] for a [Process]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + @Suppress("UNCHECKED_CAST") + override val <T : Entity<S>, S> T.state: S + get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> { + return suspendCoroutine { continuation = it } + } + + /** + * Retrieves and removes a single message from this channel suspending the caller while the channel is empty. + * + * @param block The block to process the message with. + * @return The processed message. + */ + suspend override fun <T> receive(block: Envelope<*>.(Any?) -> T): T { + val envelope = receiveEnvelope() + return block(envelope, envelope.message) + } + + /** + * Send the given message downstream. + * + * @param msg The message to send. + * @param sender The sender of the message. + * @param delay The number of ticks before the message should be received. + */ + suspend override fun Entity<*>.send(msg: Any?, sender: Entity<*>?, delay: Int) { + schedule(msg, this, sender, delay) + } + + /** + * Send an interruption message to the given [Entity]. + */ + suspend override fun Entity<*>.interrupt() = send(Interrupt, this) + + /** + * Suspend the simulation kernel until the next tick occurs in the simulation. + */ + suspend override fun tick(): Boolean { + wait(1) + return true + } + + /** + * Suspend the simulation kernel for <code>n</code> ticks before resuming the execution. + * + * @param n The amount of ticks to suspend the simulation kernel, with <code>n > 0</code> + */ + suspend override fun wait(n: Int) { + require(n > 0) { "The amount of ticks to suspend must be a non-zero positive number" } + queue.add(Envelope(Resume, clock.tick + n, entity, entity)) + + while (true) { + if (receive() is Resume) + return + } + } + + /** + * Update the state of the entity being simulated. + * + * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects + * referencing the old entity having their data changed. + * + * @param next The next state of the entity. + */ + suspend override fun <C : Context<E>, E : Entity<S>, S> C.update(next: S) { + @Suppress("UNCHECKED_CAST") + (this as OmegaContext<E, S>).state = next + } + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) {} + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + val currentThread = Thread.currentThread() + currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) + } + } +} diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/Resume.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/Resume.kt new file mode 100644 index 00000000..d20115d0 --- /dev/null +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/Resume.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.kernel.omega + +import nl.atlarge.opendc.kernel.Context + +/** + * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up + * and resume execution. + * + * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to + * wake up a process from another entity. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object Resume diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt new file mode 100644 index 00000000..1408d03f --- /dev/null +++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt @@ -0,0 +1,61 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc + +import nl.atlarge.opendc.kernel.omega.OmegaKernel +import nl.atlarge.opendc.topology.AdjacencyList +import nl.atlarge.opendc.topology.container.Rack +import nl.atlarge.opendc.topology.machine.Cpu +import nl.atlarge.opendc.topology.machine.Machine +import org.junit.jupiter.api.Test + +internal class SmokeTest { + @Test + fun smoke() { + val builder = AdjacencyList.builder() + val topology = builder.construct { + val rack = Rack() + add(rack) + val n = 1000 + // Create n machines in the rack + repeat(n) { + val machine = Machine() + add(machine) + connect(rack, machine, tag = "machine") + + val cpu1 = Cpu(10, 2, 2) + val cpu2 = Cpu(5, 3, 2) + add(cpu1) + add(cpu2) + + connect(machine, cpu1, tag = "cpu") + connect(machine, cpu2, tag = "cpu") + } + } + + val simulator = OmegaKernel(topology) + simulator.run() + } +} |
