summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2017-09-04 12:56:44 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2017-09-04 12:56:44 +0200
commit96f75806dbdcb8c43a22c7db98e85ac5e854e68b (patch)
tree0cd0202c7cb85482621b5624b710acef1604bea3
parent2e8df509bb2fc513b7f793d51c6a85cf6bbe62ca (diff)
Move simulation kernels into topology entities
This change embeds simulation kernels into the entities and relations of the topology.
-rw-r--r--opendc-core/build.gradle25
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt65
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt6
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt2
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt68
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt61
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt23
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt38
-rw-r--r--opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt40
9 files changed, 132 insertions, 196 deletions
diff --git a/opendc-core/build.gradle b/opendc-core/build.gradle
index f2999d82..bf71b63b 100644
--- a/opendc-core/build.gradle
+++ b/opendc-core/build.gradle
@@ -25,7 +25,6 @@
/* Build configuration */
buildscript {
ext.kotlin_version = '1.1.4-3'
- ext.kotlin_version = '1.1.4'
repositories {
mavenCentral()
@@ -45,7 +44,13 @@ plugins {
apply plugin: 'org.junit.platform.gradle.plugin'
apply plugin: 'kotlin'
-tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all {
+compileKotlin {
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
+}
+
+compileTestKotlin {
kotlinOptions {
jvmTarget = "1.8"
}
@@ -67,19 +72,11 @@ repositories {
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
- compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.18'
+ compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version"
+ compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.18"
+ compile "io.github.microutils:kotlin-logging:1.4.6"
testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3"
testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3"
- compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version"
-}
-compileKotlin {
- kotlinOptions {
- jvmTarget = "1.8"
- }
-}
-compileTestKotlin {
- kotlinOptions {
- jvmTarget = "1.8"
- }
+ testCompile "org.slf4j:slf4j-simple:1.7.25"
}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt
deleted file mode 100644
index 35b89e4e..00000000
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package nl.atlarge.opendc.kernel
-
-import nl.atlarge.opendc.kernel.messaging.ReadablePort
-import nl.atlarge.opendc.kernel.messaging.WritableChannel
-import nl.atlarge.opendc.topology.Edge
-import nl.atlarge.opendc.topology.Entity
-import nl.atlarge.opendc.topology.Node
-
-/**
- * A simulation kernel that simulates a single entity in the topology of a cloud network.
- *
- * @param ctx The context in which the simulation is run.
- * @param E The shape of the component to simulate.
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-abstract class AbstractEntityKernel<E: Entity<*>>(private val ctx: EntityContext<E>): Kernel<EntityContext<E>> {
- /**
- * The [Node] that is simulated.
- */
- val self: Node<E> = ctx.component
-
- /**
- * Create a [WritableChannel] over the edge with the given tag.
- *
- * @param tag The tag of the edge to create a channel over.
- * @return The channel that has been created or the cached result.
- */
- inline fun <reified T: Edge<*>> output(tag: String): WritableChannel<T> {
- TODO()
- }
-
- /**
- * Create a [ReadablePort] over the edges with the given tag.
- *
- * @param tag The tag of the edges to create a port over.
- * @return The port that has been created or the cached result.
- */
- inline fun <reified T: Edge<*>> input(tag: String): ReadablePort<T> {
- TODO()
- }
-}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
index 81431e02..fb8c2044 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
@@ -27,6 +27,7 @@ package nl.atlarge.opendc.kernel
import nl.atlarge.opendc.kernel.messaging.Readable
import nl.atlarge.opendc.topology.Component
import nl.atlarge.opendc.topology.Entity
+import nl.atlarge.opendc.topology.Topology
/**
* The [Context] interface provides a context for a simulation kernel, which defines the environment in which the
@@ -36,6 +37,11 @@ import nl.atlarge.opendc.topology.Entity
*/
interface Context<out T: Component<*>>: Readable {
/**
+ * The [Topology] over which the simulation is run.
+ */
+ val topology: Topology
+
+ /**
* The [Component] that is simulated.
*/
val component: T
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
index 5f399c57..4a289580 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
@@ -42,5 +42,5 @@ interface Kernel<in C: Context<*>> {
* <p>If this method exists early, before the simulation has finished, the entity is assumed to be shutdown and its
* simulation will not run any further.
*/
- suspend fun C.run()
+ suspend fun C.simulate()
}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
index 3ad16da5..dfe41295 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
@@ -24,6 +24,7 @@
package nl.atlarge.opendc.kernel
+import mu.KotlinLogging
import nl.atlarge.opendc.kernel.clock.Clock
import nl.atlarge.opendc.kernel.clock.Tick
import nl.atlarge.opendc.kernel.messaging.Envelope
@@ -35,14 +36,18 @@ import kotlin.coroutines.experimental.*
* A [Simulator] runs the simulation over the specified topology.
*
* @param topology The topology to run the simulation over.
- * @param mapping The mapping of components in the topology to the simulation kernels the components should use.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class Simulator(val topology: Topology, private val mapping: Map<Component<*>, Class<out Kernel<*>>>): Iterator<Unit> {
+class Simulator(val topology: Topology): Iterator<Unit> {
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
/**
* The registry of the simulation kernels used in the experiment.
*/
- private val registry: MutableMap<Component<*>, Pair<Kernel<Context<*>>, Context<*>>?> = HashMap()
+ private val registry: MutableMap<Component<*>, Context<*>?> = HashMap()
/**
* A mapping of the entities in the topology to their current state.
@@ -63,14 +68,15 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
node.outgoingEdges().forEach { resolve(it) }
}
- registry.values.forEach {
- it?.also { (kernel, ctx) ->
- // Start all kernel co-routines
- kernel.run {
- val block: suspend () -> Unit = { ctx.run() }
- block.startCoroutine(SimulationCoroutine())
- }
- }
+ registry.values.forEach { context ->
+ if (context == null)
+ return@forEach
+ @Suppress("UNCHECKED_CAST")
+ val kernel = context.component.label as Kernel<Context<*>>
+
+ // Start all kernel co-routines
+ val block: suspend () -> Unit = { kernel.run { context.simulate() } }
+ block.startCoroutine(KernelCoroutine())
}
}
@@ -80,23 +86,17 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
* @param component The component to resolve.
* @return The [Kernel] that simulates that [Component].
*/
- fun <T: Component<*>> resolve(component: T): Pair<Kernel<Context<T>>, Context<T>>? {
+ fun <T: Component<*>> resolve(component: T): Context<T>? {
@Suppress("UNCHECKED_CAST")
return registry.computeIfAbsent(component, {
- val constructor = mapping[it]?.constructors?.get(0)
- val ctx = if (component is Node<*>) {
- DefaultEntityContext(component as Node<*>)
- } else {
- DefaultChannelContext(component as Edge<*>)
- }
-
- if (constructor == null) {
- println("warning: invalid constructor for kernel ${mapping[it]}")
+ if (component.label !is Kernel<*>)
null
- } else {
- Pair(constructor.newInstance(ctx) as Kernel<Context<*>>, ctx)
+ else when (component) {
+ is Node<*> -> DefaultEntityContext(component as Node<*>)
+ is Edge<*> -> DefaultChannelContext(component as Edge<*>)
+ else -> null
}
- }) as? Pair<Kernel<Context<T>>, Context<T>>?
+ }) as Context<T>?
}
/**
@@ -119,14 +119,17 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
break
else if (tick < clock.tick)
// Tick has already occurred
- println("error: tick was not handled correctly")
+ logger.warn {"tick was not handled correctly"}
clock.queue.poll()
block()
}
}
- class SimulationCoroutine: Continuation<Unit> {
+ /**
+ * The co-routine which runs a simulation kernel.
+ */
+ private class KernelCoroutine: Continuation<Unit> {
override val context: CoroutineContext = EmptyCoroutineContext
override fun resume(value: Unit) {}
@@ -141,6 +144,11 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
*/
private inner class DefaultEntityContext<out T: Entity<*>>(override val component: Node<T>): EntityContext<T> {
/**
+ * The [Topology] over which the simulation is run.
+ */
+ override val topology: Topology = this@Simulator.topology
+
+ /**
* Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
*
* @param block The block to process the message with.
@@ -180,6 +188,11 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
*/
private inner class DefaultChannelContext<out T>(override val component: Edge<T>): ChannelContext<T> {
/**
+ * The [Topology] over which the simulation is run.
+ */
+ override val topology: Topology = this@Simulator.topology
+
+ /**
* Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
*
* @param block The block to process the message with.
@@ -210,6 +223,9 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
suspend override fun sleep(n: Int): Unit = suspendCoroutine { cont -> clock.scheduleAfter(n, { cont.resume(Unit) }) }
}
+ /**
+ * The [Clock] for this [Simulator] that keeps track of the simulation time in ticks.
+ */
private inner class DefaultClock: Clock {
override var tick: Tick = 0
internal val queue: PriorityQueue<Pair<Tick, () -> Unit>> = PriorityQueue(Comparator.comparingLong { it.first })
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt
deleted file mode 100644
index f81014e0..00000000
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package nl.atlarge.opendc.kernel.impl
-
-import nl.atlarge.opendc.experiment.Task
-import nl.atlarge.opendc.kernel.AbstractEntityKernel
-import nl.atlarge.opendc.kernel.EntityContext
-import nl.atlarge.opendc.topology.machine.Cpu
-import nl.atlarge.opendc.topology.machine.Machine
-
-class MachineKernel(ctx: EntityContext<Machine>): AbstractEntityKernel<Machine>(ctx) {
-
- suspend override fun EntityContext<Machine>.run() {
- println("${this}: Initialising!")
- println("${this}: ${component.entity.state}")
- update(Machine.State(Machine.Status.IDLE))
- println("${this}: ${component.entity.state}")
-
- val cpus = component.outgoingEdges().filter { it.tag == "cpu" }.map { it.to.entity as Cpu }
- val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores })
- val task: Task
-
- loop@while (true) {
- val msg = receive()
- when (msg) {
- is Task -> {
- task = msg
- break@loop
- }
- else -> println("warning: unhandled message $msg")
- }
- }
-
- while (tick()) {
- task.consume(speed.toLong())
- }
- }
-
-}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt
index 5b7076ed..d1e99f57 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt
@@ -39,7 +39,7 @@ interface Node<out T: Entity<*>>: Component<T> {
val id: Int
/**
- * Return the set of incoming edges of this node.
+ * Return the set of ingoing edges of this node.
*
* @return All edges whose destination is this node.
*/
@@ -58,3 +58,24 @@ interface Node<out T: Entity<*>>: Component<T> {
val entity: T
get() = label
}
+
+/**
+ * Return the set of ingoing edges of this node with the given tag.
+ *
+ * @param tag The tag of the edges to get.
+ * @param T The shape of the label of these edges.
+ * @return All edges whose destination is this node and have the given tag.
+ */
+inline fun <reified T> Node<*>.ingoing(tag: String) =
+ ingoingEdges().filter { it.tag == tag }.map { it as T }.toSet()
+
+
+/**
+ * Return the set of outgoing edges of this node with the given tag.
+ *
+ * @param tag The tag of the edges to get.
+ * @param T The shape of the label of these edges.
+ * @return All edges whose source is this node and have the given tag.
+ */
+inline fun <reified T> Node<*>.outgoing(tag: String) =
+ outgoingEdges().filter { it.tag == tag }.map { it as T }.toSet()
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
index 138abee7..f310e103 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
@@ -24,7 +24,12 @@
package nl.atlarge.opendc.topology.machine
+import nl.atlarge.opendc.experiment.Task
+import nl.atlarge.opendc.kernel.EntityContext
+import nl.atlarge.opendc.kernel.Kernel
import nl.atlarge.opendc.topology.Entity
+import nl.atlarge.opendc.topology.outgoing
+import java.util.*
/**
* A Physical Machine (PM) inside a rack of a datacenter. It has a speed, and can be given a workload on which it will
@@ -32,7 +37,7 @@ import nl.atlarge.opendc.topology.Entity
*
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class Machine: Entity<Machine.State> {
+class Machine: Entity<Machine.State>, Kernel<EntityContext<Machine>> {
/**
* The status of a machine.
*/
@@ -49,4 +54,35 @@ class Machine: Entity<Machine.State> {
* The initial state of a [Machine] entity.
*/
override val initialState = State(Status.HALT)
+
+ /**
+ * Run the simulation kernel for this entity.
+ */
+ override suspend fun EntityContext<Machine>.simulate() {
+ update(state.copy(status = Machine.Status.IDLE))
+
+ val cpus = component.outgoing<Cpu>("cpu")
+ val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores })
+ val task: Task
+
+ val delay = Random().nextInt(1000)
+ sleep(delay)
+
+ loop@while (true) {
+ val msg = receive()
+ when (msg) {
+ is Task -> {
+ task = msg
+ break@loop
+ }
+ else -> println("warning: unhandled message $msg")
+ }
+ }
+
+ update(state.copy(status = Machine.Status.RUNNING))
+ while (tick()) {
+ task.consume(speed.toLong())
+ }
+ update(state.copy(status = Machine.Status.HALT))
+ }
}
diff --git a/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
index 300ffec8..f18e62bd 100644
--- a/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
+++ b/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
@@ -24,11 +24,8 @@
package nl.atlarge.opendc
-import nl.atlarge.opendc.kernel.Kernel
import nl.atlarge.opendc.kernel.Simulator
-import nl.atlarge.opendc.kernel.impl.MachineKernel
import nl.atlarge.opendc.topology.AdjacencyListTopologyBuilder
-import nl.atlarge.opendc.topology.Component
import nl.atlarge.opendc.topology.container.Rack
import nl.atlarge.opendc.topology.machine.Cpu
import nl.atlarge.opendc.topology.machine.Machine
@@ -37,37 +34,26 @@ import org.junit.jupiter.api.Test
internal class SmokeTest {
@Test
fun smoke() {
- val mapping: MutableMap<Component<*>, Class<out Kernel<*>>> = HashMap()
val builder = AdjacencyListTopologyBuilder()
val topology = builder.build().apply {
val rack = node(Rack())
- val machineA = node(Machine())
- val machineB = node(Machine())
+ val n = 10
+ // Create n machines in the rack
+ repeat(n) {
+ val machine = node(Machine())
+ connect(rack, machine, tag = "machine")
- connect(rack, machineA, tag = "machine")
- connect(rack, machineB, tag = "machine")
+ val cpu1 = node(Cpu(10, 2, 2))
+ val cpu2 = node(Cpu(5, 3, 2))
- val cpuA1 = node(Cpu(10, 2, 2))
- val cpuA2 = node(Cpu(5, 3, 2))
-
- connect(machineA, cpuA1, tag = "cpu")
- connect(machineA, cpuA2, tag = "cpu")
-
- val cpuB1 = node(Cpu(10, 2, 2))
- val cpuB2 = node(Cpu(5, 3, 2))
-
- connect(machineB, cpuB1, tag = "cpu")
- connect(machineB, cpuB2, tag = "cpu")
-
- mapping.apply {
- put(machineA, MachineKernel::class.java)
- put(machineB, MachineKernel::class.java)
+ connect(machine, cpu1, tag = "cpu")
+ connect(machine, cpu2, tag = "cpu")
}
+ }
- val simulator = Simulator(this, mapping)
- while (simulator.hasNext()) {
- simulator.next()
- }
+ val simulator = Simulator(topology)
+ while (simulator.hasNext()) {
+ simulator.next()
}
}
}