diff options
Diffstat (limited to 'opendc-stdlib/src/main/kotlin')
15 files changed, 196 insertions, 89 deletions
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt index 5382e48b..1c3dc869 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt @@ -22,12 +22,12 @@ * SOFTWARE. */ -package nl.atlarge.opendc.scheduler +package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.workload.Task +import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -37,6 +37,11 @@ import java.util.* */ class FifoScheduler : Scheduler { /** + * The name of this scheduler. + */ + override val name: String = "FIFO" + + /** * The set of machines the scheduler knows of. */ val machines: MutableSet<Machine> = HashSet() @@ -54,6 +59,8 @@ class FifoScheduler : Scheduler { return } + // The tasks that need to be rescheduled + val rescheduled = ArrayDeque<Task>() val iterator = queue.iterator() machines @@ -64,7 +71,7 @@ class FifoScheduler : Scheduler { // TODO What to do with tasks that are not ready yet to be processed if (!task.isReady()) { iterator.remove() - submit(task) + rescheduled.add(task) continue } else if (task.finished) { iterator.remove() @@ -75,6 +82,11 @@ class FifoScheduler : Scheduler { break } } + + // Reschedule all tasks that are not ready yet + while (!rescheduled.isEmpty()) { + submit(rescheduled.poll()) + } } /** diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/Scheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt index 8c96341a..bf988802 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/Scheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt @@ -22,12 +22,12 @@ * SOFTWARE. */ -package nl.atlarge.opendc.scheduler +package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.workload.Task +import nl.atlarge.opendc.platform.workload.Task /** * A task scheduler that is coupled to an [Entity] in the topology of the cloud network. @@ -36,6 +36,11 @@ import nl.atlarge.opendc.workload.Task */ interface Scheduler { /** + * The name of this scheduler. + */ + val name: String + + /** * (Re)schedule the tasks submitted to the scheduler over the specified set of machines. * * This method should be invoked at some interval to allow the scheduler to reschedule existing tasks and schedule diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt index 0e94f81a..b2660964 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt @@ -22,12 +22,12 @@ * SOFTWARE. */ -package nl.atlarge.opendc.scheduler +package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.workload.Task +import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -37,6 +37,11 @@ import java.util.* */ class SrtfScheduler : Scheduler { /** + * The name of this scheduler. + */ + override val name: String = "SRTF" + + /** * The set of machines the scheduler knows of. */ val machines: MutableSet<Machine> = HashSet() diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Job.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt index 6dbbf327..eb173229 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Job.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt @@ -22,15 +22,32 @@ * SOFTWARE. */ -package nl.atlarge.opendc.workload +package nl.atlarge.opendc.platform.workload /** * A job that is submitted to a cloud network, which consists of one or multiple [Task]s. * - * @param id The unique identifier of this job. - * @param name The name of this job. - * @param owner The user to which the job belongs. - * @param tasks The tasks of which the job consists. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -data class Job(val id: Int, val name: String, val owner: User, val tasks: Collection<Task>) +interface Job { + /** + * A unique identifier of the job. + */ + val id: Int + + /** + * The owner of this job. + */ + val owner: User + + /** + * The tasks this job consists of. + */ + val tasks: Set<Task> + + /** + * A flag to indicate the job has finished. + */ + val finished: Boolean + get() = !tasks.any { !it.finished } +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt index bd2b0604..140a5d5d 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt @@ -22,31 +22,57 @@ * SOFTWARE. */ -package nl.atlarge.opendc.workload +package nl.atlarge.opendc.platform.workload -import nl.atlarge.opendc.kernel.time.Instant +import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.topology.container.Datacenter +import nl.atlarge.opendc.topology.machine.Machine /** * A task represents some computation that is part of a [Job]. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -data class Task( - val id: Int, - val dependencies: Set<Task>, +interface Task { + /** + * The unique identifier of the task. + */ + val id: Int + + /** + * The amount of flops for this task. + */ val flops: Long -) { + + /** + * The dependencies of the task. + */ + val dependencies: Set<Task> + + /** + * A flag to indicate the task is parallelizable. + */ + val parallelizable: Boolean + /** * The remaining amount of flops to compute. */ - var remaining: Long = flops - private set + val remaining: Long + + /** + * A flag to indicate the task has been accepted by the datacenter. + */ + val accepted: Boolean + + /** + * A flag to indicate the task has been started. + */ + val started: Boolean /** * A flag to indicate whether the task is finished. */ - var finished: Boolean = false - private set + val finished: Boolean /** * Determine whether the task is ready to be processed. @@ -56,18 +82,24 @@ data class Task( fun isReady() = dependencies.all { it.finished } /** + * Accept the task into the scheduling queue. + */ + fun Context<Datacenter>.accept() + + /** + * Start a task. + */ + fun Context<Machine>.start() + + /** * Consume the given amount of flops of this task. * * @param flops The total amount of flops to consume. */ - fun consume(flops: Long) { - if (finished) - return - if (remaining <= flops) { - finished = true - remaining = 0 - } else { - remaining -= flops - } - } + fun Context<Machine>.consume(flops: Long) + + /** + * Finalise the task. + */ + fun Context<Machine>.finalize() } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Trace.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Trace.kt new file mode 100644 index 00000000..6dd2efb8 --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Trace.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.platform.workload + +/** + * A timestamped sequence of jobs received in a cloud network. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Trace { + /** + * The [Job]s in the trace. + */ + val jobs: Set<Job> +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/User.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/User.kt index d0e90851..d827fee5 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/User.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/User.kt @@ -22,19 +22,24 @@ * SOFTWARE. */ -package nl.atlarge.opendc.workload - -import nl.atlarge.opendc.topology.Topology +package nl.atlarge.opendc.platform.workload /** * A user of a cloud network that provides [Job]s for the simulation. * - * Each [User] in a simulation has its own logical view of the cloud network which is used to route its jobs in the + * Each user in a simulation has its own logical view of the cloud network which is used to route its jobs in the * physical network. * - * @param id The unique identifier of the user. - * @param name The name of the user. - * @param view The view of the user on the topology. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -data class User(val id: Int, val name: String, val view: Topology) +interface User { + /** + * The unique identifier of the user. + */ + val id: Int + + /** + * The name of this user. + */ + val name: String +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt index f8954527..333609bb 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt @@ -25,14 +25,14 @@ package nl.atlarge.opendc.topology.container import mu.KotlinLogging -import nl.atlarge.opendc.extension.topology.destinations +import nl.atlarge.opendc.topology.destinations import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process import nl.atlarge.opendc.kernel.time.Duration -import nl.atlarge.opendc.scheduler.Scheduler +import nl.atlarge.opendc.platform.scheduler.Scheduler import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.workload.Task +import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -42,16 +42,16 @@ import java.util.* * @property interval The interval at which task will be (re)scheduled. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Datacenter(val scheduler: Scheduler, val interval: Duration) : Entity<Unit>, Process<Datacenter> { +interface Datacenter : Entity<Unit>, Process<Datacenter> { /** - * The logger instance to use for the simulator. + * The task scheduler the datacenter uses. */ - private val logger = KotlinLogging.logger {} + val scheduler: Scheduler /** - * The initial state of the entity. + * The interval at which task will be (re)scheduled. */ - override val initialState = Unit + val interval: Duration /** * This method is invoked to start the simulation an [Entity] associated with this [Process]. @@ -69,6 +69,8 @@ class Datacenter(val scheduler: Scheduler, val interval: Duration) : Entity<Unit * simulation will not run any further. */ suspend override fun Context<Datacenter>.run() { + val logger = KotlinLogging.logger {} + // The queue of messages to be processed after a cycle val queue: Queue<Any> = ArrayDeque() // Find all machines in the datacenter diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Rack.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Rack.kt index 27207d4c..25429f71 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Rack.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Rack.kt @@ -32,6 +32,4 @@ import nl.atlarge.opendc.topology.Entity * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Rack : Entity<Unit> { - override val initialState = Unit -} +interface Rack : Entity<Unit> diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Room.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Room.kt index 109a5629..3b338899 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Room.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Room.kt @@ -31,9 +31,4 @@ import nl.atlarge.opendc.topology.Entity * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Room : Entity<Unit> { - /** - * The initial state of the entity. - */ - override val initialState = Unit -} +interface Room : Entity<Unit> diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Cpu.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Cpu.kt index 78e2eaaa..f97e73e9 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Cpu.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Cpu.kt @@ -29,10 +29,4 @@ package nl.atlarge.opendc.topology.machine * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -data class Cpu( - override val speed: Int, - override val cores: Int, - override val energyConsumption: Int -) : ProcessingUnit { - override val initialState = Unit -} +interface Cpu : ProcessingUnit diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Gpu.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Gpu.kt index 09179c94..15c5263f 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Gpu.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Gpu.kt @@ -29,11 +29,5 @@ package nl.atlarge.opendc.topology.machine * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Gpu( - override val speed: Int, - override val cores: Int, - override val energyConsumption: Int -) : ProcessingUnit { - override val initialState = Unit -} +interface Gpu : ProcessingUnit diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index 3305581a..4338ae04 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -25,8 +25,8 @@ package nl.atlarge.opendc.topology.machine import mu.KotlinLogging -import nl.atlarge.opendc.extension.topology.destinations -import nl.atlarge.opendc.workload.Task +import nl.atlarge.opendc.topology.destinations +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process import nl.atlarge.opendc.kernel.time.Duration @@ -38,7 +38,7 @@ import nl.atlarge.opendc.topology.Entity * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class Machine : Entity<Machine.State>, Process<Machine> { +open class Machine : Entity<Machine.State>, Process<Machine> { /** * The logger instance to use for the simulator. */ @@ -53,8 +53,18 @@ class Machine : Entity<Machine.State>, Process<Machine> { /** * The shape of the state of a [Machine] entity. + * + * @property status The status of the machine. + * @property task The task assign to the machine. + * @property memory The memory usage of the machine (defaults to 50mb for the kernel) + * @property load The load on the machine (defaults to 0.0) + * @property temperature The temperature of the machine (defaults to 23 degrees Celcius) */ - data class State(val status: Status, val task: Task? = null) + data class State(val status: Status, + val task: Task? = null, + val memory: Int = 50, + val load: Double = 0.0, + val temperature: Double = 23.0) /** * The initial state of a [Machine] entity. @@ -69,24 +79,28 @@ class Machine : Entity<Machine.State>, Process<Machine> { val interval: Duration = 10 val cpus = outgoingEdges.destinations<Cpu>("cpu") - val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }) + val speed = cpus.fold(0, { acc, cpu -> acc + cpu.clockRate * cpu.cores }) / 10 + var task: Task = receiveTask() - update(State(Status.RUNNING, task)) + update(State(Status.RUNNING, task, load = 1.0, memory = state.memory + 50, temperature = 30.0)) + task.run { start() } while (true) { - if (task.finished) { + if (task.remaining <= 0) { + task.run { finalize() } logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" } update(State(Status.IDLE)) task = receiveTask() } else { - task.consume(speed * delta) + task.run { consume(speed * delta) } } // Check if we have received a new order in the meantime. val msg = receive(interval) if (msg is Task) { task = msg - update(State(Status.RUNNING, task)) + update(State(Status.RUNNING, task, load = 1.0, memory = state.memory + 50, temperature = 30.0)) + task.run { start() } } } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/ProcessingUnit.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/ProcessingUnit.kt index 31bfbcd6..abc8608b 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/ProcessingUnit.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/ProcessingUnit.kt @@ -33,9 +33,9 @@ import nl.atlarge.opendc.topology.Entity */ interface ProcessingUnit : Entity<Unit> { /** - * The speed of this [ProcessingUnit] per core. + * The speed of this [ProcessingUnit] per core in MHz. */ - val speed: Int + val clockRate: Int /** * The amount of cores within this [ProcessingUnit]. @@ -43,7 +43,7 @@ interface ProcessingUnit : Entity<Unit> { val cores: Int /** - * The energy consumption of this [ProcessingUnit] in Kj/s. + * The energy consumption of this [ProcessingUnit] in Watt. */ - val energyConsumption: Int + val energyConsumption: Double } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/power/PowerUnit.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/power/PowerUnit.kt index aac4ce03..b9602e55 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/power/PowerUnit.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/power/PowerUnit.kt @@ -29,9 +29,6 @@ import nl.atlarge.opendc.topology.Entity /** * An [Entity] which provides power for other entities a cloud network to run. * - * @param output The power output of the power unit in Watt. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class PowerUnit(val output: Double) : Entity<Unit> { - override val initialState = Unit -} +interface PowerUnit : Entity<Unit> |
