summaryrefslogtreecommitdiff
path: root/opendc-core/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2017-09-04 12:56:44 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2017-09-04 12:56:44 +0200
commit96f75806dbdcb8c43a22c7db98e85ac5e854e68b (patch)
tree0cd0202c7cb85482621b5624b710acef1604bea3 /opendc-core/src/main
parent2e8df509bb2fc513b7f793d51c6a85cf6bbe62ca (diff)
Move simulation kernels into topology entities
This change embeds simulation kernels into the entities and relations of the topology.
Diffstat (limited to 'opendc-core/src/main')
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt65
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt6
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt2
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt68
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt61
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt23
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt38
7 files changed, 108 insertions, 155 deletions
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt
deleted file mode 100644
index 35b89e4e..00000000
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/AbstractEntityKernel.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package nl.atlarge.opendc.kernel
-
-import nl.atlarge.opendc.kernel.messaging.ReadablePort
-import nl.atlarge.opendc.kernel.messaging.WritableChannel
-import nl.atlarge.opendc.topology.Edge
-import nl.atlarge.opendc.topology.Entity
-import nl.atlarge.opendc.topology.Node
-
-/**
- * A simulation kernel that simulates a single entity in the topology of a cloud network.
- *
- * @param ctx The context in which the simulation is run.
- * @param E The shape of the component to simulate.
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-abstract class AbstractEntityKernel<E: Entity<*>>(private val ctx: EntityContext<E>): Kernel<EntityContext<E>> {
- /**
- * The [Node] that is simulated.
- */
- val self: Node<E> = ctx.component
-
- /**
- * Create a [WritableChannel] over the edge with the given tag.
- *
- * @param tag The tag of the edge to create a channel over.
- * @return The channel that has been created or the cached result.
- */
- inline fun <reified T: Edge<*>> output(tag: String): WritableChannel<T> {
- TODO()
- }
-
- /**
- * Create a [ReadablePort] over the edges with the given tag.
- *
- * @param tag The tag of the edges to create a port over.
- * @return The port that has been created or the cached result.
- */
- inline fun <reified T: Edge<*>> input(tag: String): ReadablePort<T> {
- TODO()
- }
-}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
index 81431e02..fb8c2044 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
@@ -27,6 +27,7 @@ package nl.atlarge.opendc.kernel
import nl.atlarge.opendc.kernel.messaging.Readable
import nl.atlarge.opendc.topology.Component
import nl.atlarge.opendc.topology.Entity
+import nl.atlarge.opendc.topology.Topology
/**
* The [Context] interface provides a context for a simulation kernel, which defines the environment in which the
@@ -36,6 +37,11 @@ import nl.atlarge.opendc.topology.Entity
*/
interface Context<out T: Component<*>>: Readable {
/**
+ * The [Topology] over which the simulation is run.
+ */
+ val topology: Topology
+
+ /**
* The [Component] that is simulated.
*/
val component: T
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
index 5f399c57..4a289580 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Kernel.kt
@@ -42,5 +42,5 @@ interface Kernel<in C: Context<*>> {
* <p>If this method exists early, before the simulation has finished, the entity is assumed to be shutdown and its
* simulation will not run any further.
*/
- suspend fun C.run()
+ suspend fun C.simulate()
}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
index 3ad16da5..dfe41295 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
@@ -24,6 +24,7 @@
package nl.atlarge.opendc.kernel
+import mu.KotlinLogging
import nl.atlarge.opendc.kernel.clock.Clock
import nl.atlarge.opendc.kernel.clock.Tick
import nl.atlarge.opendc.kernel.messaging.Envelope
@@ -35,14 +36,18 @@ import kotlin.coroutines.experimental.*
* A [Simulator] runs the simulation over the specified topology.
*
* @param topology The topology to run the simulation over.
- * @param mapping The mapping of components in the topology to the simulation kernels the components should use.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class Simulator(val topology: Topology, private val mapping: Map<Component<*>, Class<out Kernel<*>>>): Iterator<Unit> {
+class Simulator(val topology: Topology): Iterator<Unit> {
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
/**
* The registry of the simulation kernels used in the experiment.
*/
- private val registry: MutableMap<Component<*>, Pair<Kernel<Context<*>>, Context<*>>?> = HashMap()
+ private val registry: MutableMap<Component<*>, Context<*>?> = HashMap()
/**
* A mapping of the entities in the topology to their current state.
@@ -63,14 +68,15 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
node.outgoingEdges().forEach { resolve(it) }
}
- registry.values.forEach {
- it?.also { (kernel, ctx) ->
- // Start all kernel co-routines
- kernel.run {
- val block: suspend () -> Unit = { ctx.run() }
- block.startCoroutine(SimulationCoroutine())
- }
- }
+ registry.values.forEach { context ->
+ if (context == null)
+ return@forEach
+ @Suppress("UNCHECKED_CAST")
+ val kernel = context.component.label as Kernel<Context<*>>
+
+ // Start all kernel co-routines
+ val block: suspend () -> Unit = { kernel.run { context.simulate() } }
+ block.startCoroutine(KernelCoroutine())
}
}
@@ -80,23 +86,17 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
* @param component The component to resolve.
* @return The [Kernel] that simulates that [Component].
*/
- fun <T: Component<*>> resolve(component: T): Pair<Kernel<Context<T>>, Context<T>>? {
+ fun <T: Component<*>> resolve(component: T): Context<T>? {
@Suppress("UNCHECKED_CAST")
return registry.computeIfAbsent(component, {
- val constructor = mapping[it]?.constructors?.get(0)
- val ctx = if (component is Node<*>) {
- DefaultEntityContext(component as Node<*>)
- } else {
- DefaultChannelContext(component as Edge<*>)
- }
-
- if (constructor == null) {
- println("warning: invalid constructor for kernel ${mapping[it]}")
+ if (component.label !is Kernel<*>)
null
- } else {
- Pair(constructor.newInstance(ctx) as Kernel<Context<*>>, ctx)
+ else when (component) {
+ is Node<*> -> DefaultEntityContext(component as Node<*>)
+ is Edge<*> -> DefaultChannelContext(component as Edge<*>)
+ else -> null
}
- }) as? Pair<Kernel<Context<T>>, Context<T>>?
+ }) as Context<T>?
}
/**
@@ -119,14 +119,17 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
break
else if (tick < clock.tick)
// Tick has already occurred
- println("error: tick was not handled correctly")
+ logger.warn {"tick was not handled correctly"}
clock.queue.poll()
block()
}
}
- class SimulationCoroutine: Continuation<Unit> {
+ /**
+ * The co-routine which runs a simulation kernel.
+ */
+ private class KernelCoroutine: Continuation<Unit> {
override val context: CoroutineContext = EmptyCoroutineContext
override fun resume(value: Unit) {}
@@ -141,6 +144,11 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
*/
private inner class DefaultEntityContext<out T: Entity<*>>(override val component: Node<T>): EntityContext<T> {
/**
+ * The [Topology] over which the simulation is run.
+ */
+ override val topology: Topology = this@Simulator.topology
+
+ /**
* Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
*
* @param block The block to process the message with.
@@ -180,6 +188,11 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
*/
private inner class DefaultChannelContext<out T>(override val component: Edge<T>): ChannelContext<T> {
/**
+ * The [Topology] over which the simulation is run.
+ */
+ override val topology: Topology = this@Simulator.topology
+
+ /**
* Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
*
* @param block The block to process the message with.
@@ -210,6 +223,9 @@ class Simulator(val topology: Topology, private val mapping: Map<Component<*>, C
suspend override fun sleep(n: Int): Unit = suspendCoroutine { cont -> clock.scheduleAfter(n, { cont.resume(Unit) }) }
}
+ /**
+ * The [Clock] for this [Simulator] that keeps track of the simulation time in ticks.
+ */
private inner class DefaultClock: Clock {
override var tick: Tick = 0
internal val queue: PriorityQueue<Pair<Tick, () -> Unit>> = PriorityQueue(Comparator.comparingLong { it.first })
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt
deleted file mode 100644
index f81014e0..00000000
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/impl/MachineKernel.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package nl.atlarge.opendc.kernel.impl
-
-import nl.atlarge.opendc.experiment.Task
-import nl.atlarge.opendc.kernel.AbstractEntityKernel
-import nl.atlarge.opendc.kernel.EntityContext
-import nl.atlarge.opendc.topology.machine.Cpu
-import nl.atlarge.opendc.topology.machine.Machine
-
-class MachineKernel(ctx: EntityContext<Machine>): AbstractEntityKernel<Machine>(ctx) {
-
- suspend override fun EntityContext<Machine>.run() {
- println("${this}: Initialising!")
- println("${this}: ${component.entity.state}")
- update(Machine.State(Machine.Status.IDLE))
- println("${this}: ${component.entity.state}")
-
- val cpus = component.outgoingEdges().filter { it.tag == "cpu" }.map { it.to.entity as Cpu }
- val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores })
- val task: Task
-
- loop@while (true) {
- val msg = receive()
- when (msg) {
- is Task -> {
- task = msg
- break@loop
- }
- else -> println("warning: unhandled message $msg")
- }
- }
-
- while (tick()) {
- task.consume(speed.toLong())
- }
- }
-
-}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt
index 5b7076ed..d1e99f57 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/Node.kt
@@ -39,7 +39,7 @@ interface Node<out T: Entity<*>>: Component<T> {
val id: Int
/**
- * Return the set of incoming edges of this node.
+ * Return the set of ingoing edges of this node.
*
* @return All edges whose destination is this node.
*/
@@ -58,3 +58,24 @@ interface Node<out T: Entity<*>>: Component<T> {
val entity: T
get() = label
}
+
+/**
+ * Return the set of ingoing edges of this node with the given tag.
+ *
+ * @param tag The tag of the edges to get.
+ * @param T The shape of the label of these edges.
+ * @return All edges whose destination is this node and have the given tag.
+ */
+inline fun <reified T> Node<*>.ingoing(tag: String) =
+ ingoingEdges().filter { it.tag == tag }.map { it as T }.toSet()
+
+
+/**
+ * Return the set of outgoing edges of this node with the given tag.
+ *
+ * @param tag The tag of the edges to get.
+ * @param T The shape of the label of these edges.
+ * @return All edges whose source is this node and have the given tag.
+ */
+inline fun <reified T> Node<*>.outgoing(tag: String) =
+ outgoingEdges().filter { it.tag == tag }.map { it as T }.toSet()
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
index 138abee7..f310e103 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
@@ -24,7 +24,12 @@
package nl.atlarge.opendc.topology.machine
+import nl.atlarge.opendc.experiment.Task
+import nl.atlarge.opendc.kernel.EntityContext
+import nl.atlarge.opendc.kernel.Kernel
import nl.atlarge.opendc.topology.Entity
+import nl.atlarge.opendc.topology.outgoing
+import java.util.*
/**
* A Physical Machine (PM) inside a rack of a datacenter. It has a speed, and can be given a workload on which it will
@@ -32,7 +37,7 @@ import nl.atlarge.opendc.topology.Entity
*
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class Machine: Entity<Machine.State> {
+class Machine: Entity<Machine.State>, Kernel<EntityContext<Machine>> {
/**
* The status of a machine.
*/
@@ -49,4 +54,35 @@ class Machine: Entity<Machine.State> {
* The initial state of a [Machine] entity.
*/
override val initialState = State(Status.HALT)
+
+ /**
+ * Run the simulation kernel for this entity.
+ */
+ override suspend fun EntityContext<Machine>.simulate() {
+ update(state.copy(status = Machine.Status.IDLE))
+
+ val cpus = component.outgoing<Cpu>("cpu")
+ val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores })
+ val task: Task
+
+ val delay = Random().nextInt(1000)
+ sleep(delay)
+
+ loop@while (true) {
+ val msg = receive()
+ when (msg) {
+ is Task -> {
+ task = msg
+ break@loop
+ }
+ else -> println("warning: unhandled message $msg")
+ }
+ }
+
+ update(state.copy(status = Machine.Status.RUNNING))
+ while (tick()) {
+ task.consume(speed.toLong())
+ }
+ update(state.copy(status = Machine.Status.HALT))
+ }
}