From a031db367c71ec1604b34f3765198c2196bfe551 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 21 Sep 2017 23:25:44 +0200 Subject: Create simple datacenter experiment --- .../kotlin/nl/atlarge/opendc/extension/Node.kt | 43 --------------- .../opendc/extension/topology/Traversable.kt | 43 +++++++++++++++ .../kotlin/nl/atlarge/opendc/kernel/Context.kt | 15 +++++- .../atlarge/opendc/kernel/omega/OmegaSimulation.kt | 34 +++++++++++- .../src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt | 41 +++++++++++--- opendc-stdlib/build.gradle | 1 + .../nl/atlarge/opendc/scheduler/FifoScheduler.kt | 15 ++++-- .../nl/atlarge/opendc/scheduler/SrtfScheduler.kt | 5 +- .../opendc/topology/container/Datacenter.kt | 62 +++++++++++++++++++++- .../nl/atlarge/opendc/topology/machine/Machine.kt | 40 ++++++++++---- .../main/kotlin/nl/atlarge/opendc/workload/Task.kt | 2 + 11 files changed, 229 insertions(+), 72 deletions(-) delete mode 100644 opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt create mode 100644 opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt deleted file mode 100644 index 83100587..00000000 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package nl.atlarge.opendc.extension - -import nl.atlarge.opendc.topology.Edge - -/** - * Filter a [Set] of [Edge]s based on the tag of the edges and return the origin nodes casted to type `T`. - * - * @param tag The tag of the edges to get. - * @return An [Iterable] of the specified type `T` with the given tag. - */ -inline fun Set>.origins(tag: String) = filter { it.tag == tag }.map { it.from as T } - -/** - * Filter a [Set] of [Edge]s based on the tag of the edges and return the destination nodes casted to type `T`. - * - * @param tag The tag of the edges to get. - * @return An [Iterable] of the specified type `T` with the given tag. - */ -inline fun Set>.destinations(tag: String) = filter { it.tag == tag }.map { it.to as T } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt new file mode 100644 index 00000000..b5b5ec82 --- /dev/null +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.extension.topology + +import nl.atlarge.opendc.topology.Edge + +/** + * Filter a [Set] of [Edge]s based on the tag of the edges and return the origin nodes casted to type `T`. + * + * @param tag The tag of the edges to get. + * @return An [Iterable] of the specified type `T` with the given tag. + */ +inline fun Set>.origins(tag: String) = filter { it.tag == tag }.map { it.from as T } + +/** + * Filter a [Set] of [Edge]s based on the tag of the edges and return the destination nodes casted to type `T`. + * + * @param tag The tag of the edges to get. + * @return An [Iterable] of the specified type `T` with the given tag. + */ +inline fun Set>.destinations(tag: String) = filter { it.tag == tag }.map { it.to as T } diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt index 46cb271e..6185e273 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt @@ -33,6 +33,7 @@ import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.Topology import nl.atlarge.opendc.topology.TopologyContext import java.lang.Process +import java.util.* /** * This interface provides a context for simulation [Process]es, which defines the environment in which the simulation @@ -87,7 +88,7 @@ interface Context> : Readable, Writable, TopologyContext { /** * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming - * execution. + * execution and drop all messages that are received during this period. * * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend * the process until the no more messages at an earlier point in time have to be processed. @@ -96,6 +97,18 @@ interface Context> : Readable, Writable, TopologyContext { */ suspend fun wait(duration: Duration) + /** + * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming + * execution and push all messages that are received during this period to the given queue. + * + * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend + * the process until the no more messages at an earlier point in time have to be processed. + * + * @param duration The duration of simulation time to wait before resuming execution. + * @param queue The mutable queue to push the messages to. + */ + suspend fun wait(duration: Duration, queue: Queue) + /** * Suspend the [Process] of the [Entity] in simulation for one tick in simulation time which is defined by the * [Clock]. diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt index a29f2b08..67b192fb 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt @@ -202,7 +202,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to /** * The last point in time the process has done some work. */ - var last: Instant = 0 + var last: Instant = -1 /** * The state of the entity. @@ -332,7 +332,11 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * @param duration The duration of simulation time to wait before resuming execution. */ suspend override fun wait(duration: Duration) { - require(duration > 0) { "The amount of time to suspend must be a non-zero positive number" } + require(duration >= 0) { "The amount of time to suspend must be a positive number" } + + if (duration == 0.toLong()) + return + schedule(Resume, entity, entity, duration) while (true) { @@ -341,6 +345,32 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to } } + /** + * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming + * execution and push all messages that are received during this period to the given queue. + * + * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend + * the process until the no more messages at an earlier point in time have to be processed. + * + * @param duration The duration of simulation time to wait before resuming execution. + * @param queue The mutable queue to push the messages to. + */ + suspend override fun wait(duration: Duration, queue: Queue) { + require(duration >= 0) { "The amount of time to suspend must be a positive number" } + + if (duration == 0.toLong()) + return + + schedule(Resume, entity, entity, duration) + + while (true) { + val msg = receive() + if (msg is Resume) + return + queue.add(msg) + } + } + /** * Update the state of the entity being simulated. * diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt index 4f48f20d..e2eb8269 100644 --- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt +++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt @@ -25,25 +25,46 @@ package nl.atlarge.opendc import nl.atlarge.opendc.kernel.omega.OmegaKernel +import nl.atlarge.opendc.scheduler.FifoScheduler +import nl.atlarge.opendc.scheduler.SrtfScheduler import nl.atlarge.opendc.topology.AdjacencyList +import nl.atlarge.opendc.topology.container.Datacenter import nl.atlarge.opendc.topology.container.Rack +import nl.atlarge.opendc.topology.container.Room import nl.atlarge.opendc.topology.machine.Cpu import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.workload.Task import org.junit.jupiter.api.Test +import java.util.* internal class SmokeTest { @Test fun smoke() { - val rack = Rack() + val datacenter = Datacenter(SrtfScheduler(), 50) val builder = AdjacencyList.builder() val topology = builder.construct { - add(rack) - val n = 100 + add(datacenter) + + // Add a room to the datacenter + val room = Room().also { + add(it) + connect(datacenter, it, tag = "room") + } + + // Add a rack to the room + val rack = Rack().also { + add(it) + connect(room, it, tag = "rack") + } + + val n = 10 + // Create n machines in the rack repeat(n) { - val machine = Machine() - add(machine) - connect(rack, machine, tag = "machine") + val machine = Machine().also { + add(it) + connect(rack, it, tag = "machine") + } val cpu1 = Cpu(10, 2, 2) val cpu2 = Cpu(5, 3, 2) @@ -56,6 +77,12 @@ internal class SmokeTest { } val simulation = OmegaKernel.create(topology) - simulation.run() + val random = Random(0) + for (i in 0..100) { + val task = Task(i, emptySet(), random.nextInt(10000).toLong()) + simulation.schedule(task, datacenter, delay = random.nextInt(15000).toLong()) + } + simulation.run(50000) + } } diff --git a/opendc-stdlib/build.gradle b/opendc-stdlib/build.gradle index 1013b82d..a0c92459 100644 --- a/opendc-stdlib/build.gradle +++ b/opendc-stdlib/build.gradle @@ -77,6 +77,7 @@ repositories { dependencies { compile project(':opendc-core') + compile "io.github.microutils:kotlin-logging:1.4.6" testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt index cc196a00..5382e48b 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt @@ -54,19 +54,24 @@ class FifoScheduler : Scheduler { return } + val iterator = queue.iterator() + machines - .filter { it.state.status == Machine.Status.IDLE } - .forEach { - while (queue.isNotEmpty()) { - val task = queue.poll() + .forEach { machine -> + while (iterator.hasNext()) { + val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed if (!task.isReady()) { + iterator.remove() submit(task) continue + } else if (task.finished) { + iterator.remove() + continue } - it.send(task) + machine.send(task) break } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt index ce80ddc3..0e94f81a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt @@ -57,8 +57,7 @@ class SrtfScheduler : Scheduler { val iterator = tasks.sortedBy { it.remaining }.iterator() machines - .filter { it.state.status == Machine.Status.IDLE } - .forEach { + .forEach { machine -> while (iterator.hasNext()) { val task = iterator.next() @@ -71,7 +70,7 @@ class SrtfScheduler : Scheduler { continue } - it.send(task) + machine.send(task) break } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt index 4136c03c..f8954527 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt @@ -24,16 +24,76 @@ package nl.atlarge.opendc.topology.container +import mu.KotlinLogging +import nl.atlarge.opendc.extension.topology.destinations +import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.kernel.Process +import nl.atlarge.opendc.kernel.time.Duration +import nl.atlarge.opendc.scheduler.Scheduler import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.workload.Task +import java.util.* /** * A representation of a facility used to house computer systems and associated components. * + * @property scheduler The tasks scheduler the datacenter uses. + * @property interval The interval at which task will be (re)scheduled. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Datacenter : Entity { +class Datacenter(val scheduler: Scheduler, val interval: Duration) : Entity, Process { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + /** * The initial state of the entity. */ override val initialState = Unit + + /** + * This method is invoked to start the simulation an [Entity] associated with this [Process]. + * + * This method is assumed to be running during a simulation, but should hand back control to the simulator at + * some point by suspending the process. This allows other processes to do work in the current tick of the + * simulation. + * Suspending the process can be achieved by calling suspending method in the context: + * - [Context.tick] - Wait for the next tick to occur + * - [Context.wait] - Wait for `n` amount of ticks before resuming execution. + * - [Context.receive] - Wait for a message to be received in the mailbox of the [Entity] before resuming + * execution. + * + * If this method exits early, before the simulation has finished, the entity is assumed to be shutdown and its + * simulation will not run any further. + */ + suspend override fun Context.run() { + // The queue of messages to be processed after a cycle + val queue: Queue = ArrayDeque() + // Find all machines in the datacenter + val machines = outgoingEdges.destinations("room").asSequence() + .flatMap { it.outgoingEdges.destinations("rack").asSequence() } + .flatMap { it.outgoingEdges.destinations("machine").asSequence() }.toList() + + logger.info { "Initialising datacenter with ${machines.size} machines" } + + // Register all machines to the scheduler + machines.forEach(scheduler::register) + + while (true) { + // Process all messages in the queue + while (queue.isNotEmpty()) { + val msg = queue.poll() + if (msg is Task) { + scheduler.submit(msg) + } + } + // (Re)schedule the tasks + scheduler.run { schedule() } + + // Sleep a time quantum + wait(interval, queue) + } + } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index 163f280f..3305581a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -24,7 +24,8 @@ package nl.atlarge.opendc.topology.machine -import nl.atlarge.opendc.extension.destinations +import mu.KotlinLogging +import nl.atlarge.opendc.extension.topology.destinations import nl.atlarge.opendc.workload.Task import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process @@ -38,6 +39,11 @@ import nl.atlarge.opendc.topology.Entity * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ class Machine : Entity, Process { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + /** * The status of a machine. */ @@ -63,19 +69,20 @@ class Machine : Entity, Process { val interval: Duration = 10 val cpus = outgoingEdges.destinations("cpu") - val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }).toLong() - var task: Task? = null + val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }) + var task: Task = receiveTask() + update(State(Status.RUNNING, task)) while (true) { - if (task != null) { - if (task.finished) { - task = null - update(State(Status.IDLE)) - } else { - task.consume(speed * delta) - } + if (task.finished) { + logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" } + update(State(Status.IDLE)) + task = receiveTask() + } else { + task.consume(speed * delta) } + // Check if we have received a new order in the meantime. val msg = receive(interval) if (msg is Task) { task = msg @@ -83,4 +90,17 @@ class Machine : Entity, Process { } } } + + /** + * Wait for a [Task] to be received by the [Process] and discard all other messages received in the meantime. + * + * @return The task that has been received. + */ + private suspend fun Context.receiveTask(): Task { + while (true) { + val msg = receive() + if (msg is Task) + return msg + } + } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt index 9a8c84e6..bd2b0604 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt @@ -24,6 +24,8 @@ package nl.atlarge.opendc.workload +import nl.atlarge.opendc.kernel.time.Instant + /** * A task represents some computation that is part of a [Job]. * -- cgit v1.2.3