From a031db367c71ec1604b34f3765198c2196bfe551 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 21 Sep 2017 23:25:44 +0200 Subject: Create simple datacenter experiment --- .../nl/atlarge/opendc/topology/machine/Machine.kt | 40 ++++++++++++++++------ 1 file changed, 30 insertions(+), 10 deletions(-) (limited to 'opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt') diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index 163f280f..3305581a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -24,7 +24,8 @@ package nl.atlarge.opendc.topology.machine -import nl.atlarge.opendc.extension.destinations +import mu.KotlinLogging +import nl.atlarge.opendc.extension.topology.destinations import nl.atlarge.opendc.workload.Task import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process @@ -38,6 +39,11 @@ import nl.atlarge.opendc.topology.Entity * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ class Machine : Entity, Process { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + /** * The status of a machine. */ @@ -63,19 +69,20 @@ class Machine : Entity, Process { val interval: Duration = 10 val cpus = outgoingEdges.destinations("cpu") - val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }).toLong() - var task: Task? = null + val speed = cpus.fold(0, { acc, (speed, cores) -> acc + speed * cores }) + var task: Task = receiveTask() + update(State(Status.RUNNING, task)) while (true) { - if (task != null) { - if (task.finished) { - task = null - update(State(Status.IDLE)) - } else { - task.consume(speed * delta) - } + if (task.finished) { + logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" } + update(State(Status.IDLE)) + task = receiveTask() + } else { + task.consume(speed * delta) } + // Check if we have received a new order in the meantime. val msg = receive(interval) if (msg is Task) { task = msg @@ -83,4 +90,17 @@ class Machine : Entity, Process { } } } + + /** + * Wait for a [Task] to be received by the [Process] and discard all other messages received in the meantime. + * + * @return The task that has been received. + */ + private suspend fun Context.receiveTask(): Task { + while (true) { + val msg = receive() + if (msg is Task) + return msg + } + } } -- cgit v1.2.3