From 92e858e398bf69380dbacebc042dde2bfa8cfe9c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 14 Feb 2020 12:43:29 +0100 Subject: refactor: Integrate opendc-compute in existing model This change refactors the existing model to use the new interfaces from the opendc-compute module. --- .../opendc/experiments/tpds/TestExperiment.kt | 130 +++++++++------------ 1 file changed, 53 insertions(+), 77 deletions(-) (limited to 'opendc/opendc-experiments-tpds/src/main') diff --git a/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt b/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt index ffd1604e..3dcea99d 100644 --- a/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt +++ b/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt @@ -24,23 +24,13 @@ package com.atlarge.opendc.experiments.tpds -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.core.Broker -import com.atlarge.opendc.core.Model -import com.atlarge.opendc.core.PlatformMessage -import com.atlarge.opendc.core.find -import com.atlarge.opendc.core.services.provisioning.SimpleProvisioningService -import com.atlarge.opendc.core.services.resources.ResourceManagementService -import com.atlarge.opendc.core.zones +import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.service.StageWorkflowScheduler -import com.atlarge.opendc.workflows.service.WorkflowEvent -import com.atlarge.opendc.workflows.service.WorkflowMessage +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor +import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode -import com.atlarge.opendc.workflows.service.WorkflowService import com.atlarge.opendc.workflows.service.stage.job.FifoJobSortingPolicy import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy @@ -48,13 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDyn import com.atlarge.opendc.workflows.service.stage.task.FifoTaskSortingPolicy import com.atlarge.opendc.workflows.service.stage.task.FunctionalTaskEligibilityPolicy import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task import java.io.File import java.util.ServiceLoader import kotlin.math.max -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking /** @@ -66,75 +55,62 @@ fun main(args: Array) { return } - val scheduler = StageWorkflowScheduler( - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobSortingPolicy = FifoJobSortingPolicy(), - taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), - taskSortingPolicy = FifoTaskSortingPolicy(), - resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), - resourceSelectionPolicy = FirstFitResourceSelectionPolicy() - ) - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) .use { it.read() } - .let { env -> - env.copy(platforms = env.platforms.map { platform -> - platform.copy(zones = platform.zones.map { zone -> - val services = zone.services + setOf(ResourceManagementService, SimpleProvisioningService, WorkflowService(scheduler)) - zone.copy(services = services) - }) - }) + + var total = 0 + var finished = 0 + + val token = Channel() + + val monitor = object : WorkflowMonitor { + override suspend fun onJobStart(job: Job, time: Long) { + println("Job ${job.uid} submitted") + total += 1 } - val broker = object : Broker { - override suspend fun invoke(ctx: ProcessContext, platforms: List>) { - coroutineScope { - val zones = platforms.first().zones() - val service = zones.values.first().find(WorkflowService) - val activeJobs = mutableSetOf() - val channel = ctx.open() - val outlet = ctx.connect(service) - val inlet = ctx.listen(channel.receive) - - launch { - val reader = GwfTraceReader(File(args[0])) - - while (reader.hasNext() && isActive) { - val (time, job) = reader.next() - delay(max(0, time - ctx.clock.millis())) - outlet.send(WorkflowMessage.Submit(job, channel.send)) - } - } - - var total = 0 - var finished = 0 - - while (isActive) { - when (val msg = inlet.receive()) { - is WorkflowEvent.JobSubmitted -> { - println("Job ${msg.job.uid} submitted") - total += 1 - } - is WorkflowEvent.JobStarted -> { - activeJobs += msg.job - } - is WorkflowEvent.JobFinished -> { - activeJobs -= msg.job - finished += 1 - println("Jobs $finished/$total finished (${msg.job.tasks.size} tasks)") - if (activeJobs.isEmpty()) - return@coroutineScope - } - } - } + override suspend fun onJobFinish(job: Job, time: Long) { + finished += 1 + println("Jobs $finished/$total finished (${job.tasks.size} tasks)") + + if (finished == total) { + token.send(true) } } + + override suspend fun onTaskStart(job: Job, task: Task, time: Long) { + println("Task started ${task.uid}") + } + + override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { + println("Task finished ${task.uid}") + } } - val model = Model(environment, listOf(broker)) - val factory = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = factory({ model(it) }, name = "sim") + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider({ ctx -> + val scheduler = StageWorkflowService( + ctx, + environment.platforms[0].zones[0].services[ProvisioningService.Key], + mode = WorkflowSchedulerMode.Batch(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobSortingPolicy = FifoJobSortingPolicy(), + taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), + taskSortingPolicy = FifoTaskSortingPolicy(), + resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), + resourceSelectionPolicy = FirstFitResourceSelectionPolicy() + ) + + val reader = GwfTraceReader(File(args[0])) + + while (reader.hasNext()) { + val (time, job) = reader.next() + delay(max(0, time - ctx.clock.millis())) + scheduler.submit(job, monitor) + } + + token.receive() + }, name = "sim") runBlocking { system.run() -- cgit v1.2.3