From 96f75806dbdcb8c43a22c7db98e85ac5e854e68b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 4 Sep 2017 12:56:44 +0200 Subject: Move simulation kernels into topology entities This change embeds simulation kernels into the entities and relations of the topology. --- opendc-core/build.gradle | 25 ++++---- .../atlarge/opendc/kernel/AbstractEntityKernel.kt | 65 --------------------- .../kotlin/nl/atlarge/opendc/kernel/Context.kt | 6 ++ .../main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt | 2 +- .../kotlin/nl/atlarge/opendc/kernel/Simulator.kt | 68 +++++++++++++--------- .../nl/atlarge/opendc/kernel/impl/MachineKernel.kt | 61 ------------------- .../main/kotlin/nl/atlarge/opendc/topology/Node.kt | 23 +++++++- .../nl/atlarge/opendc/topology/machine/Machine.kt | 38 +++++++++++- .../src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt | 40 +++++-------- 9 files changed, 132 insertions(+), 196 deletions(-) delete mode 100644 opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt delete mode 100644 opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt (limited to 'opendc-core') diff --git a/opendc-core/build.gradle b/opendc-core/build.gradle index f2999d82..bf71b63b 100644 --- a/opendc-core/build.gradle +++ b/opendc-core/build.gradle @@ -25,7 +25,6 @@ /* Build configuration */ buildscript { ext.kotlin_version = '1.1.4-3' - ext.kotlin_version = '1.1.4' repositories { mavenCentral() @@ -45,7 +44,13 @@ plugins { apply plugin: 'org.junit.platform.gradle.plugin' apply plugin: 'kotlin' -tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all { +compileKotlin { + kotlinOptions { + jvmTarget = "1.8" + } +} + +compileTestKotlin { kotlinOptions { jvmTarget = "1.8" } @@ -67,19 +72,11 @@ repositories { dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" - compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.18' + compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version" + compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.18" + compile "io.github.microutils:kotlin-logging:1.4.6" testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" - compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version" -} -compileKotlin { - kotlinOptions { - jvmTarget = "1.8" - } -} -compileTestKotlin { - kotlinOptions { - jvmTarget = "1.8" - } + testCompile "org.slf4j:slf4j-simple:1.7.25" } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt deleted file mode 100644 index 35b89e4e..00000000 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package nl.atlarge.opendc.kernel - -import nl.atlarge.opendc.kernel.messaging.ReadablePort -import nl.atlarge.opendc.kernel.messaging.WritableChannel -import nl.atlarge.opendc.topology.Edge -import nl.atlarge.opendc.topology.Entity -import nl.atlarge.opendc.topology.Node - -/** - * A simulation kernel that simulates a single entity in the topology of a cloud network. - * - * @param ctx The context in which the simulation is run. - * @param E The shape of the component to simulate. - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -abstract class AbstractEntityKernel>(private val ctx: EntityContext): Kernel> { - /** - * The [Node] that is simulated. - */ - val self: Node = ctx.component - - /** - * Create a [WritableChannel] over the edge with the given tag. - * - * @param tag The tag of the edge to create a channel over. - * @return The channel that has been created or the cached result. - */ - inline fun > output(tag: String): WritableChannel { - TODO() - } - - /** - * Create a [ReadablePort] over the edges with the given tag. - * - * @param tag The tag of the edges to create a port over. - * @return The port that has been created or the cached result. - */ - inline fun > input(tag: String): ReadablePort { - TODO() - } -} diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt index 81431e02..fb8c2044 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt @@ -27,6 +27,7 @@ package nl.atlarge.opendc.kernel import nl.atlarge.opendc.kernel.messaging.Readable import nl.atlarge.opendc.topology.Component import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.Topology /** * The [Context] interface provides a context for a simulation kernel, which defines the environment in which the @@ -35,6 +36,11 @@ import nl.atlarge.opendc.topology.Entity * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ interface Context>: Readable { + /** + * The [Topology] over which the simulation is run. + */ + val topology: Topology + /** * The [Component] that is simulated. */ diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt index 5f399c57..4a289580 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt @@ -42,5 +42,5 @@ interface Kernel> { *

If this method exists early, before the simulation has finished, the entity is assumed to be shutdown and its * simulation will not run any further. */ - suspend fun C.run() + suspend fun C.simulate() } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt index 3ad16da5..dfe41295 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt @@ -24,6 +24,7 @@ package nl.atlarge.opendc.kernel +import mu.KotlinLogging import nl.atlarge.opendc.kernel.clock.Clock import nl.atlarge.opendc.kernel.clock.Tick import nl.atlarge.opendc.kernel.messaging.Envelope @@ -35,14 +36,18 @@ import kotlin.coroutines.experimental.* * A [Simulator] runs the simulation over the specified topology. * * @param topology The topology to run the simulation over. - * @param mapping The mapping of components in the topology to the simulation kernels the components should use. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Simulator(val topology: Topology, private val mapping: Map, Class>>): Iterator { +class Simulator(val topology: Topology): Iterator { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + /** * The registry of the simulation kernels used in the experiment. */ - private val registry: MutableMap, Pair>, Context<*>>?> = HashMap() + private val registry: MutableMap, Context<*>?> = HashMap() /** * A mapping of the entities in the topology to their current state. @@ -63,14 +68,15 @@ class Simulator(val topology: Topology, private val mapping: Map, C node.outgoingEdges().forEach { resolve(it) } } - registry.values.forEach { - it?.also { (kernel, ctx) -> - // Start all kernel co-routines - kernel.run { - val block: suspend () -> Unit = { ctx.run() } - block.startCoroutine(SimulationCoroutine()) - } - } + registry.values.forEach { context -> + if (context == null) + return@forEach + @Suppress("UNCHECKED_CAST") + val kernel = context.component.label as Kernel> + + // Start all kernel co-routines + val block: suspend () -> Unit = { kernel.run { context.simulate() } } + block.startCoroutine(KernelCoroutine()) } } @@ -80,23 +86,17 @@ class Simulator(val topology: Topology, private val mapping: Map, C * @param component The component to resolve. * @return The [Kernel] that simulates that [Component]. */ - fun > resolve(component: T): Pair>, Context>? { + fun > resolve(component: T): Context? { @Suppress("UNCHECKED_CAST") return registry.computeIfAbsent(component, { - val constructor = mapping[it]?.constructors?.get(0) - val ctx = if (component is Node<*>) { - DefaultEntityContext(component as Node<*>) - } else { - DefaultChannelContext(component as Edge<*>) - } - - if (constructor == null) { - println("warning: invalid constructor for kernel ${mapping[it]}") + if (component.label !is Kernel<*>) null - } else { - Pair(constructor.newInstance(ctx) as Kernel>, ctx) + else when (component) { + is Node<*> -> DefaultEntityContext(component as Node<*>) + is Edge<*> -> DefaultChannelContext(component as Edge<*>) + else -> null } - }) as? Pair>, Context>? + }) as Context? } /** @@ -119,14 +119,17 @@ class Simulator(val topology: Topology, private val mapping: Map, C break else if (tick < clock.tick) // Tick has already occurred - println("error: tick was not handled correctly") + logger.warn {"tick was not handled correctly"} clock.queue.poll() block() } } - class SimulationCoroutine: Continuation { + /** + * The co-routine which runs a simulation kernel. + */ + private class KernelCoroutine: Continuation { override val context: CoroutineContext = EmptyCoroutineContext override fun resume(value: Unit) {} @@ -140,6 +143,11 @@ class Simulator(val topology: Topology, private val mapping: Map, C * The [Context] for an entity within the simulation. */ private inner class DefaultEntityContext>(override val component: Node): EntityContext { + /** + * The [Topology] over which the simulation is run. + */ + override val topology: Topology = this@Simulator.topology + /** * Retrieves and removes a single message from this channel suspending the caller while the channel is empty. * @@ -180,6 +188,11 @@ class Simulator(val topology: Topology, private val mapping: Map, C */ private inner class DefaultChannelContext(override val component: Edge): ChannelContext { /** + * The [Topology] over which the simulation is run. + */ + override val topology: Topology = this@Simulator.topology + + /** * Retrieves and removes a single message from this channel suspending the caller while the channel is empty. * * @param block The block to process the message with. @@ -210,6 +223,9 @@ class Simulator(val topology: Topology, private val mapping: Map, C suspend override fun sleep(n: Int): Unit = suspendCoroutine { cont -> clock.scheduleAfter(n, { cont.resume(Unit) }) } } + /** + * The [Clock] for this [Simulator] that keeps track of the simulation time in ticks. + */ private inner class DefaultClock: Clock { override var tick: Tick = 0 internal val queue: PriorityQueue Unit>> = PriorityQueue(Comparator.comparingLong { it.first }) diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt deleted file mode 100644 index f81014e0..00000000 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package nl.atlarge.opendc.kernel.impl - -import nl.atlarge.opendc.experiment.Task -import nl.atlarge.opendc.kernel.AbstractEntityKernel -import nl.atlarge.opendc.kernel.EntityContext -import nl.atlarge.opendc.topology.machine.Cpu -import nl.atlarge.opendc.topology.machine.Machine - -class MachineKernel(ctx: EntityContext): AbstractEntityKernel(ctx) { - - suspend override fun EntityContext.run() { - println("${this}: Initialising!") - println("${this}: ${component.entity.state}") - update(Machine.State(Machine.Status.IDLE)) - println("${this}: ${component.entity.state}") - - val cpus = component.outgoingEdges().filter { it.tag == "cpu" }.map { it.to.entity as Cpu } - val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }) - val task: Task - - loop@while (true) { - val msg = receive() - when (msg) { - is Task -> { - task = msg - break@loop - } - else -> println("warning: unhandled message $msg") - } - } - - while (tick()) { - task.consume(speed.toLong()) - } - } - -} diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt index 5b7076ed..d1e99f57 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt @@ -39,7 +39,7 @@ interface Node>: Component { val id: Int /** - * Return the set of incoming edges of this node. + * Return the set of ingoing edges of this node. * * @return All edges whose destination is this node. */ @@ -58,3 +58,24 @@ interface Node>: Component { val entity: T get() = label } + +/** + * Return the set of ingoing edges of this node with the given tag. + * + * @param tag The tag of the edges to get. + * @param T The shape of the label of these edges. + * @return All edges whose destination is this node and have the given tag. + */ +inline fun Node<*>.ingoing(tag: String) = + ingoingEdges().filter { it.tag == tag }.map { it as T }.toSet() + + +/** + * Return the set of outgoing edges of this node with the given tag. + * + * @param tag The tag of the edges to get. + * @param T The shape of the label of these edges. + * @return All edges whose source is this node and have the given tag. + */ +inline fun Node<*>.outgoing(tag: String) = + outgoingEdges().filter { it.tag == tag }.map { it as T }.toSet() diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index 138abee7..f310e103 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -24,7 +24,12 @@ package nl.atlarge.opendc.topology.machine +import nl.atlarge.opendc.experiment.Task +import nl.atlarge.opendc.kernel.EntityContext +import nl.atlarge.opendc.kernel.Kernel import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.outgoing +import java.util.* /** * A Physical Machine (PM) inside a rack of a datacenter. It has a speed, and can be given a workload on which it will @@ -32,7 +37,7 @@ import nl.atlarge.opendc.topology.Entity * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Machine: Entity { +class Machine: Entity, Kernel> { /** * The status of a machine. */ @@ -49,4 +54,35 @@ class Machine: Entity { * The initial state of a [Machine] entity. */ override val initialState = State(Status.HALT) + + /** + * Run the simulation kernel for this entity. + */ + override suspend fun EntityContext.simulate() { + update(state.copy(status = Machine.Status.IDLE)) + + val cpus = component.outgoing("cpu") + val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }) + val task: Task + + val delay = Random().nextInt(1000) + sleep(delay) + + loop@while (true) { + val msg = receive() + when (msg) { + is Task -> { + task = msg + break@loop + } + else -> println("warning: unhandled message $msg") + } + } + + update(state.copy(status = Machine.Status.RUNNING)) + while (tick()) { + task.consume(speed.toLong()) + } + update(state.copy(status = Machine.Status.HALT)) + } } diff --git a/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt index 300ffec8..f18e62bd 100644 --- a/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt +++ b/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt @@ -24,11 +24,8 @@ package nl.atlarge.opendc -import nl.atlarge.opendc.kernel.Kernel import nl.atlarge.opendc.kernel.Simulator -import nl.atlarge.opendc.kernel.impl.MachineKernel import nl.atlarge.opendc.topology.AdjacencyListTopologyBuilder -import nl.atlarge.opendc.topology.Component import nl.atlarge.opendc.topology.container.Rack import nl.atlarge.opendc.topology.machine.Cpu import nl.atlarge.opendc.topology.machine.Machine @@ -37,37 +34,26 @@ import org.junit.jupiter.api.Test internal class SmokeTest { @Test fun smoke() { - val mapping: MutableMap, Class>> = HashMap() val builder = AdjacencyListTopologyBuilder() val topology = builder.build().apply { val rack = node(Rack()) - val machineA = node(Machine()) - val machineB = node(Machine()) + val n = 10 + // Create n machines in the rack + repeat(n) { + val machine = node(Machine()) + connect(rack, machine, tag = "machine") - connect(rack, machineA, tag = "machine") - connect(rack, machineB, tag = "machine") + val cpu1 = node(Cpu(10, 2, 2)) + val cpu2 = node(Cpu(5, 3, 2)) - val cpuA1 = node(Cpu(10, 2, 2)) - val cpuA2 = node(Cpu(5, 3, 2)) - - connect(machineA, cpuA1, tag = "cpu") - connect(machineA, cpuA2, tag = "cpu") - - val cpuB1 = node(Cpu(10, 2, 2)) - val cpuB2 = node(Cpu(5, 3, 2)) - - connect(machineB, cpuB1, tag = "cpu") - connect(machineB, cpuB2, tag = "cpu") - - mapping.apply { - put(machineA, MachineKernel::class.java) - put(machineB, MachineKernel::class.java) + connect(machine, cpu1, tag = "cpu") + connect(machine, cpu2, tag = "cpu") } + } - val simulator = Simulator(this, mapping) - while (simulator.hasNext()) { - simulator.next() - } + val simulator = Simulator(topology) + while (simulator.hasNext()) { + simulator.next() } } } -- cgit v1.2.3