diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-09-30 20:33:16 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-03 17:35:57 +0200 |
| commit | 115e37984624a409bc1ad4f54bf10c9537183390 (patch) | |
| tree | 51d1d9d5079317a70d550cd5e6c9aab3e9ae30c5 /opendc-compute | |
| parent | 70e69db59c821568b5469c43b38b4d0a46b84e92 (diff) | |
feat(exp/compute): Add provisioners for compute service
This change adds a new module `opendc-experiments-compute` that provides
provisioner implementations for experiments to use for setting up the
compute service of OpenDC and provisioning (simulated) hosts.
Diffstat (limited to 'opendc-compute')
2 files changed, 118 insertions, 79 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index f6744123..e86456fe 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -22,23 +22,17 @@ package org.opendc.compute.workload -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.flow.FlowEngine import java.time.Clock import java.time.Duration import java.util.* import kotlin.coroutines.CoroutineContext -import kotlin.math.max /** * Helper class to simulate VM-based workloads in OpenDC. @@ -89,72 +83,7 @@ public class ComputeServiceHelper( failureModel: FailureModel? = null, interference: Boolean = false, ) { - val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong())) - val client = service.newClient() - val clock = clock - - // Create new image for the virtual machine - val image = client.newImage("vm-image") - - try { - coroutineScope { - // Start the fault injector - injector?.start() - - var offset = Long.MIN_VALUE - - for (entry in trace.sortedBy { it.startTime }) { - val now = clock.millis() - val start = entry.startTime.toEpochMilli() - - if (offset < 0) { - offset = start - now - } - - // Make sure the trace entries are ordered by submission time - assert(start - offset >= 0) { "Invalid trace order" } - - if (!submitImmediately) { - delay(max(0, (start - offset) - now)) - } - - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload(entry.trace, workloadOffset) - val meta = mutableMapOf<String, Any>("workload" to workload) - - val interferenceProfile = entry.interferenceProfile - if (interference && interferenceProfile != null) { - meta["interference-profile"] = interferenceProfile - } - - launch { - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.cpuCount, - entry.memCapacity, - meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() - ), - meta = meta - ) - - // Wait for the server reach its end time - val endTime = entry.stopTime.toEpochMilli() - delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) - - // Stop the server after reaching the end-time of the virtual machine - server.stop() - } - } - } - - yield() - } finally { - injector?.close() - client.close() - } + service.replay(clock, trace, random.nextLong(), submitImmediately, failureModel, interference) } /** @@ -194,11 +123,4 @@ public class ComputeServiceHelper( hosts.clear() } - - /** - * Construct a [ComputeService] instance. - */ - private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService { - return ComputeService(context, clock, scheduler, schedulingQuantum) - } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt new file mode 100644 index 00000000..dc8713dc --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceHelpers") +package org.opendc.compute.workload + +import kotlinx.coroutines.* +import org.opendc.compute.service.ComputeService +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.time.Clock +import java.util.* +import kotlin.coroutines.coroutineContext +import kotlin.math.max + +/** + * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished. + * + * @param clock The simulation clock. + * @param trace The trace to simulate. + * @param seed The seed to use for randomness. + * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). + * @param failureModel A failure model to use for injecting failures. + * @param interference A flag to indicate that VM interference needs to be enabled. + */ +public suspend fun ComputeService.replay( + clock: Clock, + trace: List<VirtualMachine>, + seed: Long, + submitImmediately: Boolean = false, + failureModel: FailureModel? = null, + interference: Boolean = false +) { + val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed)) + val client = newClient() + + // Create new image for the virtual machine + val image = client.newImage("vm-image") + + try { + coroutineScope { + // Start the fault injector + injector?.start() + + var offset = Long.MIN_VALUE + + for (entry in trace.sortedBy { it.startTime }) { + val now = clock.millis() + val start = entry.startTime.toEpochMilli() + + if (offset < 0) { + offset = start - now + } + + // Make sure the trace entries are ordered by submission time + assert(start - offset >= 0) { "Invalid trace order" } + + if (!submitImmediately) { + delay(max(0, (start - offset) - now)) + } + + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload(entry.trace, workloadOffset) + val meta = mutableMapOf<String, Any>("workload" to workload) + + val interferenceProfile = entry.interferenceProfile + if (interference && interferenceProfile != null) { + meta["interference-profile"] = interferenceProfile + } + + launch { + val server = client.newServer( + entry.name, + image, + client.newFlavor( + entry.name, + entry.cpuCount, + entry.memCapacity, + meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() + ), + meta = meta + ) + + // Wait for the server reach its end time + val endTime = entry.stopTime.toEpochMilli() + delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) + + // Stop the server after reaching the end-time of the virtual machine + server.stop() + } + } + } + + yield() + } finally { + injector?.close() + client.close() + } +} |
