diff options
14 files changed, 284 insertions, 173 deletions
diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/Jpa.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/Jpa.kt new file mode 100644 index 00000000..cbbe280a --- /dev/null +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/Jpa.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.integration.jpa + +import javax.persistence.EntityManager + +/** + * Run the given block in a transaction, committing on return of the block. + * + * @param block The block to execute in the transaction. + */ +inline fun EntityManager.transaction(block: () -> Unit) { + transaction.begin() + block() + transaction.commit() +} diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt index 500c7e99..83a98cfb 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/integration/jpa/schema/Task.kt @@ -24,11 +24,9 @@ package nl.atlarge.opendc.integration.jpa.schema -import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.time.Instant import nl.atlarge.opendc.platform.workload.Task -import nl.atlarge.opendc.topology.container.Datacenter -import nl.atlarge.opendc.topology.machine.Machine +import nl.atlarge.opendc.platform.workload.TaskState import javax.persistence.* /** @@ -52,90 +50,68 @@ data class Task( /** * The dependencies of the task. */ - override val dependencies: Set<Task> - get() { - if (_dependencies != null) - return _dependencies!! - _dependencies = dependency?.let(::setOf) ?: emptySet() - return _dependencies!! - } - - /** - * The dependencies set cache. - */ - private var _dependencies: Set<Task>? = null - - /** - * The remaining amount of flops to compute. - */ - override var remaining: Long - get() { - if (_remaining == null) - _remaining = flops - return _remaining!! - } - private set(value) { _remaining = value } - private var _remaining: Long? = -1 + override lateinit var dependencies: Set<Task> + private set /** - * A flag to indicate the task has been accepted by the datacenter. + * The remaining flops for this task. */ - override var accepted: Boolean = false + override var remaining: Long = 0 private set /** - * A flag to indicate the task has been started. + * A flag to indicate whether the task has finished. */ - override var started: Boolean = false + override var finished: Boolean = false private set /** - * A flag to indicate whether the task is finished. + * The state of the task. */ - override var finished: Boolean = false + override lateinit var state: TaskState private set /** - * Accept the task into the scheduling queue. + * This method initialises the task object after it has been created by the JPA implementation. We use this + * initialisation method because JPA implementations only call the default constructor */ - override fun Context<Datacenter>.accept() { - accepted = true + @PostLoad + internal fun init() { + remaining = flops + dependencies = dependency?.let(::setOf) ?: emptySet() + state = TaskState.Underway } /** - * Start a task. + * This method is invoked when a task has arrived at a datacenter. + * + * @param time The moment in time the task has arrived at the datacenter. */ - override fun Context<Machine>.start() { - started = true + override fun arrive(time: Instant) { + if (state !is TaskState.Underway) { + throw IllegalStateException("The task has already been submitted to a datacenter") + } + remaining = flops + state = TaskState.Queued(time) } /** * Consume the given amount of flops of this task. * + * @param time The current moment in time of the consumption. * @param flops The total amount of flops to consume. */ - override fun Context<Machine>.consume(flops: Long) { - if (finished) + override fun consume(time: Instant, flops: Long) { + if (state is TaskState.Queued) { + state = TaskState.Running(state as TaskState.Queued, time) + } else if (finished) { return - if (remaining <= flops) { + } + remaining -= flops + if (remaining <= 0) { remaining = 0 - } else { - remaining -= flops + finished = true + state = TaskState.Finished(state as TaskState.Running, time) } } - - /** - * Finalise the task. - */ - override fun Context<Machine>.finalize() { - finished = true - } - - /** - * Reset the task. - */ - internal fun reset() { - remaining = flops - finished = false - } } diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt index f5a86e64..59b28d0b 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperiment.kt @@ -27,15 +27,19 @@ package nl.atlarge.opendc.platform import mu.KotlinLogging import nl.atlarge.opendc.integration.jpa.schema.ExperimentState import nl.atlarge.opendc.integration.jpa.schema.MachineState -import nl.atlarge.opendc.integration.jpa.schema.TaskState +import nl.atlarge.opendc.integration.jpa.transaction +import nl.atlarge.opendc.integration.jpa.schema.Trace as InternalTrace +import nl.atlarge.opendc.integration.jpa.schema.TaskState as InternalTaskState import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment import nl.atlarge.opendc.integration.jpa.schema.Task as InternalTask import nl.atlarge.opendc.kernel.Kernel +import nl.atlarge.opendc.platform.workload.TaskState import nl.atlarge.opendc.topology.JpaTopologyFactory import nl.atlarge.opendc.topology.container.Rack import nl.atlarge.opendc.topology.container.Room import nl.atlarge.opendc.topology.destinations import nl.atlarge.opendc.topology.machine.Machine +import java.util.* import javax.persistence.EntityManager /** @@ -45,8 +49,8 @@ import javax.persistence.EntityManager * @property experiment The internal experiment definition to use. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class JpaExperiment(val manager: EntityManager, - private val experiment: InternalExperiment): Experiment<Unit> { +class JpaExperiment(private val manager: EntityManager, + private val experiment: InternalExperiment): Experiment<Unit>, AutoCloseable { /** * The logging instance. */ @@ -64,11 +68,10 @@ class JpaExperiment(val manager: EntityManager, } // Set the simulation state - manager.run { - transaction.begin() + manager.transaction { experiment.state = ExperimentState.SIMULATING - transaction.commit() } + val section = experiment.path.sections.first() // Important: initialise the scheduler of the datacenter @@ -77,18 +80,16 @@ class JpaExperiment(val manager: EntityManager, val topology = JpaTopologyFactory(section).create() val simulation = kernel.create(topology) val trace = experiment.trace + val tasks = trace.jobs.flatMap { it.tasks } - logger.info { "Sending trace to kernel" } + logger.info { "Sending trace to kernel ${Objects.hashCode(trace)} ${(trace as InternalTrace).id}" } // Schedule all messages in the trace - trace.jobs.forEach { job -> - job.tasks.forEach { task -> - if (task is InternalTask) { - task.reset() - simulation.schedule(task, section.datacenter, delay = task.startTime) - } else { - logger.warn { "Dropped invalid task $task" } - } + tasks.forEach { task -> + if (task is InternalTask) { + simulation.schedule(task, section.datacenter, delay = task.startTime) + } else { + logger.warn { "Dropped invalid task $task" } } } @@ -103,48 +104,73 @@ class JpaExperiment(val manager: EntityManager, while (trace.jobs.any { !it.finished }) { // Collect data of simulation cycle - manager.transaction.begin() - machines.forEach { machine -> - val state = simulation.run { machine.state } - val wrapped = MachineState(0, - machine as nl.atlarge.opendc.integration.jpa.schema.Machine, - state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, - experiment, - simulation.clock.now, - state.temperature, - state.memory, - state.load - ) - manager.persist(wrapped) - } - - trace.jobs.asSequence() - .flatMap { it.tasks.asSequence() } - .forEach { task -> - val state = TaskState(0, - task as nl.atlarge.opendc.integration.jpa.schema.Task, + manager.transaction { + experiment.last = simulation.clock.now + + machines.forEach { machine -> + val state = simulation.run { machine.state } + val wrapped = MachineState(0, + machine as nl.atlarge.opendc.integration.jpa.schema.Machine, + state.task as nl.atlarge.opendc.integration.jpa.schema.Task?, experiment, simulation.clock.now, - task.remaining.toInt(), - 1 + state.temperature, + state.memory, + state.load ) - manager.persist(state) + manager.persist(wrapped) } - manager.transaction.commit() + + trace.jobs.asSequence() + .flatMap { it.tasks.asSequence() } + .forEach { task -> + val state = InternalTaskState(0, + task as nl.atlarge.opendc.integration.jpa.schema.Task, + experiment, + simulation.clock.now, + task.remaining.toInt(), + 1 + ) + manager.persist(state) + } + } // Run next simulation cycle simulation.run(simulation.clock.now + 1) - experiment.last = simulation.clock.now } // Set the experiment state - manager.run { - transaction.begin() + manager.transaction { experiment.state = ExperimentState.FINISHED - transaction.commit() } logger.info { "Simulation done" } - manager.close() + val waiting: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.previous.at - finished.previous.previous.at) + } / tasks.size + + val execution: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.at - finished.previous.at) + } / tasks.size + + val turnaround: Long = tasks.fold(0.toLong()) { acc, task -> + val finished = task.state as TaskState.Finished + acc + (finished.at - finished.previous.previous.at) + } / tasks.size + + logger.info { "Average waiting time: $waiting seconds" } + logger.info { "Average execution time: $execution seconds" } + logger.info { "Average turnaround time: $turnaround seconds" } } + + /** + * Closes this resource, relinquishing any underlying resources. + * This method is invoked automatically on objects managed by the + * `try`-with-resources statement. + * + * @throws Exception if this resource cannot be closed + */ + override fun close() = manager.close() } diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt index ce69f84b..1d1e118d 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaExperimentManager.kt @@ -24,6 +24,7 @@ package nl.atlarge.opendc.platform +import nl.atlarge.opendc.integration.jpa.transaction import nl.atlarge.opendc.integration.jpa.schema.Experiment as InternalExperiment import nl.atlarge.opendc.integration.jpa.schema.ExperimentState import javax.persistence.EntityManager @@ -36,11 +37,11 @@ import javax.persistence.EntityManagerFactory * from. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -class JpaExperimentManager(private val factory: EntityManagerFactory) { +class JpaExperimentManager(private val factory: EntityManagerFactory): AutoCloseable { /** * The entity manager for this experiment. */ - private val manager: EntityManager = factory.createEntityManager() + private var manager: EntityManager = factory.createEntityManager() /** * The amount of experiments in the queue. This property makes a call to the database and does therefore not @@ -60,21 +61,33 @@ class JpaExperimentManager(private val factory: EntityManagerFactory) { * @return The experiment that has been polled from the database or `null` if there are no experiments in the * queue. */ - fun poll(): Experiment<Unit>? { - manager.transaction.begin() - var experiment: InternalExperiment? = null - val results = manager.createQuery("SELECT e FROM experiments e WHERE e.state = :s", + fun poll(): JpaExperiment? { + var result: JpaExperiment? = null + manager.transaction { + var experiment: InternalExperiment? = null + val results = manager.createQuery("SELECT e FROM experiments e WHERE e.state = :s", InternalExperiment::class.java) - .setParameter("s", ExperimentState.QUEUED) - .setMaxResults(1) - .resultList + .setParameter("s", ExperimentState.QUEUED) + .setMaxResults(1) + .resultList - if (!results.isEmpty()) { - experiment = results.first() - experiment!!.state = ExperimentState.CLAIMED + if (!results.isEmpty()) { + experiment = results.first() + experiment!!.state = ExperimentState.CLAIMED + } + result = experiment?.let { JpaExperiment(manager, it) } } - manager.transaction.commit() - return experiment?.let { JpaExperiment(factory.createEntityManager(), it) } + manager = factory.createEntityManager() + return result } + + /** + * Close this resource, relinquishing any underlying resources. + * This method is invoked automatically on objects managed by the + * `try`-with-resources statement.* + * + * @throws Exception if this resource cannot be closed + */ + override fun close() = manager.close() } diff --git a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt index 6a1d839d..794ca44e 100644 --- a/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt +++ b/opendc-integration-jpa/src/main/kotlin/nl/atlarge/opendc/platform/JpaPlatformRunner.kt @@ -45,17 +45,17 @@ fun main(args: Array<String>) { properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: "" val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties) - val threads = 1 + val threads = 4 val executorService = Executors.newFixedThreadPool(threads) val experiments = JpaExperimentManager(factory) + val kernel = OmegaKernel logger.info { "Waiting for enqueued experiments..." } while (true) { - val experiment = experiments.poll() - executorService.submit { - experiment?.run { - logger.info { "Found experiment. Running simulating now..." } - run(OmegaKernel) + experiments.poll()?.let { experiment -> + logger.info { "Found experiment. Submitting for simulation now..." } + executorService.submit { + experiment.use { it.run(kernel) } } } diff --git a/opendc-integration-jpa/src/main/resources/jpa/schema.xml b/opendc-integration-jpa/src/main/resources/jpa/schema.xml index a14fb96d..bd6ea7a1 100644 --- a/opendc-integration-jpa/src/main/resources/jpa/schema.xml +++ b/opendc-integration-jpa/src/main/resources/jpa/schema.xml @@ -85,7 +85,7 @@ </attributes> </entity> - <entity class="Trace" access="FIELD" name="traces"> + <entity class="Trace" access="FIELD" name="traces" cacheable="false"> <attributes> <id name="id" /> <basic name="name"> @@ -97,17 +97,17 @@ </attributes> </entity> - <entity class="Job" access="FIELD" name="jobs"> + <entity class="Job" access="FIELD" name="jobs" cacheable="false"> <attributes> <id name="id" /> - <one-to-many name="tasks" target-entity="nl.atlarge.opendc.integration.jpa.schema.Task"> + <one-to-many name="tasks" target-entity="Task"> <join-column name="job_id" /> </one-to-many> <transient name="owner" /> </attributes> </entity> - <entity class="Task" access="FIELD" name="tasks"> + <entity class="Task" access="FIELD" name="tasks" cacheable="false"> <convert converter="nl.atlarge.opendc.integration.jpa.converter.ParallelizableConverter" attribute-name="parallelizable" /> <attributes> <id name="id" /> @@ -121,13 +121,12 @@ <column name="parallelizability" column-definition="text" /> </basic> - <one-to-one name="dependency" target-entity="nl.atlarge.opendc.integration.jpa.schema.Task"> + <one-to-one name="dependency" target-entity="Task"> <join-column name="task_dependency_id" /> </one-to-one> - <transient name="_dependencies" /> - <transient name="_remaining" /> - <transient name="accepted" /> - <transient name="started" /> + <transient name="dependencies" /> + <transient name="state" /> + <transient name="remaining" /> <transient name="finished" /> </attributes> </entity> diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt index 1c3dc869..c45ed5e6 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/FifoScheduler.kt @@ -25,6 +25,7 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Job import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine import nl.atlarge.opendc.platform.workload.Task @@ -69,7 +70,7 @@ class FifoScheduler : Scheduler { val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed - if (!task.isReady()) { + if (!task.ready) { iterator.remove() rescheduled.add(task) continue @@ -85,7 +86,7 @@ class FifoScheduler : Scheduler { // Reschedule all tasks that are not ready yet while (!rescheduled.isEmpty()) { - submit(rescheduled.poll()) + queue.add(rescheduled.poll()) } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt index bf988802..578bef9c 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/Scheduler.kt @@ -25,9 +25,9 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task /** * A task scheduler that is coupled to an [Entity] in the topology of the cloud network. diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt index b2660964..03f37b50 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/scheduler/SrtfScheduler.kt @@ -25,9 +25,9 @@ package nl.atlarge.opendc.platform.scheduler import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -67,8 +67,8 @@ class SrtfScheduler : Scheduler { val task = iterator.next() // TODO What to do with tasks that are not ready yet to be processed - if (!task.isReady()) { - submit(task) + if (!task.ready) { + tasks.add(task) continue } else if (task.finished) { tasks.remove(task) diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt index eb173229..df2f2b6a 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Job.kt @@ -25,7 +25,7 @@ package nl.atlarge.opendc.platform.workload /** - * A job that is submitted to a cloud network, which consists of one or multiple [Task]s. + * A bag of tasks which are submitted by a [User] to the cloud network. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt index 140a5d5d..e740cdd8 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/Task.kt @@ -24,12 +24,11 @@ package nl.atlarge.opendc.platform.workload -import nl.atlarge.opendc.kernel.Context -import nl.atlarge.opendc.topology.container.Datacenter +import nl.atlarge.opendc.kernel.time.Instant import nl.atlarge.opendc.topology.machine.Machine /** - * A task represents some computation that is part of a [Job]. + * A task that runs as part of a [Job] on a [Machine]. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ @@ -55,51 +54,39 @@ interface Task { val parallelizable: Boolean /** - * The remaining amount of flops to compute. + * The remaining flops for this task. */ val remaining: Long /** - * A flag to indicate the task has been accepted by the datacenter. + * The state of the task. */ - val accepted: Boolean + val state: TaskState /** - * A flag to indicate the task has been started. + * A flag to indicate whether the task is ready to be started. */ - val started: Boolean + val ready: Boolean + get() = !dependencies.any { !it.finished } /** - * A flag to indicate whether the task is finished. + * A flag to indicate whether the task has finished. */ val finished: Boolean + get() = state is TaskState.Finished /** - * Determine whether the task is ready to be processed. + * This method is invoked when a task has arrived at a datacenter. * - * @return `true` if the task is ready to be processed, `false` otherwise. + * @param time The moment in time the task has arrived at the datacenter. */ - fun isReady() = dependencies.all { it.finished } - - /** - * Accept the task into the scheduling queue. - */ - fun Context<Datacenter>.accept() - - /** - * Start a task. - */ - fun Context<Machine>.start() + fun arrive(time: Instant) /** * Consume the given amount of flops of this task. * + * @param time The current moment in time of the consumption. * @param flops The total amount of flops to consume. */ - fun Context<Machine>.consume(flops: Long) - - /** - * Finalise the task. - */ - fun Context<Machine>.finalize() + fun consume(time: Instant, flops: Long) } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt new file mode 100644 index 00000000..d1f908af --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/platform/workload/TaskState.kt @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nl.atlarge.opendc.platform.workload + +import nl.atlarge.opendc.kernel.time.Instant + + +/** + * This class hierarchy describes the states of a [Task]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +sealed class TaskState { + /** + * A state to indicate the task has not yet arrived at the [Datacenter]. + */ + object Underway : TaskState() + + /** + * A state to indicate the task has arrived at the [Datacenter]. + * + * @property at The moment in time the task has arrived. + */ + data class Queued(val at: Instant) : TaskState() + + /** + * A state to indicate the task has started running on a machine. + * + * @property previous The previous state of the task. + * @property at The moment in time the task started. + */ + data class Running(val previous: Queued, val at: Instant) : TaskState() + + /** + * A state to indicate the task has finished. + * + * @property previous The previous state of the task. + * @property at The moment in time the task finished. + */ + data class Finished(val previous: Running, val at: Instant) : TaskState() + + /** + * A state to indicate the task has failed. + * + * @property previous The previous state of the task. + * @property at The moment in time the task failed. + * @property reason The reason of the failure. + */ + data class Failed(val previous: Running, val at: Instant, val reason: String) : TaskState() +} diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt index 333609bb..b3d67568 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/container/Datacenter.kt @@ -30,9 +30,10 @@ import nl.atlarge.opendc.kernel.Context import nl.atlarge.opendc.kernel.Process import nl.atlarge.opendc.kernel.time.Duration import nl.atlarge.opendc.platform.scheduler.Scheduler +import nl.atlarge.opendc.platform.workload.Job +import nl.atlarge.opendc.platform.workload.Task import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine -import nl.atlarge.opendc.platform.workload.Task import java.util.* /** @@ -88,6 +89,7 @@ interface Datacenter : Entity<Unit>, Process<Datacenter> { while (queue.isNotEmpty()) { val msg = queue.poll() if (msg is Task) { + msg.arrive(time) scheduler.submit(msg) } } diff --git a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt index b1d881a1..761f14b1 100644 --- a/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt +++ b/opendc-stdlib/src/main/kotlin/nl/atlarge/opendc/topology/machine/Machine.kt @@ -83,16 +83,14 @@ open class Machine : Entity<Machine.State>, Process<Machine> { var task: Task = receiveTask() update(State(Status.RUNNING, task, load = 1.0, memory = state.memory + 50, temperature = 30.0)) - task.run { start() } while (true) { - if (task.remaining <= 0) { - task.run { finalize() } + if (task.finished) { logger.info { "${entity.id}: Task ${task.id} finished. Machine idle at $time" } update(State(Status.IDLE)) task = receiveTask() } else { - task.run { consume(speed * delta) } + task.consume(time, speed * delta) } // Check if we have received a new order in the meantime. @@ -100,7 +98,6 @@ open class Machine : Entity<Machine.State>, Process<Machine> { if (msg is Task) { task = msg update(State(Status.RUNNING, task, load = 1.0, memory = state.memory + 50, temperature = 30.0)) - task.run { start() } } } } |
