summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-workload/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-30 20:33:16 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-03 17:35:57 +0200
commit115e37984624a409bc1ad4f54bf10c9537183390 (patch)
tree51d1d9d5079317a70d550cd5e6c9aab3e9ae30c5 /opendc-compute/opendc-compute-workload/src/main
parent70e69db59c821568b5469c43b38b4d0a46b84e92 (diff)
feat(exp/compute): Add provisioners for compute service
This change adds a new module `opendc-experiments-compute` that provides provisioner implementations for experiments to use for setting up the compute service of OpenDC and provisioning (simulated) hosts.
Diffstat (limited to 'opendc-compute/opendc-compute-workload/src/main')
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt80
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt117
2 files changed, 118 insertions, 79 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
index f6744123..e86456fe 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
@@ -22,23 +22,17 @@
package org.opendc.compute.workload
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.kernel.SimHypervisor
-import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.flow.FlowEngine
import java.time.Clock
import java.time.Duration
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.math.max
/**
* Helper class to simulate VM-based workloads in OpenDC.
@@ -89,72 +83,7 @@ public class ComputeServiceHelper(
failureModel: FailureModel? = null,
interference: Boolean = false,
) {
- val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong()))
- val client = service.newClient()
- val clock = clock
-
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
- try {
- coroutineScope {
- // Start the fault injector
- injector?.start()
-
- var offset = Long.MIN_VALUE
-
- for (entry in trace.sortedBy { it.startTime }) {
- val now = clock.millis()
- val start = entry.startTime.toEpochMilli()
-
- if (offset < 0) {
- offset = start - now
- }
-
- // Make sure the trace entries are ordered by submission time
- assert(start - offset >= 0) { "Invalid trace order" }
-
- if (!submitImmediately) {
- delay(max(0, (start - offset) - now))
- }
-
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload(entry.trace, workloadOffset)
- val meta = mutableMapOf<String, Any>("workload" to workload)
-
- val interferenceProfile = entry.interferenceProfile
- if (interference && interferenceProfile != null) {
- meta["interference-profile"] = interferenceProfile
- }
-
- launch {
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.cpuCount,
- entry.memCapacity,
- meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap()
- ),
- meta = meta
- )
-
- // Wait for the server reach its end time
- val endTime = entry.stopTime.toEpochMilli()
- delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
-
- // Stop the server after reaching the end-time of the virtual machine
- server.stop()
- }
- }
- }
-
- yield()
- } finally {
- injector?.close()
- client.close()
- }
+ service.replay(clock, trace, random.nextLong(), submitImmediately, failureModel, interference)
}
/**
@@ -194,11 +123,4 @@ public class ComputeServiceHelper(
hosts.clear()
}
-
- /**
- * Construct a [ComputeService] instance.
- */
- private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService {
- return ComputeService(context, clock, scheduler, schedulingQuantum)
- }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt
new file mode 100644
index 00000000..dc8713dc
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TraceHelpers")
+package org.opendc.compute.workload
+
+import kotlinx.coroutines.*
+import org.opendc.compute.service.ComputeService
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.coroutineContext
+import kotlin.math.max
+
+/**
+ * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished.
+ *
+ * @param clock The simulation clock.
+ * @param trace The trace to simulate.
+ * @param seed The seed to use for randomness.
+ * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
+ * @param failureModel A failure model to use for injecting failures.
+ * @param interference A flag to indicate that VM interference needs to be enabled.
+ */
+public suspend fun ComputeService.replay(
+ clock: Clock,
+ trace: List<VirtualMachine>,
+ seed: Long,
+ submitImmediately: Boolean = false,
+ failureModel: FailureModel? = null,
+ interference: Boolean = false
+) {
+ val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed))
+ val client = newClient()
+
+ // Create new image for the virtual machine
+ val image = client.newImage("vm-image")
+
+ try {
+ coroutineScope {
+ // Start the fault injector
+ injector?.start()
+
+ var offset = Long.MIN_VALUE
+
+ for (entry in trace.sortedBy { it.startTime }) {
+ val now = clock.millis()
+ val start = entry.startTime.toEpochMilli()
+
+ if (offset < 0) {
+ offset = start - now
+ }
+
+ // Make sure the trace entries are ordered by submission time
+ assert(start - offset >= 0) { "Invalid trace order" }
+
+ if (!submitImmediately) {
+ delay(max(0, (start - offset) - now))
+ }
+
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload(entry.trace, workloadOffset)
+ val meta = mutableMapOf<String, Any>("workload" to workload)
+
+ val interferenceProfile = entry.interferenceProfile
+ if (interference && interferenceProfile != null) {
+ meta["interference-profile"] = interferenceProfile
+ }
+
+ launch {
+ val server = client.newServer(
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.cpuCount,
+ entry.memCapacity,
+ meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap()
+ ),
+ meta = meta
+ )
+
+ // Wait for the server reach its end time
+ val endTime = entry.stopTime.toEpochMilli()
+ delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
+
+ // Stop the server after reaching the end-time of the virtual machine
+ server.stop()
+ }
+ }
+ }
+
+ yield()
+ } finally {
+ injector?.close()
+ client.close()
+ }
+}