summaryrefslogtreecommitdiff
path: root/opendc-core
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2017-09-06 10:56:20 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2017-09-06 10:56:20 +0200
commit5a4d74ec372e8315a43e12499d6a777207550b73 (patch)
tree7b5eabbec84f4a8de9157b29b0f91963908a4b48 /opendc-core
parent89d38062bf439c7cf6f731ab6305ccc0e118d9e1 (diff)
Separate Simulator interface and implementation
This change separates the Simulator interface from one of its possible implementations. This allows us to easily swap simulators between experiments.
Diffstat (limited to 'opendc-core')
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/DefaultSimulator.kt237
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt205
-rw-r--r--opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt4
3 files changed, 245 insertions, 201 deletions
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/DefaultSimulator.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/DefaultSimulator.kt
new file mode 100644
index 00000000..a3d978e4
--- /dev/null
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/DefaultSimulator.kt
@@ -0,0 +1,237 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package nl.atlarge.opendc.kernel
+
+import mu.KotlinLogging
+import nl.atlarge.opendc.kernel.clock.Clock
+import nl.atlarge.opendc.kernel.clock.Tick
+import nl.atlarge.opendc.kernel.messaging.Envelope
+import nl.atlarge.opendc.topology.*
+import java.util.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * A [DefaultSimulator] runs the simulation over the specified topology.
+ *
+ * @param topology The topology to run the simulation over.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+class DefaultSimulator(override val topology: Topology): Simulator, Iterator<Unit> {
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The registry of the simulation kernels used in the experiment.
+ */
+ private val registry: MutableMap<Component<*>, Context<*>?> = HashMap()
+
+ /**
+ * A mapping of the entities in the topology to their current state.
+ */
+ private val states: MutableMap<Entity<*>, Any?> = HashMap()
+
+ /**
+ * The clock of the simulator.
+ */
+ private val clock: DefaultClock = DefaultClock()
+
+ /**
+ * Initialize the simulator.
+ */
+ init {
+ topology.forEach { node ->
+ resolve(node)
+ node.outgoingEdges().forEach { resolve(it) }
+ }
+
+ registry.values.forEach { context ->
+ if (context == null)
+ return@forEach
+ @Suppress("UNCHECKED_CAST")
+ val kernel = context.component.label as Kernel<Context<*>>
+
+ // Start all kernel co-routines
+ val block: suspend () -> Unit = { kernel.run { context.simulate() } }
+ block.startCoroutine(KernelCoroutine())
+ }
+ }
+
+ /**
+ * Resolve the given [Component] to the [Kernel] of that component.
+ *
+ * @param component The component to resolve.
+ * @return The [Kernel] that simulates that [Component].
+ */
+ fun <T: Component<*>> resolve(component: T): Context<T>? {
+ @Suppress("UNCHECKED_CAST")
+ return registry.computeIfAbsent(component, {
+ if (component.label !is Kernel<*>)
+ null
+ else when (component) {
+ is Node<*> -> DefaultEntityContext(component as Node<*>)
+ is Edge<*> -> DefaultChannelContext(component as Edge<*>)
+ else -> null
+ }
+ }) as Context<T>?
+ }
+
+ /**
+ * Determine whether the simulator has a next non-empty cycle available.
+ *
+ * @return <code>true</code> if the simulator has a next non-empty cycle, <code>false</code> otherwise.
+ */
+ override fun hasNext(): Boolean = clock.queue.isNotEmpty()
+
+ /**
+ * Run the next cycle in the simulation.
+ */
+ override fun next() {
+ clock.tick++
+ while (true) {
+ val (tick, block) = clock.queue.peek() ?: return
+
+ if (tick > clock.tick)
+ // Tick has yet to occur
+ break
+ else if (tick < clock.tick)
+ // Tick has already occurred
+ logger.warn {"tick was not handled correctly"}
+
+ clock.queue.poll()
+ block()
+ }
+ }
+
+ /**
+ * The co-routine which runs a simulation kernel.
+ */
+ private class KernelCoroutine: Continuation<Unit> {
+ override val context: CoroutineContext = EmptyCoroutineContext
+ override fun resume(value: Unit) {}
+
+ override fun resumeWithException(exception: Throwable) {
+ val currentThread = Thread.currentThread()
+ currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
+ }
+ }
+
+ /**
+ * The [Context] for an entity within the simulation.
+ */
+ private inner class DefaultEntityContext<out T: Entity<*>>(override val component: Node<T>): EntityContext<T> {
+ /**
+ * The [Topology] over which the simulation is run.
+ */
+ override val topology: Topology = this@DefaultSimulator.topology
+
+ /**
+ * Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
+ *
+ * @param block The block to process the message with.
+ * @return The processed message.
+ */
+ suspend override fun <T> receive(block: Envelope<*>.(Any?) -> T): T = suspendCoroutine {}
+
+ /**
+ * The observable state of an [Entity] within the simulation is provided by context.
+ */
+ @Suppress("UNCHECKED_CAST")
+ override val <S> Entity<S>.state: S
+ get() = states.computeIfAbsent(this, { initialState }) as S
+
+ /**
+ * Update the state of the entity being simulated.
+ *
+ * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects
+ * referencing the old entity having their data changed.
+ *
+ * @param next The next state of the entity.
+ */
+ suspend override fun <C: EntityContext<E>, E: Entity<S>, S> C.update(next: S) {
+ states.put(component.entity as Entity<*>, next)
+ }
+
+ /**
+ * Suspend the simulation kernel for <code>n</code> ticks before resuming the execution.
+ *
+ * @param n The amount of ticks to suspend the simulation kernel.
+ */
+ suspend override fun wait(n: Int): Unit = suspendCoroutine { cont -> clock.scheduleAfter(n, { cont.resume(Unit) }) }
+ }
+
+ /**
+ * The [Context] for an edge within the simulation.
+ */
+ private inner class DefaultChannelContext<out T>(override val component: Edge<T>): ChannelContext<T> {
+ /**
+ * The [Topology] over which the simulation is run.
+ */
+ override val topology: Topology = this@DefaultSimulator.topology
+
+ /**
+ * Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
+ *
+ * @param block The block to process the message with.
+ * @return The processed message.
+ */
+ suspend override fun <T> receive(block: Envelope<*>.(Any?) -> T): T = suspendCoroutine {}
+
+ /**
+ * Send the given message downstream.
+ *
+ * @param msg The message to send.
+ * @param sender The sender of the message.
+ */
+ suspend override fun send(msg: Any?, sender: Node<*>): Unit = suspendCoroutine {}
+
+ /**
+ * The observable state of an [Entity] within the simulation is provided by context.
+ */
+ @Suppress("UNCHECKED_CAST")
+ override val <S> Entity<S>.state: S
+ get() = states.computeIfAbsent(this, { initialState }) as S
+
+ /**
+ * Suspend the simulation kernel for <code>n</code> ticks before resuming the execution.
+ *
+ * @param n The amount of ticks to suspend the simulation kernel.
+ */
+ suspend override fun wait(n: Int): Unit = suspendCoroutine { cont -> clock.scheduleAfter(n, { cont.resume(Unit) }) }
+ }
+
+ /**
+ * The [Clock] for this [DefaultSimulator] that keeps track of the simulation time in ticks.
+ */
+ private inner class DefaultClock: Clock {
+ override var tick: Tick = 0
+ internal val queue: PriorityQueue<Pair<Tick, () -> Unit>> = PriorityQueue(Comparator.comparingLong { it.first })
+
+ override fun scheduleAt(tick: Tick, block: () -> Unit) {
+ queue.add(Pair(tick, block))
+ }
+ }
+}
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
index c173418d..f336bf1b 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Simulator.kt
@@ -24,214 +24,21 @@
package nl.atlarge.opendc.kernel
-import mu.KotlinLogging
-import nl.atlarge.opendc.kernel.clock.Clock
-import nl.atlarge.opendc.kernel.clock.Tick
-import nl.atlarge.opendc.kernel.messaging.Envelope
-import nl.atlarge.opendc.topology.*
-import java.util.*
-import kotlin.coroutines.experimental.*
+import nl.atlarge.opendc.topology.Topology
/**
- * A [Simulator] runs the simulation over the specified topology.
+ * A [Simulator] implementation runs a simulation over the specified topology.
*
- * @param topology The topology to run the simulation over.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class Simulator(val topology: Topology): Iterator<Unit> {
+interface Simulator {
/**
- * The logger instance to use for the simulator.
+ * The [Topology] over which the simulation runs.
*/
- private val logger = KotlinLogging.logger {}
-
- /**
- * The registry of the simulation kernels used in the experiment.
- */
- private val registry: MutableMap<Component<*>, Context<*>?> = HashMap()
-
- /**
- * A mapping of the entities in the topology to their current state.
- */
- private val states: MutableMap<Entity<*>, Any?> = HashMap()
-
- /**
- * The clock of the simulator.
- */
- private val clock: DefaultClock = DefaultClock()
-
- /**
- * Initialize the simulator.
- */
- init {
- topology.forEach { node ->
- resolve(node)
- node.outgoingEdges().forEach { resolve(it) }
- }
-
- registry.values.forEach { context ->
- if (context == null)
- return@forEach
- @Suppress("UNCHECKED_CAST")
- val kernel = context.component.label as Kernel<Context<*>>
-
- // Start all kernel co-routines
- val block: suspend () -> Unit = { kernel.run { context.simulate() } }
- block.startCoroutine(KernelCoroutine())
- }
- }
-
- /**
- * Resolve the given [Component] to the [Kernel] of that component.
- *
- * @param component The component to resolve.
- * @return The [Kernel] that simulates that [Component].
- */
- fun <T: Component<*>> resolve(component: T): Context<T>? {
- @Suppress("UNCHECKED_CAST")
- return registry.computeIfAbsent(component, {
- if (component.label !is Kernel<*>)
- null
- else when (component) {
- is Node<*> -> DefaultEntityContext(component as Node<*>)
- is Edge<*> -> DefaultChannelContext(component as Edge<*>)
- else -> null
- }
- }) as Context<T>?
- }
-
- /**
- * Determine whether the simulator has a next non-empty cycle available.
- *
- * @return <code>true</code> if the simulator has a next non-empty cycle, <code>false</code> otherwise.
- */
- override fun hasNext(): Boolean = clock.queue.isNotEmpty()
+ val topology: Topology
/**
* Run the next cycle in the simulation.
*/
- override fun next() {
- clock.tick++
- while (true) {
- val (tick, block) = clock.queue.peek() ?: return
-
- if (tick > clock.tick)
- // Tick has yet to occur
- break
- else if (tick < clock.tick)
- // Tick has already occurred
- logger.warn {"tick was not handled correctly"}
-
- clock.queue.poll()
- block()
- }
- }
-
- /**
- * The co-routine which runs a simulation kernel.
- */
- private class KernelCoroutine: Continuation<Unit> {
- override val context: CoroutineContext = EmptyCoroutineContext
- override fun resume(value: Unit) {}
-
- override fun resumeWithException(exception: Throwable) {
- val currentThread = Thread.currentThread()
- currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
- }
- }
-
- /**
- * The [Context] for an entity within the simulation.
- */
- private inner class DefaultEntityContext<out T: Entity<*>>(override val component: Node<T>): EntityContext<T> {
- /**
- * The [Topology] over which the simulation is run.
- */
- override val topology: Topology = this@Simulator.topology
-
- /**
- * Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
- *
- * @param block The block to process the message with.
- * @return The processed message.
- */
- suspend override fun <T> receive(block: Envelope<*>.(Any?) -> T): T = suspendCoroutine {}
-
- /**
- * The observable state of an [Entity] within the simulation is provided by context.
- */
- @Suppress("UNCHECKED_CAST")
- override val <S> Entity<S>.state: S
- get() = states.computeIfAbsent(this, { initialState }) as S
-
- /**
- * Update the state of the entity being simulated.
- *
- * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects
- * referencing the old entity having their data changed.
- *
- * @param next The next state of the entity.
- */
- suspend override fun <C: EntityContext<E>, E: Entity<S>, S> C.update(next: S) {
- states.put(component.entity as Entity<*>, next)
- }
-
- /**
- * Suspend the simulation kernel for <code>n</code> ticks before resuming the execution.
- *
- * @param n The amount of ticks to suspend the simulation kernel.
- */
- suspend override fun wait(n: Int): Unit = suspendCoroutine { cont -> clock.scheduleAfter(n, { cont.resume(Unit) }) }
- }
-
- /**
- * The [Context] for an edge within the simulation.
- */
- private inner class DefaultChannelContext<out T>(override val component: Edge<T>): ChannelContext<T> {
- /**
- * The [Topology] over which the simulation is run.
- */
- override val topology: Topology = this@Simulator.topology
-
- /**
- * Retrieves and removes a single message from this channel suspending the caller while the channel is empty.
- *
- * @param block The block to process the message with.
- * @return The processed message.
- */
- suspend override fun <T> receive(block: Envelope<*>.(Any?) -> T): T = suspendCoroutine {}
-
- /**
- * Send the given message downstream.
- *
- * @param msg The message to send.
- * @param sender The sender of the message.
- */
- suspend override fun send(msg: Any?, sender: Node<*>): Unit = suspendCoroutine {}
-
- /**
- * The observable state of an [Entity] within the simulation is provided by context.
- */
- @Suppress("UNCHECKED_CAST")
- override val <S> Entity<S>.state: S
- get() = states.computeIfAbsent(this, { initialState }) as S
-
- /**
- * Suspend the simulation kernel for <code>n</code> ticks before resuming the execution.
- *
- * @param n The amount of ticks to suspend the simulation kernel.
- */
- suspend override fun wait(n: Int): Unit = suspendCoroutine { cont -> clock.scheduleAfter(n, { cont.resume(Unit) }) }
- }
-
- /**
- * The [Clock] for this [Simulator] that keeps track of the simulation time in ticks.
- */
- private inner class DefaultClock: Clock {
- override var tick: Tick = 0
- internal val queue: PriorityQueue<Pair<Tick, () -> Unit>> = PriorityQueue(Comparator.comparingLong { it.first })
-
- override fun scheduleAt(tick: Tick, block: () -> Unit) {
- queue.add(Pair(tick, block))
- }
- }
+ fun next()
}
diff --git a/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
index f18e62bd..3af0f8f3 100644
--- a/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
+++ b/opendc-core/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
@@ -24,7 +24,7 @@
package nl.atlarge.opendc
-import nl.atlarge.opendc.kernel.Simulator
+import nl.atlarge.opendc.kernel.DefaultSimulator
import nl.atlarge.opendc.topology.AdjacencyListTopologyBuilder
import nl.atlarge.opendc.topology.container.Rack
import nl.atlarge.opendc.topology.machine.Cpu
@@ -51,7 +51,7 @@ internal class SmokeTest {
}
}
- val simulator = Simulator(topology)
+ val simulator = DefaultSimulator(topology)
while (simulator.hasNext()) {
simulator.next()
}