summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-01-12 14:21:26 +0100
committerGitHub <noreply@github.com>2021-01-12 14:21:26 +0100
commitb1a9f17feb6c6068c1c8f0162a516e29ac48a35a (patch)
treede5f4f0d2ee1571facea6b5319853b189aa35c0a /simulator
parente9278df42673deaeace0d85337ac1434ec6c081d (diff)
parent6e4a9dd6af6b768468194b5a2aeffd60836e6407 (diff)
Merge pull request #73 from atlarge-research/feat/harness-sc18
Update SC18 experiments to use experiment harness
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt127
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt135
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt86
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/resources/env/setup-test.json36
-rw-r--r--simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt21
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt2
7 files changed, 228 insertions, 182 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
index ee2295d9..b6b35694 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
@@ -29,11 +29,12 @@ plugins {
}
application {
- mainClassName = "org.opendc.experiments.sc18.TestExperiment"
+ mainClass.set("org.opendc.harness.runner.console.ConsoleRunnerKt")
}
dependencies {
api(project(":opendc-core"))
+ api(project(":opendc-harness"))
implementation(project(":opendc-format"))
implementation(project(":opendc-workflows"))
implementation(project(":opendc-simulator:opendc-simulator-core"))
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
deleted file mode 100644
index 202df6df..00000000
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.sc18
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.collect
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.test.TestCoroutineScope
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
-import org.opendc.format.environment.sc18.Sc18EnvironmentReader
-import org.opendc.format.trace.gwf.GwfTraceReader
-import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
-import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.WorkflowEvent
-import org.opendc.workflows.service.WorkflowSchedulerMode
-import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
-import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
-import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import java.io.File
-import kotlin.math.max
-
-/**
- * Main entry point of the experiment.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-public fun main(args: Array<String>) {
- if (args.isEmpty()) {
- println("error: Please provide path to GWF trace")
- return
- }
-
- var total = 0
- var finished = 0
-
- val token = Channel<Boolean>()
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val tracer = EventTracer(clock)
-
- val schedulerAsync = testScope.async {
- val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(testScope, clock) }
-
- val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService]
-
- // Wait for the bare metal nodes to be spawned
- delay(10)
-
- val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000)
-
- // Wait for the hypervisors to be spawned
- delay(10)
-
- StageWorkflowService(
- testScope,
- clock,
- tracer,
- provisioner,
- mode = WorkflowSchedulerMode.Batch(100),
- jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
- taskEligibilityPolicy = NullTaskEligibilityPolicy,
- taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- )
- }
-
- testScope.launch {
- val scheduler = schedulerAsync.await()
- scheduler.events
- .onEach { event ->
- when (event) {
- is WorkflowEvent.JobStarted -> {
- println("Job ${event.job.uid} started")
- }
- is WorkflowEvent.JobFinished -> {
- finished += 1
- println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)")
-
- if (finished == total) {
- token.send(true)
- }
- }
- }
- }
- .collect()
- }
-
- testScope.launch {
- val reader = GwfTraceReader(File(args[0]))
- val scheduler = schedulerAsync.await()
-
- while (reader.hasNext()) {
- val (time, job) = reader.next()
- total += 1
- delay(max(0, time * 1000 - clock.millis()))
- scheduler.submit(job)
- }
- }
-
- testScope.advanceUntilIdle()
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
new file mode 100644
index 00000000..6d2c0ec7
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.sc18
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.test.TestCoroutineScope
+import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
+import org.opendc.format.environment.sc18.Sc18EnvironmentReader
+import org.opendc.format.trace.gwf.GwfTraceReader
+import org.opendc.harness.dsl.Experiment
+import org.opendc.harness.dsl.anyOf
+import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
+import org.opendc.trace.core.enable
+import org.opendc.workflows.service.StageWorkflowService
+import org.opendc.workflows.service.WorkflowEvent
+import org.opendc.workflows.service.WorkflowSchedulerMode
+import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
+import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
+import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
+import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
+import java.io.File
+import java.io.FileInputStream
+import kotlin.math.max
+
+/**
+ * The [UnderspecificationExperiment] investigates the impact of scheduler underspecification on performance.
+ * It focuses on components that must exist (that is, based on their own publications, the correct operation of the
+ * schedulers under study requires these components), yet have been left underspecified by their author.
+ */
+public class UnderspecificationExperiment : Experiment("underspecification") {
+ /**
+ * The workflow traces to test.
+ */
+ private val trace: String by anyOf("traces/chronos_exp_noscaler_ca.gwf")
+
+ /**
+ * The datacenter environments to test.
+ */
+ private val environment: String by anyOf("environments/base.json")
+
+ @OptIn(ExperimentalCoroutinesApi::class)
+ override fun doRun(repeat: Int) {
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+ val tracer = EventTracer(clock)
+ val recording = tracer.openRecording().run {
+ enable<WorkflowEvent.JobSubmitted>()
+ enable<WorkflowEvent.JobStarted>()
+ enable<WorkflowEvent.JobFinished>()
+ enable<WorkflowEvent.TaskStarted>()
+ enable<WorkflowEvent.TaskFinished>()
+ this
+ }
+
+ testScope.launch {
+ launch { println("MAKESPAN: ${recording.workflowRuntime()}") }
+ launch { println("WAIT: ${recording.workflowWaitingTime()}") }
+ recording.start()
+ }
+
+ testScope.launch {
+ val environment = Sc18EnvironmentReader(FileInputStream(File(environment)))
+ .use { it.construct(testScope, clock) }
+
+ val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
+ val provisioner = SimVirtProvisioningService(
+ testScope,
+ clock,
+ bareMetal,
+ NumberOfActiveServersAllocationPolicy(),
+ tracer,
+ SimSpaceSharedHypervisorProvider(),
+ schedulingQuantum = 1000
+ )
+
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
+ val scheduler = StageWorkflowService(
+ testScope,
+ clock,
+ tracer,
+ provisioner,
+ mode = WorkflowSchedulerMode.Batch(100),
+ jobAdmissionPolicy = NullJobAdmissionPolicy,
+ jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
+ taskEligibilityPolicy = NullTaskEligibilityPolicy,
+ taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
+ )
+
+ val reader = GwfTraceReader(File(trace))
+
+ while (reader.hasNext()) {
+ val (time, job) = reader.next()
+ delay(max(0, time * 1000 - clock.millis()))
+ scheduler.submit(job)
+ }
+ }
+
+ testScope.advanceUntilIdle()
+ recording.close()
+
+ // Check whether everything went okay
+ testScope.uncaughtExceptions.forEach { it.printStackTrace() }
+ assert(testScope.uncaughtExceptions.isEmpty()) { "Errors occurred during execution of the experiment" }
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
new file mode 100644
index 00000000..dbd04b87
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.sc18
+
+import org.opendc.trace.core.EventStream
+import org.opendc.trace.core.onEvent
+import org.opendc.workflows.service.WorkflowEvent
+import java.util.*
+import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
+
+/**
+ * This function collects the makespan of workflows that appear in the event stream.
+ */
+public suspend fun EventStream.workflowRuntime(): Map<UUID, Long> = suspendCoroutine { cont ->
+ val starts = mutableMapOf<UUID, Long>()
+ val results = mutableMapOf<UUID, Long>()
+
+ onEvent<WorkflowEvent.JobStarted> {
+ starts[it.job.uid] = it.timestamp
+ }
+ onEvent<WorkflowEvent.JobFinished> {
+ val start = starts.remove(it.job.uid) ?: return@onEvent
+ results[it.job.uid] = it.timestamp - start
+ }
+ onClose { cont.resume(results) }
+}
+
+/**
+ * This function collects the waiting time of workflows that appear in the event stream, which the duration between the
+ * workflow submission and the start of the first task.
+ */
+public suspend fun EventStream.workflowWaitingTime(): Map<UUID, Long> = suspendCoroutine { cont ->
+ val starts = mutableMapOf<UUID, Long>()
+ val results = mutableMapOf<UUID, Long>()
+
+ onEvent<WorkflowEvent.JobStarted> {
+ starts[it.job.uid] = it.timestamp
+ }
+ onEvent<WorkflowEvent.TaskStarted> {
+ results.computeIfAbsent(it.job.uid) { _ ->
+ val start = starts.remove(it.job.uid)!!
+ it.timestamp - start
+ }
+ }
+ onClose { cont.resume(results) }
+}
+
+/**
+ * This function collects the response time of tasks that appear in the event stream.
+ */
+public suspend fun EventStream.taskResponse(): Map<UUID, Long> = suspendCoroutine { cont ->
+ val starts = mutableMapOf<UUID, Long>()
+ val results = mutableMapOf<UUID, Long>()
+
+ onEvent<WorkflowEvent.JobSubmitted> {
+ for (task in it.job.tasks) {
+ starts[task.uid] = it.timestamp
+ }
+ }
+ onEvent<WorkflowEvent.TaskFinished> {
+ val start = starts.remove(it.job.uid) ?: return@onEvent
+ results[it.task.uid] = it.timestamp - start
+ }
+ onClose { cont.resume(results) }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/resources/env/setup-test.json b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/resources/env/setup-test.json
deleted file mode 100644
index 0965b250..00000000
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/resources/env/setup-test.json
+++ /dev/null
@@ -1,36 +0,0 @@
-{
- "name": "Experimental Setup 2",
- "rooms": [
- {
- "type": "SERVER",
- "objects": [
- {
- "type": "RACK",
- "machines": [
- { "cpus": [2] }, { "cpus": [2]},
- { "cpus": [2] }, { "cpus": [2]},
- { "cpus": [2] }, { "cpus": [2]},
- { "cpus": [2] }, { "cpus": [2]},
- { "cpus": [2] }, { "cpus": [2]},
- { "cpus": [2] }, { "cpus": [2]},
- { "cpus": [2] }, { "cpus": [2]},
- { "cpus": [2] }, { "cpus": [2]}
- ]
- },
- {
- "type": "RACK",
- "machines": [
- { "cpus": [1] }, { "cpus": [1]},
- { "cpus": [1] }, { "cpus": [1]},
- { "cpus": [1] }, { "cpus": [1]},
- { "cpus": [1] }, { "cpus": [1]},
- { "cpus": [1] }, { "cpus": [1]},
- { "cpus": [1] }, { "cpus": [1]},
- { "cpus": [1] }, { "cpus": [1]},
- { "cpus": [1] }, { "cpus": [1]}
- ]
- }
- ]
- }
- ]
-}
diff --git a/simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt b/simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt
index db2cd191..65a0604d 100644
--- a/simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt
+++ b/simulator/opendc-harness/src/main/kotlin/org/opendc/harness/engine/ExperimentEngine.kt
@@ -22,13 +22,11 @@
package org.opendc.harness.engine
-import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.supervisorScope
import org.opendc.harness.api.ExperimentDefinition
import org.opendc.harness.api.Trial
import org.opendc.harness.engine.scheduler.ExperimentScheduler
@@ -53,6 +51,7 @@ public class ExperimentEngine(
*
* @param root The experiment to execute.
*/
+ @OptIn(InternalCoroutinesApi::class)
public suspend fun execute(root: ExperimentDefinition): Unit = supervisorScope {
listener.experimentStarted(root)
@@ -75,25 +74,13 @@ public class ExperimentEngine(
listener.trialFinished(trial, null)
} catch (e: Throwable) {
listener.trialFinished(trial, e)
- throw e
}
}
}
launch {
- var error: Throwable? = null
- for (job in jobs) {
- try {
- job.join()
- } catch (e: CancellationException) {
- // Propagate cancellation
- throw e
- } catch (e: Throwable) {
- error = e
- }
- }
-
- listener.scenarioFinished(scenario, error)
+ jobs.joinAll()
+ listener.scenarioFinished(scenario, null)
}
}
listener.experimentFinished(root, null)
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
index 1887ad7a..fac99664 100644
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
+++ b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
@@ -124,7 +124,7 @@ internal abstract class AbstractEventStream : EventStream {
}
override fun close() {
- if (state != StreamState.Closed) {
+ if (state == StreamState.Closed) {
return
}