From ef3868ec729f7ce3f5976d4f9a0c8b95098d9857 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 4 May 2021 14:40:46 +0200 Subject: build: Update to Kotlin 1.5.0 --- opendc-workflow/opendc-workflow-service/build.gradle.kts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 5e73222c..bc082dbc 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -41,6 +41,9 @@ dependencies { testImplementation(projects.opendcCompute.opendcComputeSimulator) testImplementation(projects.opendcFormat) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(libs.jackson.module.kotlin) + testImplementation(libs.jackson.module.kotlin) { + exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") + } + testImplementation(kotlin("reflect")) testRuntimeOnly(libs.log4j.slf4j) } -- cgit v1.2.3 From df3c9dc3fcd2f89910575bfdc24a3db3af9eba0f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 20 Jun 2021 22:21:39 +0200 Subject: exp: Enable interpreter sharing across hosts This change enables the experiments to share the SimResourceInterpreter across multiple hosts, which allows updates to be scheduled efficiently for all machines at the same time. This is especially beneficial if the machines operate on the same time slices. --- .../org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index a8d3a9e8..6807572b 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -43,6 +43,7 @@ import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode @@ -68,6 +69,7 @@ internal class WorkflowServiceIntegrationTest { .setClock(clock.toOtelClock()) .build() + val interpreter = SimResourceInterpreter(coroutineContext, clock) val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) .use { it.read() } .map { def -> @@ -77,7 +79,7 @@ internal class WorkflowServiceIntegrationTest { def.model, def.meta, coroutineContext, - clock, + interpreter, MeterProvider.noop().get("opendc-compute-simulator"), SimSpaceSharedHypervisorProvider() ) -- cgit v1.2.3 From b29f90e5ad5bcac29cde86e56c06e0b65a52cedc Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 21 Jun 2021 20:57:06 +0200 Subject: simulator: Re-organize compute simulator module This change re-organizes the classes of the compute simulator module to make a clearer distinction between the hardware, firmware and software interfaces in this module. --- .../org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index 6807572b..413112af 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -41,7 +41,7 @@ import org.opendc.compute.service.scheduler.weights.ProvisionedCoresWeigher import org.opendc.compute.simulator.SimHost import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock -- cgit v1.2.3 From e56967a29ac2b2d26cc085b1f3e27096dad6a170 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 24 Jun 2021 12:54:52 +0200 Subject: simulator: Re-implement performance interference model This change updates reimplements the performance interference model to work on top of the universal resource model in `opendc-simulator-resources`. This enables us to model interference and performance variability of other resources such as disk or network in the future. --- .../org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index 413112af..38c774a9 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -25,7 +25,6 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -57,7 +56,6 @@ import kotlin.math.max * Integration test suite for the [WorkflowServiceImpl]. */ @DisplayName("WorkflowServiceImpl") -@OptIn(ExperimentalCoroutinesApi::class) internal class WorkflowServiceIntegrationTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. @@ -70,7 +68,7 @@ internal class WorkflowServiceIntegrationTest { .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json"))) .use { it.read() } .map { def -> SimHost( @@ -106,7 +104,7 @@ internal class WorkflowServiceIntegrationTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) + val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf"))) var offset = Long.MIN_VALUE coroutineScope { -- cgit v1.2.3 From b8f64c1d3df2c990df8941cd036222fab2def9fa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 22 Aug 2021 13:23:53 +0200 Subject: refactor(compute): Update FilterScheduler to follow OpenStack's Nova This change updates the FilterScheduler implementation to follow more closely the scheduler implementation in OpenStack's Nova. We now normalize the weights, support many of the filters and weights in OpenStack and support overcommitting resources. --- .../opendc/workflow/service/WorkflowServiceIntegrationTest.kt | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index 38c774a9..d82959e7 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -34,9 +34,10 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.ProvisionedCoresWeigher +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader @@ -55,7 +56,7 @@ import kotlin.math.max /** * Integration test suite for the [WorkflowServiceImpl]. */ -@DisplayName("WorkflowServiceImpl") +@DisplayName("WorkflowService") internal class WorkflowServiceIntegrationTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. @@ -85,8 +86,8 @@ internal class WorkflowServiceIntegrationTest { val meter = MeterProvider.noop().get("opendc-compute") val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to -1.0) + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000) -- cgit v1.2.3 From f111081627280d4e7e1d7147c56cdce708e32433 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 14:06:39 +0200 Subject: build: Upgrade to OpenTelemetry 1.5 This change upgrades the OpenTelemetry dependency to version 1.5, which contains various breaking changes in the metrics API. --- .../opendc/workflow/service/internal/WorkflowServiceImpl.kt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 32191b8f..5329143d 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -155,7 +155,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have been submitted to the service. */ - private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + private val submittedJobs = meter.counterBuilder("jobs.submitted") .setDescription("Number of submitted jobs") .setUnit("1") .build() @@ -163,7 +163,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that are running. */ - private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + private val runningJobs = meter.upDownCounterBuilder("jobs.active") .setDescription("Number of jobs running") .setUnit("1") .build() @@ -171,7 +171,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have finished running. */ - private val finishedJobs = meter.longCounterBuilder("jobs.finished") + private val finishedJobs = meter.counterBuilder("jobs.finished") .setDescription("Number of jobs that finished running") .setUnit("1") .build() @@ -179,7 +179,7 @@ public class WorkflowServiceImpl( /** * The number of tasks that have been submitted to the service. */ - private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + private val submittedTasks = meter.counterBuilder("tasks.submitted") .setDescription("Number of submitted tasks") .setUnit("1") .build() @@ -187,7 +187,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that are running. */ - private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + private val runningTasks = meter.upDownCounterBuilder("tasks.active") .setDescription("Number of tasks running") .setUnit("1") .build() @@ -195,7 +195,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have finished running. */ - private val finishedTasks = meter.longCounterBuilder("tasks.finished") + private val finishedTasks = meter.counterBuilder("tasks.finished") .setDescription("Number of tasks that finished running") .setUnit("1") .build() -- cgit v1.2.3 From 23c1502c2668305fd5f4c38c6c794c985d2037e3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 14:56:08 +0200 Subject: refactor(trace): Move GWF trace reader into separate module This change starts the process of moving the different trace formats into separate modules. This change in particular moves the GWF trace format into a new module, opendc-trace-gwf. Furthermore, this change also implements the trace API for the GWF module. --- .../opendc-workflow-service/build.gradle.kts | 6 +- .../org/opendc/workflow/service/TraceReplayer.kt | 127 ++++++++++++++++ .../service/WorkflowServiceIntegrationTest.kt | 164 --------------------- .../opendc/workflow/service/WorkflowServiceTest.kt | 163 ++++++++++++++++++++ 4 files changed, 291 insertions(+), 169 deletions(-) create mode 100644 opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt delete mode 100644 opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt create mode 100644 opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index bc082dbc..941202d2 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,11 +39,7 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcFormat) + testImplementation(projects.opendcTrace.opendcTraceGwf) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(libs.jackson.module.kotlin) { - exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") - } - testImplementation(kotlin("reflect")) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt new file mode 100644 index 00000000..a390fe08 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.time.Clock +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.max +import kotlin.math.min + +/** + * Helper tool to replay workflow trace. + */ +internal class TraceReplayer(private val trace: Trace) { + /** + * Replay the workload. + */ + public suspend fun replay(clock: Clock, service: WorkflowService) { + val jobs = parseTrace(trace) + + // Sort jobs by their arrival time + (jobs as MutableList).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long } + + // Wait until the trace is started + val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(min(0L, startTime - clock.millis())) + + val offset = startTime - clock.millis() + + coroutineScope { + for (job in jobs) { + val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(max(0, (submitTime - offset) - clock.millis())) + + launch { service.run(job) } + } + } + } + + /** + * Convert [trace] into a list of [Job]s that can be submitted to the workflow service. + */ + public fun parseTrace(trace: Trace): List { + val table = checkNotNull(trace.getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf() + val tasks = mutableMapOf() + val taskDependencies = mutableMapOf>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "", HashSet(), HashMap()) } + + val id = reader.getLong(TASK_ID) + val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.getLong(TASK_SUBMIT_TIME) + val runtime = reader.getLong(TASK_RUNTIME) + val flops: Long = 4000 * runtime * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to (runtime * 1000) + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.get(TASK_PARENTS) + + (workflow.metadata as MutableMap).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt deleted file mode 100644 index d82959e7..00000000 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter -import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.telemetry.sdk.toOtelClock -import org.opendc.workflow.service.internal.WorkflowServiceImpl -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode -import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy -import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy -import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import kotlin.math.max - -/** - * Integration test suite for the [WorkflowServiceImpl]. - */ -@DisplayName("WorkflowService") -internal class WorkflowServiceIntegrationTest { - /** - * A large integration test where we check whether all tasks in some trace are executed correctly. - */ - @Test - fun testTrace() = runBlockingSimulation { - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json"))) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - MeterProvider.noop().get("opendc-compute-simulator"), - SimSpaceSharedHypervisorProvider() - ) - } - - val meter = MeterProvider.noop().get("opendc-compute") - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) - ) - val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000) - - hosts.forEach { compute.addHost(it) } - - val scheduler = WorkflowService( - coroutineContext, - clock, - meterProvider.get("opendc-workflow"), - compute.newClient(), - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - - val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf"))) - var offset = Long.MIN_VALUE - - coroutineScope { - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - delay(max(0, (entry.start - offset) - clock.millis())) - launch { - scheduler.run(entry.workload) - } - } - } - - hosts.forEach(SimHost::close) - scheduler.close() - compute.close() - - val metrics = collectMetrics(meterProvider as MetricProducer) - - assertAll( - { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, - { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, - { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") } - ) - } - - class WorkflowMetrics { - var jobsSubmitted = 0L - var jobsActive = 0L - var jobsFinished = 0L - var tasksSubmitted = 0L - var tasksActive = 0L - var tasksFinished = 0L - } - - /** - * Collect the metrics of the workflow service. - */ - private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics { - val metrics = metricProducer.collectAllMetrics().associateBy { it.name } - val res = WorkflowMetrics() - res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0 - res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0 - res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0 - return res - } -} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt new file mode 100644 index 00000000..07433d1f --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.VCpuWeigher +import org.opendc.compute.simulator.SimHost +import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.trace.gwf.GwfTraceFormat +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import java.util.* + +/** + * Integration test suite for the [WorkflowServiceImpl]. + */ +@DisplayName("WorkflowService") +internal class WorkflowServiceTest { + /** + * A large integration test where we check whether all tasks in some trace are executed correctly. + */ + @Test + fun testTrace() = runBlockingSimulation { + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val interpreter = SimResourceInterpreter(coroutineContext, clock) + val machineModel = createMachineModel() + val hvProvider = SimSpaceSharedHypervisorProvider() + val hosts = List(4) { id -> + SimHost( + UUID(0, id.toLong()), + "node-$id", + machineModel, + emptyMap(), + coroutineContext, + interpreter, + meterProvider.get("opendc-compute-simulator"), + hvProvider, + ) + } + + val meter = MeterProvider.noop().get("opendc-compute") + val computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) + ) + val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000) + + hosts.forEach { compute.addHost(it) } + + val scheduler = WorkflowService( + coroutineContext, + clock, + meterProvider.get("opendc-workflow"), + compute.newClient(), + mode = WorkflowSchedulerMode.Batch(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + + val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf"))) + val replayer = TraceReplayer(trace) + + replayer.replay(clock, scheduler) + + hosts.forEach(SimHost::close) + scheduler.close() + compute.close() + + val metrics = collectMetrics(meterProvider as MetricProducer) + + assertAll( + { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, + { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, + { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(33213237L, clock.millis()) } + ) + } + + /** + * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html + */ + private fun createMachineModel(): MachineModel { + val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32) + val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } + val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } + + return MachineModel(cpus, memory) + } + + class WorkflowMetrics { + var jobsSubmitted = 0L + var jobsActive = 0L + var jobsFinished = 0L + var tasksSubmitted = 0L + var tasksActive = 0L + var tasksFinished = 0L + } + + /** + * Collect the metrics of the workflow service. + */ + private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = WorkflowMetrics() + res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0 + res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0 + res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0 + return res + } +} -- cgit v1.2.3 From b7be3400bb4b21d0cd7021e2baf1f6ce43aba189 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 22:10:22 +0200 Subject: feat(trace): Add support for WfCommons (WorkflowHub) traces This change adds support for reading WfCommons workflow traces in OpenDC. This functionality is available in the new `opendc-trace-wfformat` module. --- .../kotlin/org/opendc/workflow/service/TraceReplayer.kt | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt index a390fe08..9ee3736e 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt @@ -81,17 +81,17 @@ internal class TraceReplayer(private val trace: Trace) { try { while (reader.nextRow()) { // Bag of tasks without workflow ID all share the same workflow - val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "", HashSet(), HashMap()) } - val id = reader.getLong(TASK_ID) + val id = reader.get(TASK_ID).toLong() val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) reader.getInt(TASK_ALLOC_NCPUS) else reader.getInt(TASK_REQ_NCPUS) - val submitTime = reader.getLong(TASK_SUBMIT_TIME) - val runtime = reader.getLong(TASK_RUNTIME) - val flops: Long = 4000 * runtime * grantedCpus + val submitTime = reader.get(TASK_SUBMIT_TIME) + val runtime = reader.get(TASK_RUNTIME) + val flops: Long = 4000 * runtime.seconds * grantedCpus val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, id), @@ -100,14 +100,14 @@ internal class TraceReplayer(private val trace: Trace) { mapOf( "workload" to workload, WORKFLOW_TASK_CORES to grantedCpus, - WORKFLOW_TASK_DEADLINE to (runtime * 1000) + WORKFLOW_TASK_DEADLINE to runtime.toMillis() ), ) tasks[id] = task - taskDependencies[task] = reader.get(TASK_PARENTS) + taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() - (workflow.metadata as MutableMap).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) } + (workflow.metadata as MutableMap).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } (workflow.tasks as MutableSet).add(task) } -- cgit v1.2.3 From 3ca64e0110adab65526a0ccfd5b252e9f047ab10 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 14 Sep 2021 14:41:05 +0200 Subject: refactor(telemetry): Create separate MeterProvider per service/host This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. This means we have to keep track of multiple metric producers, but that we can attach resource information to each of the MeterProviders like we would in a real world scenario. --- .../main/kotlin/org/opendc/workflow/service/WorkflowService.kt | 8 ++++---- .../org/opendc/workflow/service/internal/WorkflowServiceImpl.kt | 8 +++++++- .../kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt | 8 ++++---- 3 files changed, 15 insertions(+), 9 deletions(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index d3358ef1..a0248a93 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,7 +22,7 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. - * @param meter The meter to use. + * @param meterProvider The meter provider to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - meter, + meterProvider, compute, mode, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 5329143d..a0fd3fad 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.flow.map import mu.KotlinLogging @@ -48,7 +49,7 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -66,6 +67,11 @@ public class WorkflowServiceImpl( */ private val logger = KotlinLogging.logger {} + /** + * The [Meter] to collect metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.workflow.service") + /** * The incoming jobs ready to be processed by the scheduler. */ diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 07433d1f..74316437 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -51,6 +51,7 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import java.time.Duration import java.util.* /** @@ -79,24 +80,23 @@ internal class WorkflowServiceTest { emptyMap(), coroutineContext, interpreter, - meterProvider.get("opendc-compute-simulator"), + MeterProvider.noop(), hvProvider, ) } - val meter = MeterProvider.noop().get("opendc-compute") val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000) + val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) hosts.forEach { compute.addHost(it) } val scheduler = WorkflowService( coroutineContext, clock, - meterProvider.get("opendc-workflow"), + meterProvider, compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, -- cgit v1.2.3 From c7fff03408ee3109d0a39a96c043584a2d8f67ca Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 22:04:23 +0200 Subject: refactor(trace): Simplify TraceFormat SPI interface This change simplifies the TraceFormat SPI interface by reducing the number of interfaces that implementors need to implement to only TraceFormat. --- opendc-workflow/opendc-workflow-service/build.gradle.kts | 3 ++- .../kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 941202d2..43b64b15 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,7 +39,8 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcTrace.opendcTraceGwf) + testImplementation(projects.opendcTrace.opendcTraceApi) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) + testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 74316437..728dfd99 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -44,13 +44,14 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock -import org.opendc.trace.gwf.GwfTraceFormat +import org.opendc.trace.Trace import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import java.nio.file.Paths import java.time.Duration import java.util.* @@ -105,7 +106,10 @@ internal class WorkflowServiceTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf"))) + val trace = Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf" + ) val replayer = TraceReplayer(trace) replayer.replay(clock, scheduler) -- cgit v1.2.3 From d575bed5418be222e1d3ad39af862e2390596d61 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 26 Sep 2021 13:11:10 +0200 Subject: refactor(simulator): Combine work and deadline to duration This change removes the work and deadline properties from the SimResourceCommand.Consume class and introduces a new property duration. This property is now used in conjunction with the limit to compute the amount of work processed by a resource provider. Previously, we used both work and deadline to compute the duration and the amount of remaining work at the end of a consumption. However, with this change, we ensure that a resource consumption always runs at the same speed once establishing, drastically simplifying the computation for the amount of work processed during the consumption. --- .../src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 728dfd99..992b4991 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -126,7 +126,7 @@ internal class WorkflowServiceTest { { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(33213237L, clock.millis()) } + { assertEquals(33213236L, clock.millis()) } ) } -- cgit v1.2.3 From 4cc1d40d421c8736f8b21b360b61d6b065158b7a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 29 Sep 2021 23:56:16 +0200 Subject: refactor(simulator): Migrate to flow-based simulation This change renames the `opendc-simulator-resources` module into the `opendc-simulator-flow` module to indicate that the core simulation model of OpenDC is based around modelling and simulating flows. Previously, the distinction between resource consumer and provider, and input and output caused some confusion. By switching to a flow-based model, this distinction is now clear (as in, the water flows from source to consumer/sink). --- .../test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'opendc-workflow') diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 992b4991..04f54e58 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -42,7 +42,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.sdk.toOtelClock import org.opendc.trace.Trace import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -70,7 +70,7 @@ internal class WorkflowServiceTest { .setClock(clock.toOtelClock()) .build() - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val interpreter = FlowEngine(coroutineContext, clock) val machineModel = createMachineModel() val hvProvider = SimSpaceSharedHypervisorProvider() val hosts = List(4) { id -> -- cgit v1.2.3