diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-11 22:28:31 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-12 13:15:52 +0100 |
| commit | 6e4a9dd6af6b768468194b5a2aeffd60836e6407 (patch) | |
| tree | de5f4f0d2ee1571facea6b5319853b189aa35c0a | |
| parent | ce4b9bd28d9cb24b112a3a4723252c5bbca2fe5b (diff) | |
Refactor SC18 experiments to use new experiment harness
6 files changed, 227 insertions, 181 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index ee2295d9..b6b35694 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -29,11 +29,12 @@ plugins { } application { - mainClassName = "org.opendc.experiments.sc18.TestExperiment" + mainClass.set("org.opendc.harness.runner.console.ConsoleRunnerKt") } dependencies { api(project(":opendc-core")) + api(project(":opendc-harness")) implementation(project(":opendc-format")) implementation(project(":opendc-workflows")) implementation(project(":opendc-simulator:opendc-simulator-core")) diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt deleted file mode 100644 index 202df6df..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc18 - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.test.TestCoroutineScope -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.WorkflowEvent -import org.opendc.workflows.service.WorkflowSchedulerMode -import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy -import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import java.io.File -import kotlin.math.max - -/** - * Main entry point of the experiment. 
- */ -@OptIn(ExperimentalCoroutinesApi::class) -public fun main(args: Array<String>) { - if (args.isEmpty()) { - println("error: Please provide path to GWF trace") - return - } - - var total = 0 - var finished = 0 - - val token = Channel<Boolean>() - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) - - val schedulerAsync = testScope.async { - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(testScope, clock) } - - val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000) - - // Wait for the hypervisors to be spawned - delay(10) - - StageWorkflowService( - testScope, - clock, - tracer, - provisioner, - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - } - - testScope.launch { - val scheduler = schedulerAsync.await() - scheduler.events - .onEach { event -> - when (event) { - is WorkflowEvent.JobStarted -> { - println("Job ${event.job.uid} started") - } - is WorkflowEvent.JobFinished -> { - finished += 1 - println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)") - - if (finished == total) { - token.send(true) - } - } - } - } - .collect() - } - - testScope.launch { - val reader = GwfTraceReader(File(args[0])) - val scheduler = schedulerAsync.await() - - while (reader.hasNext()) { - val (time, job) = reader.next() - total += 1 - delay(max(0, time * 1000 - clock.millis())) - scheduler.submit(job) - } - } - - testScope.advanceUntilIdle() -} 
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt new file mode 100644 index 00000000..6d2c0ec7 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.sc18 + +import kotlinx.coroutines.* +import kotlinx.coroutines.test.TestCoroutineScope +import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.format.environment.sc18.Sc18EnvironmentReader +import org.opendc.format.trace.gwf.GwfTraceReader +import org.opendc.harness.dsl.Experiment +import org.opendc.harness.dsl.anyOf +import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer +import org.opendc.trace.core.enable +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.WorkflowEvent +import org.opendc.workflows.service.WorkflowSchedulerMode +import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy +import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy +import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import java.io.File +import java.io.FileInputStream +import kotlin.math.max + +/** + * The [UnderspecificationExperiment] investigates the impact of scheduler underspecification on performance. + * It focuses on components that must exist (that is, based on their own publications, the correct operation of the + * schedulers under study requires these components), yet have been left underspecified by their author. + */ +public class UnderspecificationExperiment : Experiment("underspecification") { + /** + * The workflow traces to test. + */ + private val trace: String by anyOf("traces/chronos_exp_noscaler_ca.gwf") + + /** + * The datacenter environments to test. 
+ */ + private val environment: String by anyOf("environments/base.json") + + @OptIn(ExperimentalCoroutinesApi::class) + override fun doRun(repeat: Int) { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + val tracer = EventTracer(clock) + val recording = tracer.openRecording().run { + enable<WorkflowEvent.JobSubmitted>() + enable<WorkflowEvent.JobStarted>() + enable<WorkflowEvent.JobFinished>() + enable<WorkflowEvent.TaskStarted>() + enable<WorkflowEvent.TaskFinished>() + this + } + + testScope.launch { + launch { println("MAKESPAN: ${recording.workflowRuntime()}") } + launch { println("WAIT: ${recording.workflowWaitingTime()}") } + recording.start() + } + + testScope.launch { + val environment = Sc18EnvironmentReader(FileInputStream(File(environment))) + .use { it.construct(testScope, clock) } + + val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val provisioner = SimVirtProvisioningService( + testScope, + clock, + bareMetal, + NumberOfActiveServersAllocationPolicy(), + tracer, + SimSpaceSharedHypervisorProvider(), + schedulingQuantum = 1000 + ) + + // Wait for the hypervisors to be spawned + delay(10) + + val scheduler = StageWorkflowService( + testScope, + clock, + tracer, + provisioner, + mode = WorkflowSchedulerMode.Batch(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + + val reader = GwfTraceReader(File(trace)) + + while (reader.hasNext()) { + val (time, job) = reader.next() + delay(max(0, time * 1000 - clock.millis())) + scheduler.submit(job) + } + } + + testScope.advanceUntilIdle() + recording.close() + + // Fail the trial when any coroutine raised an exception; check (unlike assert) also runs when JVM assertions are disabled + testScope.uncaughtExceptions.forEach { it.printStackTrace() } + check(testScope.uncaughtExceptions.isEmpty()) { 
"Errors occurred during execution of the experiment" } + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt new file mode 100644 index 00000000..dbd04b87 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.sc18 + +import org.opendc.trace.core.EventStream +import org.opendc.trace.core.onEvent +import org.opendc.workflows.service.WorkflowEvent +import java.util.* +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +/** + * This function collects the makespan of workflows that appear in the event stream, that is, the time between their [WorkflowEvent.JobStarted] and [WorkflowEvent.JobFinished] events. + */ +public suspend fun EventStream.workflowRuntime(): Map<UUID, Long> = suspendCoroutine { cont -> + val starts = mutableMapOf<UUID, Long>() + val results = mutableMapOf<UUID, Long>() + + onEvent<WorkflowEvent.JobStarted> { + starts[it.job.uid] = it.timestamp + } + onEvent<WorkflowEvent.JobFinished> { + val start = starts.remove(it.job.uid) ?: return@onEvent + results[it.job.uid] = it.timestamp - start + } + onClose { cont.resume(results) } +} + +/** + * This function collects the waiting time of workflows that appear in the event stream, which is the duration between the + * [WorkflowEvent.JobStarted] event of a workflow and the start of its first task. NOTE(review): confirm whether JobSubmitted was intended as the starting point. + */ +public suspend fun EventStream.workflowWaitingTime(): Map<UUID, Long> = suspendCoroutine { cont -> + val starts = mutableMapOf<UUID, Long>() + val results = mutableMapOf<UUID, Long>() + + onEvent<WorkflowEvent.JobStarted> { + starts[it.job.uid] = it.timestamp + } + onEvent<WorkflowEvent.TaskStarted> { + results.computeIfAbsent(it.job.uid) { _ -> + val start = starts.remove(it.job.uid)!! + it.timestamp - start + } + } + onClose { cont.resume(results) } +} + +/** + * This function collects the response time of tasks that appear in the event stream, measured from the submission of the task's job to the moment the task finishes. 
+ */ +public suspend fun EventStream.taskResponse(): Map<UUID, Long> = suspendCoroutine { cont -> + val starts = mutableMapOf<UUID, Long>() + val results = mutableMapOf<UUID, Long>() + + onEvent<WorkflowEvent.JobSubmitted> { + for (task in it.job.tasks) { + starts[task.uid] = it.timestamp + } + } + onEvent<WorkflowEvent.TaskFinished> { + val start = starts.remove(it.task.uid) ?: return@onEvent + results[it.task.uid] = it.timestamp - start + } + onClose { cont.resume(results) } +} diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/resources/env/setup-test.json b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/resources/env/setup-test.json deleted file mode 100644 index 0965b250..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/resources/env/setup-test.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "Experimental Setup 2", - "rooms": [ - { - "type": "SERVER", - "objects": [ - { - "type": "RACK", - "machines": [ - { "cpus": [2] }, { "cpus": [2]}, - { "cpus": [2] }, { "cpus": [2]}, - { "cpus": [2] }, { "cpus": [2]}, - { "cpus": [2] }, { "cpus": [2]}, - { "cpus": [2] }, { "cpus": [2]}, - { "cpus": [2] }, { "cpus": [2]}, - { "cpus": [2] }, { "cpus": [2]}, - { "cpus": [2] }, { "cpus": [2]} - ] - }, - { - "type": "RACK", - "machines": [ - { "cpus": [1] }, { "cpus": [1]}, - { "cpus": [1] }, { "cpus": [1]}, - { "cpus": [1] }, { "cpus": [1]}, - { "cpus": [1] }, { "cpus": [1]}, - { "cpus": [1] }, { "cpus": [1]}, - { "cpus": [1] }, { "cpus": [1]}, - { "cpus": [1] }, { "cpus": [1]}, - { "cpus": [1] }, { "cpus": [1]} - ] - } - ] - } - ] -} diff --git a/simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt b/simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt index db2cd191..65a0604d 100644 --- a/simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt +++ 
b/simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt @@ -22,13 +22,11 @@ package org.opendc.harness.engine -import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.* import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch -import kotlinx.coroutines.supervisorScope import org.opendc.harness.api.ExperimentDefinition import org.opendc.harness.api.Trial import org.opendc.harness.engine.scheduler.ExperimentScheduler @@ -53,6 +51,7 @@ public class ExperimentEngine( * * @param root The experiment to execute. */ + @OptIn(InternalCoroutinesApi::class) public suspend fun execute(root: ExperimentDefinition): Unit = supervisorScope { listener.experimentStarted(root) @@ -75,25 +74,13 @@ public class ExperimentEngine( listener.trialFinished(trial, null) } catch (e: Throwable) { listener.trialFinished(trial, e) - throw e } } } launch { - var error: Throwable? = null - for (job in jobs) { - try { - job.join() - } catch (e: CancellationException) { - // Propagate cancellation - throw e - } catch (e: Throwable) { - error = e - } - } - - listener.scenarioFinished(scenario, error) + jobs.joinAll() + listener.scenarioFinished(scenario, null) } } listener.experimentFinished(root, null) |
