summary | refs | log | tree | commit | diff
path: root/opendc-workflow/opendc-workflow-service/src/test
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-11-16 15:49:46 +0100
committer: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-11-16 15:49:46 +0100
commit: 1cfd967d6d27f339b264449ff2a1adeb705de598 (patch)
tree: 6dc98eb37c05f7b07adff1d4e085f16c35727f6b /opendc-workflow/opendc-workflow-service/src/test
parent: 381c7589cbf01ca6ed321c58c8a3a9cbea6ebd84 (diff)
feat(workflow): Add helper tools for workflow simulations
This change adds a new module, opendc-workflow-workload, that contains helper code for constructing workflow simulations using OpenDC.
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src/test')
-rw-r--r-- opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt | 127
-rw-r--r-- opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt | 95
2 files changed, 42 insertions, 180 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
deleted file mode 100644
index 9ee3736e..00000000
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.service
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.trace.*
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.time.Clock
-import java.util.*
-import kotlin.collections.HashMap
-import kotlin.collections.HashSet
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * Helper tool to replay workflow trace.
- */
-internal class TraceReplayer(private val trace: Trace) {
- /**
- * Replay the workload.
- */
- public suspend fun replay(clock: Clock, service: WorkflowService) {
- val jobs = parseTrace(trace)
-
- // Sort jobs by their arrival time
- (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long }
-
- // Wait until the trace is started
- val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long
- delay(min(0L, startTime - clock.millis()))
-
- val offset = startTime - clock.millis()
-
- coroutineScope {
- for (job in jobs) {
- val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long
- delay(max(0, (submitTime - offset) - clock.millis()))
-
- launch { service.run(job) }
- }
- }
- }
-
- /**
- * Convert [trace] into a list of [Job]s that can be submitted to the workflow service.
- */
- public fun parseTrace(trace: Trace): List<Job> {
- val table = checkNotNull(trace.getTable(TABLE_TASKS))
- val reader = table.newReader()
-
- val jobs = mutableMapOf<Long, Job>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, Set<Long>>()
-
- try {
- while (reader.nextRow()) {
- // Bag of tasks without workflow ID all share the same workflow
- val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L
- val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
-
- val id = reader.get(TASK_ID).toLong()
- val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
- reader.getInt(TASK_ALLOC_NCPUS)
- else
- reader.getInt(TASK_REQ_NCPUS)
- val submitTime = reader.get(TASK_SUBMIT_TIME)
- val runtime = reader.get(TASK_RUNTIME)
- val flops: Long = 4000 * runtime.seconds * grantedCpus
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, id),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to grantedCpus,
- WORKFLOW_TASK_DEADLINE to runtime.toMillis()
- ),
- )
-
- tasks[id] = task
- taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet()
-
- (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
- (workflow.tasks as MutableSet<Task>).add(task)
- }
-
- // Resolve dependencies for all tasks
- for ((task, deps) in taskDependencies) {
- for (dep in deps) {
- val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
- (task.dependencies as MutableSet<Task>).add(parent)
- }
- }
- } finally {
- reader.close()
- }
-
- return jobs.values.toList()
- }
-}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 04f54e58..d80c098b 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -22,41 +22,41 @@
package org.opendc.workflow.service
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.workload.ComputeServiceHelper
+import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.trace.Trace
-import org.opendc.workflow.service.internal.WorkflowServiceImpl
import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
+import org.opendc.workflow.workload.WorkflowSchedulerSpec
+import org.opendc.workflow.workload.WorkflowServiceHelper
+import org.opendc.workflow.workload.toJobs
import java.nio.file.Paths
import java.time.Duration
import java.util.*
/**
- * Integration test suite for the [WorkflowServiceImpl].
+ * Integration test suite for the [WorkflowService].
*/
@DisplayName("WorkflowService")
internal class WorkflowServiceTest {
@@ -65,60 +65,39 @@ internal class WorkflowServiceTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val meterProvider: MeterProvider = SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-
- val interpreter = FlowEngine(coroutineContext, clock)
- val machineModel = createMachineModel()
- val hvProvider = SimSpaceSharedHypervisorProvider()
- val hosts = List(4) { id ->
- SimHost(
- UUID(0, id.toLong()),
- "node-$id",
- machineModel,
- emptyMap(),
- coroutineContext,
- interpreter,
- MeterProvider.noop(),
- hvProvider,
- )
- }
-
+ // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
+ val HOST_COUNT = 4
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
+ val computeHelper = ComputeServiceHelper(coroutineContext, clock, computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
- hosts.forEach { compute.addHost(it) }
+ repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) }
- val scheduler = WorkflowService(
- coroutineContext,
- clock,
- meterProvider,
- compute.newClient(),
- mode = WorkflowSchedulerMode.Batch(100),
+ // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
+ val workflowScheduler = WorkflowSchedulerSpec(
+ batchMode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
taskEligibilityPolicy = NullTaskEligibilityPolicy,
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
)
+ val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler)
- val trace = Trace.open(
- Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
- format = "gwf"
- )
- val replayer = TraceReplayer(trace)
-
- replayer.replay(clock, scheduler)
+ try {
+ val trace = Trace.open(
+ Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
+ format = "gwf"
+ )
- hosts.forEach(SimHost::close)
- scheduler.close()
- compute.close()
+ workflowHelper.replay(trace.toJobs())
+ } finally {
+ workflowHelper.close()
+ computeHelper.close()
+ }
- val metrics = collectMetrics(meterProvider as MetricProducer)
+ val metrics = collectMetrics(workflowHelper.metricProducer)
assertAll(
{ assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") },
@@ -126,19 +105,29 @@ internal class WorkflowServiceTest {
{ assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") },
{ assertEquals(0, metrics.tasksActive, "Not all started tasks finished") },
{ assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(33213236L, clock.millis()) }
+ { assertEquals(33214236L, clock.millis()) { "Total duration incorrect" } }
)
}
/**
- * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
+ * Construct a [HostSpec] for a simulated host.
*/
- private fun createMachineModel(): MachineModel {
+ private fun createHostSpec(uid: Int): HostSpec {
+ // Machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32)
- val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) }
+ val cpus = List(node.coreCount) { ProcessingUnit(node, it, 3400.0) }
val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) }
- return MachineModel(cpus, memory)
+ val machineModel = MachineModel(cpus, memory)
+
+ return HostSpec(
+ UUID(0, uid.toLong()),
+ "host-$uid",
+ emptyMap(),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0)),
+ SimSpaceSharedHypervisorProvider()
+ )
}
class WorkflowMetrics {