diff options
| -rw-r--r-- | opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt (renamed from opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt) | 2 | ||||
| -rw-r--r-- | opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt | 15 | ||||
| -rw-r--r-- | opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt | 34 | ||||
| -rw-r--r-- | opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt | 41 | ||||
| -rw-r--r-- | opendc-stdlib/build.gradle | 1 | ||||
| -rw-r--r-- | opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt | 15 | ||||
| -rw-r--r-- | opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt | 5 | ||||
| -rw-r--r-- | opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt | 62 | ||||
| -rw-r--r-- | opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt | 40 | ||||
| -rw-r--r-- | opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt | 2 |
10 files changed, 187 insertions, 30 deletions
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt index 83100587..b5b5ec82 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package nl.atlarge.opendc.extension +package nl.atlarge.opendc.extension.topology import nl.atlarge.opendc.topology.Edge diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt index 46cb271e..6185e273 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt @@ -33,6 +33,7 @@ import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.Topology import nl.atlarge.opendc.topology.TopologyContext import java.lang.Process +import java.util.* /** * This interface provides a context for simulation [Process]es, which defines the environment in which the simulation @@ -87,7 +88,7 @@ interface Context<out E : Entity<*>> : Readable, Writable, TopologyContext { /** * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming - * execution. + * execution and drop all messages that are received during this period. * * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend * the process until the no more messages at an earlier point in time have to be processed. @@ -97,6 +98,18 @@ interface Context<out E : Entity<*>> : Readable, Writable, TopologyContext { suspend fun wait(duration: Duration) /** + * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming + * execution and push all messages that are received during this period to the given queue. + * + * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend + * the process until the no more messages at an earlier point in time have to be processed. + * + * @param duration The duration of simulation time to wait before resuming execution. + * @param queue The mutable queue to push the messages to. + */ + suspend fun wait(duration: Duration, queue: Queue<Any>) + + /** * Suspend the [Process] of the [Entity] in simulation for one tick in simulation time which is defined by the * [Clock]. * diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt index a29f2b08..67b192fb 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt @@ -202,7 +202,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to /** * The last point in time the process has done some work. */ - var last: Instant = 0 + var last: Instant = -1 /** * The state of the entity. @@ -332,7 +332,11 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * @param duration The duration of simulation time to wait before resuming execution. */ suspend override fun wait(duration: Duration) { - require(duration > 0) { "The amount of time to suspend must be a non-zero positive number" } + require(duration >= 0) { "The amount of time to suspend must be a positive number" } + + if (duration == 0.toLong()) + return + schedule(Resume, entity, entity, duration) while (true) { @@ -342,6 +346,32 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to } /** + * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming + * execution and push all messages that are received during this period to the given queue. + * + * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend + * the process until the no more messages at an earlier point in time have to be processed. + * + * @param duration The duration of simulation time to wait before resuming execution. + * @param queue The mutable queue to push the messages to. + */ + suspend override fun wait(duration: Duration, queue: Queue<Any>) { + require(duration >= 0) { "The amount of time to suspend must be a positive number" } + + if (duration == 0.toLong()) + return + + schedule(Resume, entity, entity, duration) + + while (true) { + val msg = receive() + if (msg is Resume) + return + queue.add(msg) + } + } + + /** * Update the state of the entity being simulated. * * <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt index 4f48f20d..e2eb8269 100644 --- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt +++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt @@ -25,25 +25,46 @@ package nl.atlarge.opendc import nl.atlarge.opendc.kernel.omega.OmegaKernel +import nl.atlarge.opendc.scheduler.FifoScheduler +import nl.atlarge.opendc.scheduler.SrtfScheduler import nl.atlarge.opendc.topology.AdjacencyList +import nl.atlarge.opendc.topology.container.Datacenter import nl.atlarge.opendc.topology.container.Rack +import nl.atlarge.opendc.topology.container.Room import nl.atlarge.opendc.topology.machine.Cpu import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.workload.Task import org.junit.jupiter.api.Test +import java.util.* internal class SmokeTest { @Test fun smoke() { - val rack = Rack() + val datacenter = Datacenter(SrtfScheduler(), 50) val builder = AdjacencyList.builder() val topology = builder.construct { - add(rack) - val n = 100 + add(datacenter) + + // Add a room to the datacenter + val room = Room().also { + add(it) + connect(datacenter, it, tag = "room") + } + + // Add a rack to the room + val rack = Rack().also { + add(it) + connect(room, it, tag = "rack") + } + + val n = 10 + // Create n machines in the rack repeat(n) { - val machine = Machine() - add(machine) - connect(rack, machine, tag = "machine") + val machine = Machine().also { + add(it) + connect(rack, it, tag = "machine") + } val cpu1 = Cpu(10, 2, 2) val cpu2 = Cpu(5, 3, 2) @@ -56,6 +77,12 @@ internal class SmokeTest { } val simulation = OmegaKernel.create(topology) - simulation.run() + val random = Random(0) + for (i in 0..100) { + val task = Task(i, emptySet(), random.nextInt(10000).toLong()) + simulation.schedule(task, datacenter, delay = random.nextInt(15000).toLong()) + } + simulation.run(50000) + } } diff --git a/opendc-stdlib/build.gradle b/opendc-stdlib/build.gradle index 1013b82d..a0c92459 100644 --- a/opendc-stdlib/build.gradle +++ b/opendc-stdlib/build.gradle @@ -77,6 +77,7 @@ repositories { dependencies { compile project(':opendc-core') + compile "io.github.microutils:kotlin-logging:1.4.6" testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt index cc196a00..5382e48b 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt @@ -54,19 +54,24 @@ class FifoScheduler : Scheduler { return } + val iterator = queue.iterator() + machines - .filter { it.state.status == Machine.Status.IDLE } - .forEach { - while (queue.isNotEmpty()) { - val task = queue.poll() + .forEach { machine -> + while (iterator.hasNext()) { + val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed if (!task.isReady()) { + iterator.remove() submit(task) continue + } else if (task.finished) { + iterator.remove() + continue } - it.send(task) + machine.send(task) break } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt index ce80ddc3..0e94f81a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt @@ -57,8 +57,7 @@ class SrtfScheduler : Scheduler { val iterator = tasks.sortedBy { it.remaining }.iterator() machines - .filter { it.state.status == Machine.Status.IDLE } - .forEach { + .forEach { machine -> while (iterator.hasNext()) { val task = iterator.next() @@ -71,7 +70,7 @@ class SrtfScheduler : Scheduler { continue } - it.send(task) + machine.send(task) break } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt index 4136c03c..f8954527 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt @@ -24,16 +24,76 @@ package nl.atlarge.opendc.topology.container +import mu.KotlinLogging +import nl.atlarge.opendc.extension.topology.destinations +import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.kernel.Process +import nl.atlarge.opendc.kernel.time.Duration +import nl.atlarge.opendc.scheduler.Scheduler import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.workload.Task +import java.util.* /** * A representation of a facility used to house computer systems and associated components. * + * @property scheduler The tasks scheduler the datacenter uses. + * @property interval The interval at which task will be (re)scheduled. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Datacenter : Entity<Unit> { +class Datacenter(val scheduler: Scheduler, val interval: Duration) : Entity<Unit>, Process<Datacenter> { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + /** * The initial state of the entity. */ override val initialState = Unit + + /** + * This method is invoked to start the simulation an [Entity] associated with this [Process]. + * + * This method is assumed to be running during a simulation, but should hand back control to the simulator at + * some point by suspending the process. This allows other processes to do work in the current tick of the + * simulation. + * Suspending the process can be achieved by calling suspending method in the context: + * - [Context.tick] - Wait for the next tick to occur + * - [Context.wait] - Wait for `n` amount of ticks before resuming execution. + * - [Context.receive] - Wait for a message to be received in the mailbox of the [Entity] before resuming + * execution. + * + * If this method exits early, before the simulation has finished, the entity is assumed to be shutdown and its + * simulation will not run any further. + */ + suspend override fun Context<Datacenter>.run() { + // The queue of messages to be processed after a cycle + val queue: Queue<Any> = ArrayDeque() + // Find all machines in the datacenter + val machines = outgoingEdges.destinations<Room>("room").asSequence() + .flatMap { it.outgoingEdges.destinations<Rack>("rack").asSequence() } + .flatMap { it.outgoingEdges.destinations<Machine>("machine").asSequence() }.toList() + + logger.info { "Initialising datacenter with ${machines.size} machines" } + + // Register all machines to the scheduler + machines.forEach(scheduler::register) + + while (true) { + // Process all messages in the queue + while (queue.isNotEmpty()) { + val msg = queue.poll() + if (msg is Task) { + scheduler.submit(msg) + } + } + // (Re)schedule the tasks + scheduler.run { schedule() } + + // Sleep a time quantum + wait(interval, queue) + } + } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index 163f280f..3305581a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -24,7 +24,8 @@ package nl.atlarge.opendc.topology.machine -import nl.atlarge.opendc.extension.destinations +import mu.KotlinLogging +import nl.atlarge.opendc.extension.topology.destinations import nl.atlarge.opendc.workload.Task import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process @@ -39,6 +40,11 @@ import nl.atlarge.opendc.topology.Entity */ class Machine : Entity<Machine.State>, Process<Machine> { /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** * The status of a machine. */ enum class Status { @@ -63,19 +69,20 @@ class Machine : Entity<Machine.State>, Process<Machine> { val interval: Duration = 10 val cpus = outgoingEdges.destinations<Cpu>("cpu") - val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }).toLong() - var task: Task? = null + val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }) + var task: Task = receiveTask() + update(State(Status.RUNNING, task)) while (true) { - if (task != null) { - if (task.finished) { - task = null - update(State(Status.IDLE)) - } else { - task.consume(speed * delta) - } + if (task.finished) { + logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" } + update(State(Status.IDLE)) + task = receiveTask() + } else { + task.consume(speed * delta) } + // Check if we have received a new order in the meantime. val msg = receive(interval) if (msg is Task) { task = msg @@ -83,4 +90,17 @@ class Machine : Entity<Machine.State>, Process<Machine> { } } } + + /** + * Wait for a [Task] to be received by the [Process] and discard all other messages received in the meantime. + * + * @return The task that has been received. + */ + private suspend fun Context<Machine>.receiveTask(): Task { + while (true) { + val msg = receive() + if (msg is Task) + return msg + } + } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt index 9a8c84e6..bd2b0604 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt @@ -24,6 +24,8 @@ package nl.atlarge.opendc.workload +import nl.atlarge.opendc.kernel.time.Instant + /** * A task represents some computation that is part of a [Job]. * |
