From 9cf24c9a8d3e96a29d9b111081bc3369aadd490d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 Jan 2021 17:25:40 +0100 Subject: Refactor workflow service to schedule tasks onto VMs This change updates the workflow service to delegate the resource scheduling logic to the virtualized resource provisioner. --- .../main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt | 10 +++++++--- .../main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt | 6 +++++- 2 files changed, 12 insertions(+), 4 deletions(-) (limited to 'simulator/opendc-format/src/main') diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index a20b4f29..b721905d 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -29,6 +29,7 @@ import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task +import org.opendc.workflows.workload.WORKFLOW_TASK_CORES import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.io.BufferedReader import java.io.File @@ -122,8 +123,8 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader { val workflowId = values[workflowIdCol].trim().toLong() val taskId = values[taskIdCol].trim().toLong() - val submitTime = values[submitTimeCol].trim().toLong() - val runtime = max(0, values[runtimeCol].trim().toLong()) + val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms + val runtime = max(0, values[runtimeCol].trim().toLong()) // s val cores = values[coreCol].trim().toInt() val dependencies = values[dependencyCol].split(" ") .filter { it.isNotEmpty() } @@ -140,7 +141,10 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader { "", SimWorkloadImage(UUID.randomUUID(), "", emptyMap(), SimFlopsWorkload(flops, cores)), HashSet(), - mapOf(WORKFLOW_TASK_DEADLINE to runtime) + mapOf( + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to (runtime * 1000) + ), ) entry.submissionTime = min(entry.submissionTime, submitTime) (workflow.tasks as MutableSet).add(task) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index b2931468..381a0b41 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -32,6 +32,7 @@ import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task +import org.opendc.workflows.workload.WORKFLOW_TASK_CORES import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.util.UUID import kotlin.math.min @@ -82,7 +83,10 @@ public class WtfTraceReader(path: String) : TraceReader { "", SimWorkloadImage(UUID.randomUUID(), "", emptyMap(), SimFlopsWorkload(flops, cores)), HashSet(), - mapOf(WORKFLOW_TASK_DEADLINE to runtime) + mapOf( + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to runtime + ) ) entry.submissionTime = min(entry.submissionTime, submitTime) -- cgit v1.2.3