author      Fabian Mastenbroek <mail.fabianm@gmail.com>    2020-02-14 12:43:29 +0100
committer   Fabian Mastenbroek <mail.fabianm@gmail.com>    2020-02-14 12:50:35 +0100
commit      92e858e398bf69380dbacebc042dde2bfa8cfe9c (patch)
tree        a7431f3d74dd449d5c6053e77b4cb60cdb36d924 /opendc/opendc-experiments-tpds/src/main
parent      5095d42c0a1fe0a593c84bccfdd594712e12ca1a (diff)
refactor: Integrate opendc-compute in existing model
This change refactors the existing model to use the new interfaces from the opendc-compute module.
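For context, the monitor-based callback API that the updated experiment implements looks roughly as follows. This is a minimal sketch inferred from the overrides and imports in the diff below; the actual WorkflowMonitor interface in the opendc-workflows module may declare additional members.

import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task

// Hypothetical reconstruction of the callback interface implemented by TestExperiment;
// the signatures are taken from the overrides shown in the diff below.
interface WorkflowMonitor {
    suspend fun onJobStart(job: Job, time: Long)
    suspend fun onJobFinish(job: Job, time: Long)
    suspend fun onTaskStart(job: Job, task: Task, time: Long)
    suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long)
}

With this interface, the experiment submits jobs directly via StageWorkflowService.submit(job, monitor) instead of sending WorkflowMessage.Submit to a WorkflowService actor, and completion is signalled through a Channel<Boolean> token rather than by exiting a coroutineScope.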
Diffstat (limited to 'opendc/opendc-experiments-tpds/src/main')
-rw-r--r--  opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt  130
1 file changed, 53 insertions, 77 deletions
diff --git a/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt b/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt
index ffd1604e..3dcea99d 100644
--- a/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt
+++ b/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt
@@ -24,23 +24,13 @@
package com.atlarge.opendc.experiments.tpds
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.SendRef
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.opendc.core.Broker
-import com.atlarge.opendc.core.Model
-import com.atlarge.opendc.core.PlatformMessage
-import com.atlarge.opendc.core.find
-import com.atlarge.opendc.core.services.provisioning.SimpleProvisioningService
-import com.atlarge.opendc.core.services.resources.ResourceManagementService
-import com.atlarge.opendc.core.zones
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
-import com.atlarge.opendc.workflows.service.StageWorkflowScheduler
-import com.atlarge.opendc.workflows.service.WorkflowEvent
-import com.atlarge.opendc.workflows.service.WorkflowMessage
+import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
+import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode
-import com.atlarge.opendc.workflows.service.WorkflowService
import com.atlarge.opendc.workflows.service.stage.job.FifoJobSortingPolicy
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
@@ -48,13 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDyn
import com.atlarge.opendc.workflows.service.stage.task.FifoTaskSortingPolicy
import com.atlarge.opendc.workflows.service.stage.task.FunctionalTaskEligibilityPolicy
import com.atlarge.opendc.workflows.workload.Job
+import com.atlarge.opendc.workflows.workload.Task
import java.io.File
import java.util.ServiceLoader
import kotlin.math.max
-import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
/**
@@ -66,75 +55,62 @@ fun main(args: Array<String>) {
return
}
- val scheduler = StageWorkflowScheduler(
- mode = WorkflowSchedulerMode.Batch(100),
- jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobSortingPolicy = FifoJobSortingPolicy(),
- taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(),
- taskSortingPolicy = FifoTaskSortingPolicy(),
- resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(),
- resourceSelectionPolicy = FirstFitResourceSelectionPolicy()
- )
-
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
.use { it.read() }
- .let { env ->
- env.copy(platforms = env.platforms.map { platform ->
- platform.copy(zones = platform.zones.map { zone ->
- val services = zone.services + setOf(ResourceManagementService, SimpleProvisioningService, WorkflowService(scheduler))
- zone.copy(services = services)
- })
- })
+
+ var total = 0
+ var finished = 0
+
+ val token = Channel<Boolean>()
+
+ val monitor = object : WorkflowMonitor {
+ override suspend fun onJobStart(job: Job, time: Long) {
+ println("Job ${job.uid} submitted")
+ total += 1
}
- val broker = object : Broker {
- override suspend fun invoke(ctx: ProcessContext, platforms: List<SendRef<PlatformMessage>>) {
- coroutineScope {
- val zones = platforms.first().zones()
- val service = zones.values.first().find(WorkflowService)
- val activeJobs = mutableSetOf<Job>()
- val channel = ctx.open<WorkflowEvent>()
- val outlet = ctx.connect(service)
- val inlet = ctx.listen(channel.receive)
-
- launch {
- val reader = GwfTraceReader(File(args[0]))
-
- while (reader.hasNext() && isActive) {
- val (time, job) = reader.next()
- delay(max(0, time - ctx.clock.millis()))
- outlet.send(WorkflowMessage.Submit(job, channel.send))
- }
- }
-
- var total = 0
- var finished = 0
-
- while (isActive) {
- when (val msg = inlet.receive()) {
- is WorkflowEvent.JobSubmitted -> {
- println("Job ${msg.job.uid} submitted")
- total += 1
- }
- is WorkflowEvent.JobStarted -> {
- activeJobs += msg.job
- }
- is WorkflowEvent.JobFinished -> {
- activeJobs -= msg.job
- finished += 1
- println("Jobs $finished/$total finished (${msg.job.tasks.size} tasks)")
- if (activeJobs.isEmpty())
- return@coroutineScope
- }
- }
- }
+ override suspend fun onJobFinish(job: Job, time: Long) {
+ finished += 1
+ println("Jobs $finished/$total finished (${job.tasks.size} tasks)")
+
+ if (finished == total) {
+ token.send(true)
}
}
+
+ override suspend fun onTaskStart(job: Job, task: Task, time: Long) {
+ println("Task started ${task.uid}")
+ }
+
+ override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) {
+ println("Task finished ${task.uid}")
+ }
}
- val model = Model(environment, listOf(broker))
- val factory = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = factory({ model(it) }, name = "sim")
+ val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+ val system = provider({ ctx ->
+ val scheduler = StageWorkflowService(
+ ctx,
+ environment.platforms[0].zones[0].services[ProvisioningService.Key],
+ mode = WorkflowSchedulerMode.Batch(100),
+ jobAdmissionPolicy = NullJobAdmissionPolicy,
+ jobSortingPolicy = FifoJobSortingPolicy(),
+ taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(),
+ taskSortingPolicy = FifoTaskSortingPolicy(),
+ resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(),
+ resourceSelectionPolicy = FirstFitResourceSelectionPolicy()
+ )
+
+ val reader = GwfTraceReader(File(args[0]))
+
+ while (reader.hasNext()) {
+ val (time, job) = reader.next()
+ delay(max(0, time - ctx.clock.millis()))
+ scheduler.submit(job, monitor)
+ }
+
+ token.receive()
+ }, name = "sim")
runBlocking {
system.run()