summaryrefslogtreecommitdiff
path: root/opendc-stdlib/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-stdlib/src/main/kotlin')
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt15
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt5
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt62
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt40
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt2
5 files changed, 105 insertions, 19 deletions
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt
index cc196a00..5382e48b 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt
@@ -54,19 +54,24 @@ class FifoScheduler : Scheduler {
return
}
+ val iterator = queue.iterator()
+
machines
- .filter { it.state.status == Machine.Status.IDLE }
- .forEach {
- while (queue.isNotEmpty()) {
- val task = queue.poll()
+ .forEach { machine ->
+ while (iterator.hasNext()) {
+ val task = iterator.next()
// TODO What to do with tasks that are not ready yet to be processed
if (!task.isReady()) {
+ iterator.remove()
submit(task)
continue
+ } else if (task.finished) {
+ iterator.remove()
+ continue
}
- it.send(task)
+ machine.send(task)
break
}
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt
index ce80ddc3..0e94f81a 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt
@@ -57,8 +57,7 @@ class SrtfScheduler : Scheduler {
val iterator = tasks.sortedBy { it.remaining }.iterator()
machines
- .filter { it.state.status == Machine.Status.IDLE }
- .forEach {
+ .forEach { machine ->
while (iterator.hasNext()) {
val task = iterator.next()
@@ -71,7 +70,7 @@ class SrtfScheduler : Scheduler {
continue
}
- it.send(task)
+ machine.send(task)
break
}
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt
index 4136c03c..f8954527 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt
@@ -24,16 +24,76 @@
package nl.atlarge.opendc.topology.container
+import mu.KotlinLogging
+import nl.atlarge.opendc.extension.topology.destinations
+import nl.atlarge.opendc.kernel.Context
+import nl.atlarge.opendc.kernel.Process
+import nl.atlarge.opendc.kernel.time.Duration
+import nl.atlarge.opendc.scheduler.Scheduler
import nl.atlarge.opendc.topology.Entity
+import nl.atlarge.opendc.topology.machine.Machine
+import nl.atlarge.opendc.workload.Task
+import java.util.*
/**
* A representation of a facility used to house computer systems and associated components.
*
+ * @property scheduler The tasks scheduler the datacenter uses.
+ * @property interval The interval at which task will be (re)scheduled.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class Datacenter : Entity<Unit> {
+class Datacenter(val scheduler: Scheduler, val interval: Duration) : Entity<Unit>, Process<Datacenter> {
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
/**
* The initial state of the entity.
*/
override val initialState = Unit
+
+ /**
+ * This method is invoked to start the simulation an [Entity] associated with this [Process].
+ *
+ * This method is assumed to be running during a simulation, but should hand back control to the simulator at
+ * some point by suspending the process. This allows other processes to do work in the current tick of the
+ * simulation.
+ * Suspending the process can be achieved by calling suspending method in the context:
+ * - [Context.tick] - Wait for the next tick to occur
+ * - [Context.wait] - Wait for `n` amount of ticks before resuming execution.
+ * - [Context.receive] - Wait for a message to be received in the mailbox of the [Entity] before resuming
+ * execution.
+ *
+ * If this method exits early, before the simulation has finished, the entity is assumed to be shutdown and its
+ * simulation will not run any further.
+ */
+ suspend override fun Context<Datacenter>.run() {
+ // The queue of messages to be processed after a cycle
+ val queue: Queue<Any> = ArrayDeque()
+ // Find all machines in the datacenter
+ val machines = outgoingEdges.destinations<Room>("room").asSequence()
+ .flatMap { it.outgoingEdges.destinations<Rack>("rack").asSequence() }
+ .flatMap { it.outgoingEdges.destinations<Machine>("machine").asSequence() }.toList()
+
+ logger.info { "Initialising datacenter with ${machines.size} machines" }
+
+ // Register all machines to the scheduler
+ machines.forEach(scheduler::register)
+
+ while (true) {
+ // Process all messages in the queue
+ while (queue.isNotEmpty()) {
+ val msg = queue.poll()
+ if (msg is Task) {
+ scheduler.submit(msg)
+ }
+ }
+ // (Re)schedule the tasks
+ scheduler.run { schedule() }
+
+ // Sleep a time quantum
+ wait(interval, queue)
+ }
+ }
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
index 163f280f..3305581a 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
@@ -24,7 +24,8 @@
package nl.atlarge.opendc.topology.machine
-import nl.atlarge.opendc.extension.destinations
+import mu.KotlinLogging
+import nl.atlarge.opendc.extension.topology.destinations
import nl.atlarge.opendc.workload.Task
import nl.atlarge.opendc.kernel.Context
import nl.atlarge.opendc.kernel.Process
@@ -39,6 +40,11 @@ import nl.atlarge.opendc.topology.Entity
*/
class Machine : Entity<Machine.State>, Process<Machine> {
/**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The status of a machine.
*/
enum class Status {
@@ -63,19 +69,20 @@ class Machine : Entity<Machine.State>, Process<Machine> {
val interval: Duration = 10
val cpus = outgoingEdges.destinations<Cpu>("cpu")
- val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }).toLong()
- var task: Task? = null
+ val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores })
+ var task: Task = receiveTask()
+ update(State(Status.RUNNING, task))
while (true) {
- if (task != null) {
- if (task.finished) {
- task = null
- update(State(Status.IDLE))
- } else {
- task.consume(speed * delta)
- }
+ if (task.finished) {
+ logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" }
+ update(State(Status.IDLE))
+ task = receiveTask()
+ } else {
+ task.consume(speed * delta)
}
+ // Check if we have received a new order in the meantime.
val msg = receive(interval)
if (msg is Task) {
task = msg
@@ -83,4 +90,17 @@ class Machine : Entity<Machine.State>, Process<Machine> {
}
}
}
+
+ /**
+ * Wait for a [Task] to be received by the [Process] and discard all other messages received in the meantime.
+ *
+ * @return The task that has been received.
+ */
+ private suspend fun Context<Machine>.receiveTask(): Task {
+ while (true) {
+ val msg = receive()
+ if (msg is Task)
+ return msg
+ }
+ }
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt
index 9a8c84e6..bd2b0604 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt
@@ -24,6 +24,8 @@
package nl.atlarge.opendc.workload
+import nl.atlarge.opendc.kernel.time.Instant
+
/**
* A task represents some computation that is part of a [Job].
*