summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt (renamed from opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt)2
-rw-r--r--opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt15
-rw-r--r--opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt34
-rw-r--r--opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt41
-rw-r--r--opendc-stdlib/build.gradle1
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt15
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt5
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt62
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt40
-rw-r--r--opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt2
10 files changed, 187 insertions, 30 deletions
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt
index 83100587..b5b5ec82 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/Node.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/extension/topology/Traversable.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package nl.atlarge.opendc.extension
+package nl.atlarge.opendc.extension.topology
import nl.atlarge.opendc.topology.Edge
diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
index 46cb271e..6185e273 100644
--- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
+++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/Context.kt
@@ -33,6 +33,7 @@ import nl.atlarge.opendc.topology.Entity
import nl.atlarge.opendc.topology.Topology
import nl.atlarge.opendc.topology.TopologyContext
import java.lang.Process
+import java.util.*
/**
* This interface provides a context for simulation [Process]es, which defines the environment in which the simulation
@@ -87,7 +88,7 @@ interface Context<out E : Entity<*>> : Readable, Writable, TopologyContext {
/**
* Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming
- * execution.
+ * execution and drop all messages that are received during this period.
*
* A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend
* the process until the no more messages at an earlier point in time have to be processed.
@@ -97,6 +98,18 @@ interface Context<out E : Entity<*>> : Readable, Writable, TopologyContext {
suspend fun wait(duration: Duration)
/**
+ * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming
+ * execution and push all messages that are received during this period to the given queue.
+ *
+ * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend
+ * the process until the no more messages at an earlier point in time have to be processed.
+ *
+ * @param duration The duration of simulation time to wait before resuming execution.
+ * @param queue The mutable queue to push the messages to.
+ */
+ suspend fun wait(duration: Duration, queue: Queue<Any>)
+
+ /**
* Suspend the [Process] of the [Entity] in simulation for one tick in simulation time which is defined by the
* [Clock].
*
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
index a29f2b08..67b192fb 100644
--- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
+++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
@@ -202,7 +202,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
/**
* The last point in time the process has done some work.
*/
- var last: Instant = 0
+ var last: Instant = -1
/**
* The state of the entity.
@@ -332,7 +332,11 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
* @param duration The duration of simulation time to wait before resuming execution.
*/
suspend override fun wait(duration: Duration) {
- require(duration > 0) { "The amount of time to suspend must be a non-zero positive number" }
+ require(duration >= 0) { "The amount of time to suspend must be a positive number" }
+
+ if (duration == 0.toLong())
+ return
+
schedule(Resume, entity, entity, duration)
while (true) {
@@ -342,6 +346,32 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
}
/**
+ * Suspend the [Process] of the [Entity] in simulation for the given duration of simulation time before resuming
+ * execution and push all messages that are received during this period to the given queue.
+ *
+ * A call to this method will not make the [Process] sleep for the actual duration of time, but instead suspend
+ * the process until the no more messages at an earlier point in time have to be processed.
+ *
+ * @param duration The duration of simulation time to wait before resuming execution.
+ * @param queue The mutable queue to push the messages to.
+ */
+ suspend override fun wait(duration: Duration, queue: Queue<Any>) {
+ require(duration >= 0) { "The amount of time to suspend must be a positive number" }
+
+ if (duration == 0.toLong())
+ return
+
+ schedule(Resume, entity, entity, duration)
+
+ while (true) {
+ val msg = receive()
+ if (msg is Resume)
+ return
+ queue.add(msg)
+ }
+ }
+
+ /**
* Update the state of the entity being simulated.
*
* <p>Instead of directly mutating the entity, we create a new instance of the entity to prevent other objects
diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
index 4f48f20d..e2eb8269 100644
--- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
+++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
@@ -25,25 +25,46 @@
package nl.atlarge.opendc
import nl.atlarge.opendc.kernel.omega.OmegaKernel
+import nl.atlarge.opendc.scheduler.FifoScheduler
+import nl.atlarge.opendc.scheduler.SrtfScheduler
import nl.atlarge.opendc.topology.AdjacencyList
+import nl.atlarge.opendc.topology.container.Datacenter
import nl.atlarge.opendc.topology.container.Rack
+import nl.atlarge.opendc.topology.container.Room
import nl.atlarge.opendc.topology.machine.Cpu
import nl.atlarge.opendc.topology.machine.Machine
+import nl.atlarge.opendc.workload.Task
import org.junit.jupiter.api.Test
+import java.util.*
internal class SmokeTest {
@Test
fun smoke() {
- val rack = Rack()
+ val datacenter = Datacenter(SrtfScheduler(), 50)
val builder = AdjacencyList.builder()
val topology = builder.construct {
- add(rack)
- val n = 100
+ add(datacenter)
+
+ // Add a room to the datacenter
+ val room = Room().also {
+ add(it)
+ connect(datacenter, it, tag = "room")
+ }
+
+ // Add a rack to the room
+ val rack = Rack().also {
+ add(it)
+ connect(room, it, tag = "rack")
+ }
+
+ val n = 10
+
// Create n machines in the rack
repeat(n) {
- val machine = Machine()
- add(machine)
- connect(rack, machine, tag = "machine")
+ val machine = Machine().also {
+ add(it)
+ connect(rack, it, tag = "machine")
+ }
val cpu1 = Cpu(10, 2, 2)
val cpu2 = Cpu(5, 3, 2)
@@ -56,6 +77,12 @@ internal class SmokeTest {
}
val simulation = OmegaKernel.create(topology)
- simulation.run()
+ val random = Random(0)
+ for (i in 0..100) {
+ val task = Task(i, emptySet(), random.nextInt(10000).toLong())
+ simulation.schedule(task, datacenter, delay = random.nextInt(15000).toLong())
+ }
+ simulation.run(50000)
+
}
}
diff --git a/opendc-stdlib/build.gradle b/opendc-stdlib/build.gradle
index 1013b82d..a0c92459 100644
--- a/opendc-stdlib/build.gradle
+++ b/opendc-stdlib/build.gradle
@@ -77,6 +77,7 @@ repositories {
dependencies {
compile project(':opendc-core')
+ compile "io.github.microutils:kotlin-logging:1.4.6"
testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3"
testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3"
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt
index cc196a00..5382e48b 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/FifoScheduler.kt
@@ -54,19 +54,24 @@ class FifoScheduler : Scheduler {
return
}
+ val iterator = queue.iterator()
+
machines
- .filter { it.state.status == Machine.Status.IDLE }
- .forEach {
- while (queue.isNotEmpty()) {
- val task = queue.poll()
+ .forEach { machine ->
+ while (iterator.hasNext()) {
+ val task = iterator.next()
// TODO What to do with tasks that are not ready yet to be processed
if (!task.isReady()) {
+ iterator.remove()
submit(task)
continue
+ } else if (task.finished) {
+ iterator.remove()
+ continue
}
- it.send(task)
+ machine.send(task)
break
}
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt
index ce80ddc3..0e94f81a 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/scheduler/SrtfScheduler.kt
@@ -57,8 +57,7 @@ class SrtfScheduler : Scheduler {
val iterator = tasks.sortedBy { it.remaining }.iterator()
machines
- .filter { it.state.status == Machine.Status.IDLE }
- .forEach {
+ .forEach { machine ->
while (iterator.hasNext()) {
val task = iterator.next()
@@ -71,7 +70,7 @@ class SrtfScheduler : Scheduler {
continue
}
- it.send(task)
+ machine.send(task)
break
}
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt
index 4136c03c..f8954527 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt
@@ -24,16 +24,76 @@
package nl.atlarge.opendc.topology.container
+import mu.KotlinLogging
+import nl.atlarge.opendc.extension.topology.destinations
+import nl.atlarge.opendc.kernel.Context
+import nl.atlarge.opendc.kernel.Process
+import nl.atlarge.opendc.kernel.time.Duration
+import nl.atlarge.opendc.scheduler.Scheduler
import nl.atlarge.opendc.topology.Entity
+import nl.atlarge.opendc.topology.machine.Machine
+import nl.atlarge.opendc.workload.Task
+import java.util.*
/**
* A representation of a facility used to house computer systems and associated components.
*
+ * @property scheduler The tasks scheduler the datacenter uses.
+ * @property interval The interval at which task will be (re)scheduled.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-class Datacenter : Entity<Unit> {
+class Datacenter(val scheduler: Scheduler, val interval: Duration) : Entity<Unit>, Process<Datacenter> {
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
/**
* The initial state of the entity.
*/
override val initialState = Unit
+
+ /**
+ * This method is invoked to start the simulation an [Entity] associated with this [Process].
+ *
+ * This method is assumed to be running during a simulation, but should hand back control to the simulator at
+ * some point by suspending the process. This allows other processes to do work in the current tick of the
+ * simulation.
+ * Suspending the process can be achieved by calling suspending method in the context:
+ * - [Context.tick] - Wait for the next tick to occur
+ * - [Context.wait] - Wait for `n` amount of ticks before resuming execution.
+ * - [Context.receive] - Wait for a message to be received in the mailbox of the [Entity] before resuming
+ * execution.
+ *
+ * If this method exits early, before the simulation has finished, the entity is assumed to be shutdown and its
+ * simulation will not run any further.
+ */
+ suspend override fun Context<Datacenter>.run() {
+ // The queue of messages to be processed after a cycle
+ val queue: Queue<Any> = ArrayDeque()
+ // Find all machines in the datacenter
+ val machines = outgoingEdges.destinations<Room>("room").asSequence()
+ .flatMap { it.outgoingEdges.destinations<Rack>("rack").asSequence() }
+ .flatMap { it.outgoingEdges.destinations<Machine>("machine").asSequence() }.toList()
+
+ logger.info { "Initialising datacenter with ${machines.size} machines" }
+
+ // Register all machines to the scheduler
+ machines.forEach(scheduler::register)
+
+ while (true) {
+ // Process all messages in the queue
+ while (queue.isNotEmpty()) {
+ val msg = queue.poll()
+ if (msg is Task) {
+ scheduler.submit(msg)
+ }
+ }
+ // (Re)schedule the tasks
+ scheduler.run { schedule() }
+
+ // Sleep a time quantum
+ wait(interval, queue)
+ }
+ }
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
index 163f280f..3305581a 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt
@@ -24,7 +24,8 @@
package nl.atlarge.opendc.topology.machine
-import nl.atlarge.opendc.extension.destinations
+import mu.KotlinLogging
+import nl.atlarge.opendc.extension.topology.destinations
import nl.atlarge.opendc.workload.Task
import nl.atlarge.opendc.kernel.Context
import nl.atlarge.opendc.kernel.Process
@@ -39,6 +40,11 @@ import nl.atlarge.opendc.topology.Entity
*/
class Machine : Entity<Machine.State>, Process<Machine> {
/**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The status of a machine.
*/
enum class Status {
@@ -63,19 +69,20 @@ class Machine : Entity<Machine.State>, Process<Machine> {
val interval: Duration = 10
val cpus = outgoingEdges.destinations<Cpu>("cpu")
- val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }).toLong()
- var task: Task? = null
+ val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores })
+ var task: Task = receiveTask()
+ update(State(Status.RUNNING, task))
while (true) {
- if (task != null) {
- if (task.finished) {
- task = null
- update(State(Status.IDLE))
- } else {
- task.consume(speed * delta)
- }
+ if (task.finished) {
+ logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" }
+ update(State(Status.IDLE))
+ task = receiveTask()
+ } else {
+ task.consume(speed * delta)
}
+ // Check if we have received a new order in the meantime.
val msg = receive(interval)
if (msg is Task) {
task = msg
@@ -83,4 +90,17 @@ class Machine : Entity<Machine.State>, Process<Machine> {
}
}
}
+
+ /**
+ * Wait for a [Task] to be received by the [Process] and discard all other messages received in the meantime.
+ *
+ * @return The task that has been received.
+ */
+ private suspend fun Context<Machine>.receiveTask(): Task {
+ while (true) {
+ val msg = receive()
+ if (msg is Task)
+ return msg
+ }
+ }
}
diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt
index 9a8c84e6..bd2b0604 100644
--- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt
+++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/workload/Task.kt
@@ -24,6 +24,8 @@
package nl.atlarge.opendc.workload
+import nl.atlarge.opendc.kernel.time.Instant
+
/**
* A task represents some computation that is part of a [Job].
*