From a17e7eda3cfd70b033e2fb6097880fba7f5af864 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 12 Jul 2024 14:08:53 +0200 Subject: Merged experiment-scenario and experiment-base by moving ScenarioCli.kt to experiment-base. Renamed the distribution of experiment-base to OpenDCScenarioRunner (#236) --- .../opendc-experiments-base/build.gradle.kts | 36 +++++ .../opendc/experiments/base/runner/ScenarioCli.kt | 65 ++++++++ .../experiments/base/runner/ScenarioHelpers.kt | 168 --------------------- .../experiments/base/runner/ScenarioReplayer.kt | 168 +++++++++++++++++++++ 4 files changed, 269 insertions(+), 168 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioCli.kt delete mode 100644 opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioHelpers.kt create mode 100644 opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt (limited to 'opendc-experiments/opendc-experiments-base') diff --git a/opendc-experiments/opendc-experiments-base/build.gradle.kts b/opendc-experiments/opendc-experiments-base/build.gradle.kts index 30510785..303517aa 100644 --- a/opendc-experiments/opendc-experiments-base/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-base/build.gradle.kts @@ -27,6 +27,7 @@ plugins { `kotlin-library-conventions` `testing-conventions` `jacoco-conventions` + distribution kotlin("plugin.serialization") version "1.9.22" } @@ -35,6 +36,8 @@ dependencies { api(projects.opendcCompute.opendcComputeService) api(projects.opendcCompute.opendcComputeSimulator) + implementation(libs.clikt) + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0") implementation(libs.progressbar) implementation(project(mapOf("path" to ":opendc-simulator:opendc-simulator-core"))) @@ -48,3 +51,36 @@ dependencies { runtimeOnly(libs.log4j.core) runtimeOnly(libs.log4j.slf4j) } + +val createScenarioApp by tasks.creating(CreateStartScripts::class) { + dependsOn(tasks.jar) + + applicationName = "OpenDCScenarioRunner" + mainClass.set("org.opendc.experiments.base.ScenarioCLI") + classpath = tasks.jar.get().outputs.files + configurations["runtimeClasspath"] + outputDir = project.buildDir.resolve("scripts") +} + +// Create custom Scenario distribution +distributions { + main { + distributionBaseName.set("OpenDCScenarioRunner") + + contents { + from("README.md") + from("LICENSE.txt") + from("../../LICENSE.txt") { + rename { "LICENSE-OpenDC.txt" } + } + + into("bin") { + from(createScenarioApp) + } + + into("lib") { + from(tasks.jar) + from(configurations["runtimeClasspath"]) + } + } + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioCli.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioCli.kt new file mode 100644 index 00000000..abf9ad46 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioCli.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ScenarioCli") + +package org.opendc.experiments.base.runner + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import org.opendc.experiments.base.scenario.getScenarios +import java.io.File + +/** + * Main entrypoint of the application. + */ +public fun main(args: Array): Unit = ScenarioCommand().main(args) + +/** + * Represents the command for the Scenario experiments. + */ +internal class ScenarioCommand : CliktCommand(name = "scenario") { + /** + * The path to the environment directory. + */ + private val scenarioPath by option("--scenario-path", help = "path to scenario file") + .file(canBeDir = false, canBeFile = true) + .defaultLazy { File("resources/scenario.json") } + + /** + * The number of threads to use for parallelism. + */ + private val parallelism by option("-p", "--parallelism", help = "number of worker threads") + .int() + .default(Runtime.getRuntime().availableProcessors() - 1) + + override fun run() { + val scenarios = getScenarios(scenarioPath) + runScenarios(scenarios, parallelism) + + // TODO: implement outputResults(scenario) // this will take the results, from a folder, and output them visually + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioHelpers.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioHelpers.kt deleted file mode 100644 index 970754b0..00000000 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioHelpers.kt +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("ScenarioHelpers") - -package org.opendc.experiments.base.runner - -import CheckpointModelSpec -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.yield -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher -import org.opendc.compute.failure.models.FailureModel -import org.opendc.compute.service.ComputeService -import org.opendc.compute.workload.VirtualMachine -import org.opendc.experiments.base.scenario.specs.FailureModelSpec -import org.opendc.experiments.base.scenario.specs.createFailureModel -import java.time.InstantSource -import java.util.Random -import kotlin.coroutines.coroutineContext -import kotlin.math.max - -/** - * A watcher that is locked and waits for a change in the server state to unlock - * @param unlockStates determine which [ServerState] triggers an unlock. - * Default values are TERMINATED, ERROR, and DELETED. - */ -public class RunningServerWatcher : ServerWatcher { - // TODO: make this changeable - private val unlockStates: List = listOf(ServerState.DELETED, ServerState.TERMINATED) - - private val mutex: Mutex = Mutex() - - public suspend fun lock() { - mutex.lock() - } - - public suspend fun wait() { - this.lock() - } - - override fun onStateChanged( - server: Server, - newState: ServerState, - ) { - if (unlockStates.contains(newState)) { - mutex.unlock() - } - } -} - -/** - * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. - * - * @param clock The simulation clock. - * @param trace The trace to simulate. - * @param seed The seed to use for randomness. - * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). - * @param failureModelSpec A failure model to use for injecting failures. - */ -public suspend fun ComputeService.replay( - clock: InstantSource, - trace: List, - failureModelSpec: FailureModelSpec? = null, - checkpointModelSpec: CheckpointModelSpec? = null, - seed: Long = 0, - submitImmediately: Boolean = false, -) { - val client = newClient() - - // Create a failure model based on the failureModelSpec, if not null, otherwise set failureModel to null - val failureModel: FailureModel? = - failureModelSpec?.let { - createFailureModel(coroutineContext, clock, this, Random(seed), it) - } - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - // Start the fault injector - failureModel?.start() - - var simulationOffset = Long.MIN_VALUE - - for (entry in trace.sortedBy { it.startTime }) { - val now = clock.millis() - val start = entry.startTime.toEpochMilli() - - // Set the simulationOffset based on the starting time of the first server - if (simulationOffset == Long.MIN_VALUE) { - simulationOffset = start - now - } - - // Delay the server based on the startTime given by the trace. - if (!submitImmediately) { - delay(max(0, (start - now - simulationOffset))) - } - - val checkpointTime = checkpointModelSpec?.checkpointTime ?: 0L - val checkpointWait = checkpointModelSpec?.checkpointWait ?: 0L - -// val workload = SimRuntimeWorkload( -// entry.duration, -// 1.0, -// checkpointTime, -// checkpointWait -// ) - - val workload = entry.trace.createWorkload(start, checkpointTime, checkpointWait) - val meta = mutableMapOf("workload" to workload) - - launch { - val server = - client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.cpuCount, - entry.memCapacity, - meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), - ), - meta = meta, - ) - - val serverWatcher = RunningServerWatcher() - serverWatcher.lock() - server.watch(serverWatcher) - - // Wait until the server is terminated - serverWatcher.wait() - - // Stop the server after reaching the end-time of the virtual machine - server.delete() - } - } - } - yield() - } finally { - failureModel?.close() - client.close() - } -} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt new file mode 100644 index 00000000..970754b0 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ScenarioHelpers") + +package org.opendc.experiments.base.runner + +import CheckpointModelSpec +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.yield +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.failure.models.FailureModel +import org.opendc.compute.service.ComputeService +import org.opendc.compute.workload.VirtualMachine +import org.opendc.experiments.base.scenario.specs.FailureModelSpec +import org.opendc.experiments.base.scenario.specs.createFailureModel +import java.time.InstantSource +import java.util.Random +import kotlin.coroutines.coroutineContext +import kotlin.math.max + +/** + * A watcher that is locked and waits for a change in the server state to unlock + * @param unlockStates determine which [ServerState] triggers an unlock. + * Default values are TERMINATED, ERROR, and DELETED. + */ +public class RunningServerWatcher : ServerWatcher { + // TODO: make this changeable + private val unlockStates: List = listOf(ServerState.DELETED, ServerState.TERMINATED) + + private val mutex: Mutex = Mutex() + + public suspend fun lock() { + mutex.lock() + } + + public suspend fun wait() { + this.lock() + } + + override fun onStateChanged( + server: Server, + newState: ServerState, + ) { + if (unlockStates.contains(newState)) { + mutex.unlock() + } + } +} + +/** + * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. + * + * @param clock The simulation clock. + * @param trace The trace to simulate. + * @param seed The seed to use for randomness. + * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). + * @param failureModelSpec A failure model to use for injecting failures. + */ +public suspend fun ComputeService.replay( + clock: InstantSource, + trace: List, + failureModelSpec: FailureModelSpec? = null, + checkpointModelSpec: CheckpointModelSpec? = null, + seed: Long = 0, + submitImmediately: Boolean = false, +) { + val client = newClient() + + // Create a failure model based on the failureModelSpec, if not null, otherwise set failureModel to null + val failureModel: FailureModel? = + failureModelSpec?.let { + createFailureModel(coroutineContext, clock, this, Random(seed), it) + } + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + failureModel?.start() + + var simulationOffset = Long.MIN_VALUE + + for (entry in trace.sortedBy { it.startTime }) { + val now = clock.millis() + val start = entry.startTime.toEpochMilli() + + // Set the simulationOffset based on the starting time of the first server + if (simulationOffset == Long.MIN_VALUE) { + simulationOffset = start - now + } + + // Delay the server based on the startTime given by the trace. + if (!submitImmediately) { + delay(max(0, (start - now - simulationOffset))) + } + + val checkpointTime = checkpointModelSpec?.checkpointTime ?: 0L + val checkpointWait = checkpointModelSpec?.checkpointWait ?: 0L + +// val workload = SimRuntimeWorkload( +// entry.duration, +// 1.0, +// checkpointTime, +// checkpointWait +// ) + + val workload = entry.trace.createWorkload(start, checkpointTime, checkpointWait) + val meta = mutableMapOf("workload" to workload) + + launch { + val server = + client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.cpuCount, + entry.memCapacity, + meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap(), + ), + meta = meta, + ) + + val serverWatcher = RunningServerWatcher() + serverWatcher.lock() + server.watch(serverWatcher) + + // Wait until the server is terminated + serverWatcher.wait() + + // Stop the server after reaching the end-time of the virtual machine + server.delete() + } + } + } + yield() + } finally { + failureModel?.close() + client.close() + } +} -- cgit v1.2.3