diff options
Diffstat (limited to 'opendc-stdlib/src/main/kotlin')
5 files changed, 105 insertions, 19 deletions
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt index cc196a00..5382e48b 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt @@ -54,19 +54,24 @@ class FifoScheduler : Scheduler { return } + val iterator = queue.iterator() + machines - .filter { it.state.status == Machine.Status.IDLE } - .forEach { - while (queue.isNotEmpty()) { - val task = queue.poll() + .forEach { machine -> + while (iterator.hasNext()) { + val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed if (!task.isReady()) { + iterator.remove() submit(task) continue + } else if (task.finished) { + iterator.remove() + continue } - it.send(task) + machine.send(task) break } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt index ce80ddc3..0e94f81a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt @@ -57,8 +57,7 @@ class SrtfScheduler : Scheduler { val iterator = tasks.sortedBy { it.remaining }.iterator() machines - .filter { it.state.status == Machine.Status.IDLE } - .forEach { + .forEach { machine -> while (iterator.hasNext()) { val task = iterator.next() @@ -71,7 +70,7 @@ class SrtfScheduler : Scheduler { continue } - it.send(task) + machine.send(task) break } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt index 4136c03c..f8954527 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt @@ -24,16 +24,76 @@ package nl.atlarge.opendc.topology.container +import mu.KotlinLogging +import nl.atlarge.opendc.extension.topology.destinations +import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.kernel.Process +import nl.atlarge.opendc.kernel.time.Duration +import nl.atlarge.opendc.scheduler.Scheduler import nl.atlarge.opendc.topology.Entity +import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.workload.Task +import java.util.* /** * A representation of a facility used to house computer systems and associated components. * + * @property scheduler The tasks scheduler the datacenter uses. + * @property interval The interval at which task will be (re)scheduled. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Datacenter : Entity<Unit> { +class Datacenter(val scheduler: Scheduler, val interval: Duration) : Entity<Unit>, Process<Datacenter> { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + /** * The initial state of the entity. */ override val initialState = Unit + + /** + * This method is invoked to start the simulation an [Entity] associated with this [Process]. + * + * This method is assumed to be running during a simulation, but should hand back control to the simulator at + * some point by suspending the process. This allows other processes to do work in the current tick of the + * simulation. + * Suspending the process can be achieved by calling suspending method in the context: + * - [Context.tick] - Wait for the next tick to occur + * - [Context.wait] - Wait for `n` amount of ticks before resuming execution. + * - [Context.receive] - Wait for a message to be received in the mailbox of the [Entity] before resuming + * execution. + * + * If this method exits early, before the simulation has finished, the entity is assumed to be shutdown and its + * simulation will not run any further. + */ + suspend override fun Context<Datacenter>.run() { + // The queue of messages to be processed after a cycle + val queue: Queue<Any> = ArrayDeque() + // Find all machines in the datacenter + val machines = outgoingEdges.destinations<Room>("room").asSequence() + .flatMap { it.outgoingEdges.destinations<Rack>("rack").asSequence() } + .flatMap { it.outgoingEdges.destinations<Machine>("machine").asSequence() }.toList() + + logger.info { "Initialising datacenter with ${machines.size} machines" } + + // Register all machines to the scheduler + machines.forEach(scheduler::register) + + while (true) { + // Process all messages in the queue + while (queue.isNotEmpty()) { + val msg = queue.poll() + if (msg is Task) { + scheduler.submit(msg) + } + } + // (Re)schedule the tasks + scheduler.run { schedule() } + + // Sleep a time quantum + wait(interval, queue) + } + } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index 163f280f..3305581a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -24,7 +24,8 @@ package nl.atlarge.opendc.topology.machine -import nl.atlarge.opendc.extension.destinations +import mu.KotlinLogging +import nl.atlarge.opendc.extension.topology.destinations import nl.atlarge.opendc.workload.Task import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process @@ -39,6 +40,11 @@ import nl.atlarge.opendc.topology.Entity */ class Machine : Entity<Machine.State>, Process<Machine> { /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** * The status of a machine. */ enum class Status { @@ -63,19 +69,20 @@ class Machine : Entity<Machine.State>, Process<Machine> { val interval: Duration = 10 val cpus = outgoingEdges.destinations<Cpu>("cpu") - val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }).toLong() - var task: Task? = null + val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }) + var task: Task = receiveTask() + update(State(Status.RUNNING, task)) while (true) { - if (task != null) { - if (task.finished) { - task = null - update(State(Status.IDLE)) - } else { - task.consume(speed * delta) - } + if (task.finished) { + logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" } + update(State(Status.IDLE)) + task = receiveTask() + } else { + task.consume(speed * delta) } + // Check if we have received a new order in the meantime. val msg = receive(interval) if (msg is Task) { task = msg @@ -83,4 +90,17 @@ class Machine : Entity<Machine.State>, Process<Machine> { } } } + + /** + * Wait for a [Task] to be received by the [Process] and discard all other messages received in the meantime. + * + * @return The task that has been received. + */ + private suspend fun Context<Machine>.receiveTask(): Task { + while (true) { + val msg = receive() + if (msg is Task) + return msg + } + } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt index 9a8c84e6..bd2b0604 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt @@ -24,6 +24,8 @@ package nl.atlarge.opendc.workload +import nl.atlarge.opendc.kernel.time.Instant + /** * A task represents some computation that is part of a [Job]. * |
