summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-16 16:52:00 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:02:51 +0200
commitb14df2a0924774c5aed15cedeb1027abf8ee5361 (patch)
treed93171ad63477b97e160d3a10e3216c3de1b5ff5
parent29e52ae17bd567070b52e0bcd3c254ee7491189a (diff)
refactor(capelin): Make workload sampling model extensible
This change updates the workload sampling implementation to be more flexible in the way the workload is constructed. Users can now sample multiple workloads at the same time using multiple samplers and use them as a single workload to simulate.
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt2
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt)17
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt)81
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt34
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt62
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt11
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt (renamed from opendc-experiments/opendc-experiments-radice/build.gradle.kts)48
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt66
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt143
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt61
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt)27
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt261
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt26
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt23
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt68
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt199
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt84
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet)bin2081 -> 2081 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet (renamed from opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet)bin1647189 -> 1647189 bytes
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt27
28 files changed, 558 insertions, 767 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
index 87903623..fcd9dd7e 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt
@@ -24,8 +24,8 @@ package org.opendc.compute.simulator.failure
import org.apache.commons.math3.distribution.RealDistribution
import org.opendc.compute.simulator.SimHost
+import java.util.*
import kotlin.math.roundToInt
-import kotlin.random.Random
/**
* A [VictimSelector] that stochastically selects a set of hosts to be failed.
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index b0795d61..78002c2f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -20,13 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.trace
+package org.opendc.compute.workload
+
+import java.util.*
/**
- * An interface for reading workloads into memory.
- *
- * This interface must guarantee that the entries are delivered in order of submission time.
- *
- * @param T The shape of the workloads supported by this reader.
+ * An interface that describes how a workload is resolved.
*/
-public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
+public interface ComputeWorkload {
+ /**
+ * Resolve the workload into a list of [VirtualMachine]s to simulate.
+ */
+ public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index ae20482d..46176609 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -20,30 +20,42 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.trace
+package org.opendc.compute.workload
+import mu.KotlinLogging
import org.opendc.compute.workload.trace.bp.BPTraceFormat
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.trace.*
import java.io.File
-import java.util.UUID
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.math.roundToLong
/**
- * A [TraceReader] for the internal VM workload trace format.
+ * A helper class for loading compute workload traces into memory.
*
- * @param path The directory of the traces.
+ * @param baseDir The directory containing the traces.
*/
-public class RawParquetTraceReader(private val path: File) {
+public class ComputeWorkloadLoader(private val baseDir: File) {
/**
- * The [Trace] that represents this trace.
+ * The logger for this instance.
*/
- private val trace = BPTraceFormat().open(path.toURI().toURL())
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The [BPTraceFormat] instance to load the traces
+ */
+ private val format = BPTraceFormat()
+
+ /**
+ * The cache of workloads.
+ */
+ private val cache = ConcurrentHashMap<String, List<VirtualMachine>>()
/**
* Read the fragments into memory.
*/
- private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> {
+ private fun parseFragments(trace: Trace): Map<String, List<SimTraceWorkload.Fragment>> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
@@ -75,11 +87,11 @@ public class RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ private fun parseMeta(trace: Trace, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<VirtualMachine> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
var counter = 0
- val entries = mutableListOf<TraceEntry<SimWorkload>>()
+ val entries = mutableListOf<VirtualMachine>()
return try {
while (reader.nextRow()) {
@@ -96,23 +108,25 @@ public class RawParquetTraceReader(private val path: File) {
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
- val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val workload = SimTraceWorkload(vmFragments)
+ val totalLoad = vmFragments.sumOf { (it.usage * it.duration) / 1000.0 } // avg MHz * duration = MFLOPs
+
entries.add(
- TraceEntry(
- uid, id, submissionTime.toEpochMilli(), workload,
- mapOf(
- "submit-time" to submissionTime.toEpochMilli(),
- "end-time" to endTime.toEpochMilli(),
- "total-load" to totalLoad,
- "cores" to maxCores,
- "required-memory" to requiredMemory.toLong(),
- "workload" to workload
- )
+ VirtualMachine(
+ uid,
+ id,
+ maxCores,
+ requiredMemory.roundToLong(),
+ totalLoad,
+ submissionTime,
+ endTime,
+ vmFragments
)
)
}
+ // Make sure the virtual machines are ordered by start time
+ entries.sortBy { it.startTime }
+
entries
} catch (e: Exception) {
e.printStackTrace()
@@ -123,17 +137,24 @@ public class RawParquetTraceReader(private val path: File) {
}
/**
- * The entries in the trace.
+ * Load the trace with the specified [name].
*/
- private val entries: List<TraceEntry<SimWorkload>>
+ public fun get(name: String): List<VirtualMachine> {
+ return cache.computeIfAbsent(name) {
+ val path = baseDir.resolve(it)
+
+ logger.info { "Loading trace $it at $path" }
- init {
- val fragments = parseFragments()
- entries = parseMeta(fragments)
+ val trace = format.open(path.toURI().toURL())
+ val fragments = parseFragments(trace)
+ parseMeta(trace, fragments)
+ }
}
/**
- * Read the entries in the trace.
+ * Clear the workload cache.
*/
- public fun read(): List<TraceEntry<SimWorkload>> = entries
+ public fun reset() {
+ cache.clear()
+ }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
index 6da0f49a..ed45bd8a 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -34,14 +34,13 @@ import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.compute.workload.trace.TraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.compute.*
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.max
@@ -90,10 +89,11 @@ public class ComputeWorkloadRunner(
}
/**
- * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader].
+ * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
*/
- public suspend fun run(reader: TraceReader<SimWorkload>) {
- val injector = failureModel?.createInjector(context, clock, service)
+ public suspend fun run(trace: List<VirtualMachine>, seed: Long) {
+ val random = Random(seed)
+ val injector = failureModel?.createInjector(context, clock, service, random)
val client = service.newClient()
// Create new image for the virtual machine
@@ -106,35 +106,36 @@ public class ComputeWorkloadRunner(
var offset = Long.MIN_VALUE
- while (reader.hasNext()) {
- val entry = reader.next()
+ for (entry in trace.sortedBy { it.startTime }) {
+ val now = clock.millis()
+ val start = entry.startTime.toEpochMilli()
if (offset < 0) {
- offset = entry.start - clock.millis()
+ offset = start - now
}
// Make sure the trace entries are ordered by submission time
- assert(entry.start - offset >= 0) { "Invalid trace order" }
- delay(max(0, (entry.start - offset) - clock.millis()))
+ assert(start - offset >= 0) { "Invalid trace order" }
+ delay(max(0, (start - offset) - now))
launch {
val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+ val workload = SimTraceWorkload(entry.trace, workloadOffset)
val server = client.newServer(
entry.name,
image,
client.newFlavor(
entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
+ entry.cpuCount,
+ entry.memCapacity
),
- meta = entry.meta + mapOf("workload" to workload)
+ meta = mapOf("workload" to workload)
)
// Wait for the server reach its end time
- val endTime = entry.meta["end-time"] as Long
- delay(endTime + workloadOffset - clock.millis() + 1)
+ val endTime = entry.stopTime.toEpochMilli()
+ delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
// Delete the server after reaching the end-time of the virtual machine
server.delete()
@@ -145,7 +146,6 @@ public class ComputeWorkloadRunner(
yield()
} finally {
injector?.close()
- reader.close()
client.close()
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
new file mode 100644
index 00000000..f58ce587
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("ComputeWorkloads")
+package org.opendc.compute.workload
+
+import org.opendc.compute.workload.internal.CompositeComputeWorkload
+import org.opendc.compute.workload.internal.HpcSampledComputeWorkload
+import org.opendc.compute.workload.internal.LoadSampledComputeWorkload
+import org.opendc.compute.workload.internal.TraceComputeWorkload
+
+/**
+ * Construct a workload from a trace.
+ */
+public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name)
+
+/**
+ * Construct a composite workload with the specified fractions.
+ */
+public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload {
+ return CompositeComputeWorkload(pairs.toMap())
+}
+
+/**
+ * Sample a workload by a [fraction] of the total load.
+ */
+public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload {
+ return LoadSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC VMs (count)
+ */
+public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction)
+}
+
+/**
+ * Sample a workload by a [fraction] of the HPC load
+ */
+public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload {
+ return HpcSampledComputeWorkload(this, fraction, sampleLoad = true)
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
index 43dd8321..4d9ef15d 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
@@ -25,6 +25,7 @@ package org.opendc.compute.workload
import org.opendc.compute.service.ComputeService
import org.opendc.compute.simulator.failure.HostFaultInjector
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
/**
@@ -34,5 +35,5 @@ public interface FailureModel {
/**
* Construct a [HostFaultInjector] for the specified [service].
*/
- public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector
+ public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
index 55c61be1..be7120b9 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
@@ -32,9 +32,9 @@ import org.opendc.compute.simulator.failure.StartStopHostFault
import org.opendc.compute.simulator.failure.StochasticVictimSelector
import java.time.Clock
import java.time.Duration
+import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
-import kotlin.random.Random
/**
* Obtain a [FailureModel] based on the GRID'5000 failure trace.
@@ -42,14 +42,15 @@ import kotlin.random.Random
* This fault injector uses parameters from the GRID'5000 failure trace as described in
* "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
*/
-public fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
+public fun grid5000(failureInterval: Duration): FailureModel {
return object : FailureModel {
override fun createInjector(
context: CoroutineContext,
clock: Clock,
- service: ComputeService
+ service: ComputeService,
+ random: Random
): HostFaultInjector {
- val rng = Well19937c(seed)
+ val rng = Well19937c(random.nextLong())
val hosts = service.hosts.map { it as SimHost }.toSet()
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
@@ -59,7 +60,7 @@ public fun grid5000(failureInterval: Duration, seed: Int): FailureModel {
clock,
hosts,
iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random),
fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
)
}
diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
index 0c716183..40484b68 100644
--- a/opendc-experiments/opendc-experiments-radice/build.gradle.kts
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -20,28 +20,30 @@
* SOFTWARE.
*/
-description = "Experiments for the Risk Analysis work"
+package org.opendc.compute.workload
-/* Build configuration */
-plugins {
- `experiment-conventions`
- `testing-conventions`
-}
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import java.time.Instant
+import java.util.*
-dependencies {
- api(platform(projects.opendcPlatform))
- api(projects.opendcHarness.opendcHarnessApi)
- implementation(projects.opendcFormat)
- implementation(projects.opendcSimulator.opendcSimulatorCore)
- implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcCompute.opendcComputeSimulator)
- implementation(projects.opendcTelemetry.opendcTelemetrySdk)
-
- implementation(libs.kotlin.logging)
- implementation(libs.config)
- implementation(libs.progressbar)
- implementation(libs.clikt)
-
- implementation(libs.parquet)
- testImplementation(libs.log4j.slf4j)
-}
+/**
+ * A virtual machine workload.
+ *
+ * @param uid The unique identifier of the virtual machine.
+ * @param name The name of the virtual machine.
+ * @param cpuCount The number of vCPUs in the VM.
+ * @param memCapacity The provisioned memory for the VM.
+ * @param startTime The start time of the VM.
+ * @param stopTime The stop time of the VM.
+ * @param trace The trace fragments that belong to this VM.
+ */
+public data class VirtualMachine(
+ val uid: UUID,
+ val name: String,
+ val cpuCount: Int,
+ val memCapacity: Long,
+ val totalLoad: Double,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val trace: Sequence<SimTraceWorkload.Fragment>,
+)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
new file mode 100644
index 00000000..9b2bec55
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads.
+ */
+internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
+
+ val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
+
+ val res = mutableListOf<VirtualMachine>()
+
+ for ((fraction, vms) in traces) {
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+ }
+
+ val vmCount = traces.sumOf { (_, vms) -> vms.size }
+ logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
new file mode 100644
index 00000000..52f4c672
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that samples HPC VMs in the workload.
+ *
+ * @param fraction The fraction of load/virtual machines to sample
+ * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs.
+ */
+internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The pattern to match compute nodes in the workload.
+ */
+ private val pattern = Regex("^(ComputeNode|cn).*")
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+
+ val (hpc, nonHpc) = vms.partition { entry ->
+ val name = entry.name
+ name.matches(pattern)
+ }
+
+ val hpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ hpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ val nonHpcSequence = generateSequence(0) { it + 1 }
+ .map { index ->
+ val res = mutableListOf<VirtualMachine>()
+ nonHpc.mapTo(res) { sample(it, index) }
+ res.shuffle(random)
+ res
+ }
+ .flatten()
+
+ logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+
+ logger.debug { "Total trace load: $totalLoad" }
+ var hpcCount = 0
+ var hpcLoad = 0.0
+ var nonHpcCount = 0
+ var nonHpcLoad = 0.0
+
+ val res = mutableListOf<VirtualMachine>()
+
+ if (sampleLoad) {
+ var currentLoad = 0.0
+ for (entry in hpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ hpcLoad += entryLoad
+ hpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ for (entry in nonHpcSequence) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > 1) {
+ break
+ }
+
+ nonHpcLoad += entryLoad
+ nonHpcCount += 1
+ currentLoad += entryLoad
+ res += entry
+ }
+ } else {
+ hpcSequence
+ .take((fraction * vms.size).toInt())
+ .forEach { entry ->
+ hpcLoad += entry.totalLoad
+ hpcCount += 1
+ res.add(entry)
+ }
+
+ nonHpcSequence
+ .take(((1 - fraction) * vms.size).toInt())
+ .forEach { entry ->
+ nonHpcLoad += entry.totalLoad
+ nonHpcCount += 1
+ res.add(entry)
+ }
+ }
+
+ logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
+ logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+
+ /**
+ * Sample a random trace entry.
+ */
+ private fun sample(entry: VirtualMachine, i: Int): VirtualMachine {
+ val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
+ return entry.copy(uid = uid)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
new file mode 100644
index 00000000..ef6de729
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
+
+/**
+ * A [ComputeWorkload] that is sampled based on total load.
+ */
+internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload {
+ /**
+ * The logging instance of this class.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ val vms = source.resolve(loader, random)
+ val res = mutableListOf<VirtualMachine>()
+
+ val totalLoad = vms.sumOf { it.totalLoad }
+ var currentLoad = 0.0
+
+ for (entry in vms) {
+ val entryLoad = entry.totalLoad
+ if ((currentLoad + entryLoad) / totalLoad > fraction) {
+ break
+ }
+
+ currentLoad += entryLoad
+ res += entry
+ }
+
+ logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
+
+ return res
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
index bfa2d051..d657ff01 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -20,23 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.compute.workload.trace
+package org.opendc.compute.workload.internal
-import java.util.UUID
+import org.opendc.compute.workload.ComputeWorkload
+import org.opendc.compute.workload.ComputeWorkloadLoader
+import org.opendc.compute.workload.VirtualMachine
+import java.util.*
/**
- * An entry in a workload trace.
- *
- * @param uid The unique identifier of the entry.
- * @param name The name of the entry.
- * @param start The start time of the workload.
- * @param workload The workload of the entry.
- * @param meta The meta-data associated with the workload.
+ * A [ComputeWorkload] from a trace.
*/
-public data class TraceEntry<out T>(
- val uid: UUID,
- val name: String,
- val start: Long,
- val workload: T,
- val meta: Map<String, Any>
-)
+internal class TraceComputeWorkload(val name: String) : ComputeWorkload {
+ override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
+ return loader.get(name)
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt
deleted file mode 100644
index 36cd0a7d..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.trace
-
-import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.filter2.predicate.Statistics
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
-import org.apache.parquet.io.api.Binary
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.trace.util.parquet.LocalInputFile
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly.
- *
- * @param traceFile The directory of the traces.
- * @param selectedVms The list of VMs to read from the trace.
- */
-public class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> {
- private val logger = KotlinLogging.logger {}
-
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>>
-
- /**
- * The intermediate buffer to store the read records in.
- */
- private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
-
- /**
- * An optional filter for filtering the selected VMs
- */
- private val filter =
- if (selectedVms.isEmpty())
- null
- else
- FilterCompat.get(
- FilterApi.userDefined(
- FilterApi.binaryColumn("id"),
- SelectedVmFilter(
- TreeSet(selectedVms)
- )
- )
- )
-
- /**
- * A poisonous fragment.
- */
- private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0))
-
- /**
- * The thread to read the records in.
- */
- private val readerThread = thread(start = true, name = "sc20-reader") {
- val reader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- try {
- while (true) {
- val record = reader.read()
-
- if (record == null) {
- queue.put(poison)
- break
- }
-
- val id = record["id"].toString()
- val time = record["time"] as Long
- val duration = record["duration"] as Long
- val cores = record["cores"] as Int
- val cpuUsage = record["cpuUsage"] as Double
-
- val fragment = SimTraceWorkload.Fragment(
- time,
- duration,
- cpuUsage,
- cores
- )
-
- queue.put(id to fragment)
- }
- } catch (e: InterruptedException) {
- // Do not rethrow this
- } finally {
- reader.close()
- }
- }
-
- /**
- * Fill the buffers with the VMs
- */
- private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
- if (!hasNext) {
- return
- }
-
- val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
- queue.drainTo(fragments)
-
- for ((id, fragment) in fragments) {
- if (id == poison.first) {
- hasNext = false
- return
- }
- buffers[id]?.forEach { it.add(fragment) }
- }
- }
-
- /**
- * A flag to indicate whether the reader has more entries.
- */
- private var hasNext: Boolean = true
-
- /**
- * Initialize the reader.
- */
- init {
- val takenIds = mutableSetOf<UUID>()
- val entries = mutableMapOf<String, GenericData.Record>()
- val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
-
- val metaReader = AvroParquetReader
- .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
- .disableCompatibility()
- .withFilter(filter)
- .build()
-
- while (true) {
- val record = metaReader.read() ?: break
- val id = record["id"].toString()
- entries[id] = record
- }
-
- metaReader.close()
-
- val selection = selectedVms.ifEmpty { entries.keys }
-
- // Create the entry iterator
- iterator = selection.asSequence()
- .mapNotNull { entries[it] }
- .mapIndexed { index, record ->
- val id = record["id"].toString()
- val submissionTime = record["submissionTime"] as Long
- val endTime = record["endTime"] as Long
- val maxCores = record["maxCores"] as Int
- val requiredMemory = record["requiredMemory"] as Long
- val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray())
-
- assert(uid !in takenIds)
- takenIds += uid
-
- logger.info { "Processing VM $id" }
-
- val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
- buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
- val fragments = sequence {
- var time = submissionTime
- repeat@ while (true) {
- if (externalBuffer.isEmpty()) {
- if (hasNext) {
- pull(buffers)
- continue
- } else {
- break
- }
- }
-
- internalBuffer.addAll(externalBuffer)
- externalBuffer.clear()
-
- for (fragment in internalBuffer) {
- yield(fragment)
-
- time += fragment.duration
- if (time >= endTime) {
- break@repeat
- }
- }
-
- internalBuffer.clear()
- }
-
- buffers.remove(id)
- }
- val workload = SimTraceWorkload(fragments)
- val meta = mapOf(
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
-
- TraceEntry(uid, id, submissionTime, workload, meta)
- }
- .sortedBy { it.start }
- .toList()
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {
- readerThread.interrupt()
- }
-
- private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable {
- override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8())
-
- override fun canDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty()
- }
-
- override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean {
- val min = statistics.min
- val max = statistics.max
-
- return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
- }
- }
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
index faabe5cb..31e8f961 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt
@@ -22,7 +22,8 @@
package org.opendc.experiments.capelin
-import org.opendc.experiments.capelin.model.CompositeWorkload
+import org.opendc.compute.workload.composite
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") {
)
override val workload: Workload by anyOf(
- CompositeWorkload(
+ Workload(
"all-azure",
- listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0)
),
- CompositeWorkload(
+ Workload(
"solvinity-25-azure-75",
- listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75)
),
- CompositeWorkload(
+ Workload(
"solvinity-50-azure-50",
- listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5)
),
- CompositeWorkload(
+ Workload(
"solvinity-75-azure-25",
- listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25)
),
- CompositeWorkload(
+ Workload(
"all-solvinity",
- listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)),
- totalSampleLoad
+ composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0)
)
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
index e1cf8517..cd093e6c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
index a995e467..73e59a58 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt
@@ -22,8 +22,10 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByHpc
+import org.opendc.compute.workload.sampleByHpcLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
-import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.harness.dsl.anyOf
@@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC),
- Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD),
- Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD)
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpc(1.0)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
index 49559e0e..9d5717bb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
index 1aac4f9e..7ab586b3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt
@@ -22,6 +22,8 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.sampleByLoad
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena")
)
override val workload: Workload by anyOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.1)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.25)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(0.5)),
+ Workload("solvinity", trace("solvinity").sampleByLoad(1.0))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 02811d83..630b76c4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -24,18 +24,16 @@ package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
import mu.KotlinLogging
+import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.ComputeWorkloadRunner
import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
import org.opendc.compute.workload.grid5000
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
@@ -47,7 +45,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
import java.util.*
-import java.util.concurrent.ConcurrentHashMap
import kotlin.math.roundToLong
/**
@@ -92,9 +89,9 @@ abstract class Portfolio(name: String) : Experiment(name) {
abstract val allocationPolicy: String
/**
- * A map of trace readers.
+ * A helper class to load workload traces.
*/
- private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>()
+ private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path")))
/**
* Perform a single trial for this portfolio.
@@ -102,19 +99,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val seeder = Random(repeat.toLong())
- val workload = workload
- val workloadNames = if (workload is CompositeWorkload) {
- workload.workloads.map { it.name }
- } else {
- listOf(workload.name)
- }
- val rawReaders = workloadNames.map { workloadName ->
- traceReaders.computeIfAbsent(workloadName) {
- logger.info { "Loading trace $workloadName" }
- RawParquetTraceReader(File(config.getString("trace-path"), workloadName))
- }
- }
- val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt())
val performanceInterferenceModel = if (operationalPhenomena.hasInterference)
PerformanceInterferenceReader()
.read(File(config.getString("interference-model")))
@@ -125,7 +109,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements)
val failureModel =
if (operationalPhenomena.failureFrequency > 0)
- grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt())
+ grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
val runner = ComputeWorkloadRunner(
@@ -149,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
runner.apply(topology)
// Run the workload trace
- runner.run(trace)
+ runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
runner.close()
metricReader.close()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
index b6d3b30c..17ec48d4 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
index 90840db8..98eb989d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt
@@ -22,6 +22,7 @@
package org.opendc.experiments.capelin
+import org.opendc.compute.workload.trace
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
import org.opendc.experiments.capelin.model.Workload
@@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") {
)
override val workload: Workload by anyOf(
- Workload("solvinity", 1.0)
+ Workload("solvinity", trace("solvinity"))
)
override val operationalPhenomena: OperationalPhenomena by anyOf(
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
index c4ddd158..a2e71243 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt
@@ -22,23 +22,12 @@
package org.opendc.experiments.capelin.model
-public enum class SamplingStrategy {
- REGULAR,
- HPC,
- HPC_LOAD
-}
+import org.opendc.compute.workload.ComputeWorkload
/**
- * A workload that is considered for a scenario.
- */
-public open class Workload(
- public open val name: String,
- public val fraction: Double,
- public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR
-)
-
-/**
- * A workload that is composed of multiple workloads.
+ * A single workload originating from a trace.
+ *
+ * @param name the name of the workload.
+ * @param source The source of the workload data.
*/
-public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) :
- Workload(name, -1.0)
+data class Workload(val name: String, val source: ComputeWorkload)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
deleted file mode 100644
index 498636ba..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import org.opendc.compute.workload.trace.RawParquetTraceReader
-import org.opendc.compute.workload.trace.TraceEntry
-import org.opendc.compute.workload.trace.TraceReader
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-
-/**
- * A [TraceReader] for the internal VM workload trace format.
- *
- * @param rawReaders The internal raw trace readers to use.
- * @param workload The workload to read.
- * @param seed The seed to use for sampling.
- */
-public class ParquetTraceReader(
- rawReaders: List<RawParquetTraceReader>,
- workload: Workload,
- seed: Int
-) : TraceReader<SimWorkload> {
- /**
- * The iterator over the actual trace.
- */
- private val iterator: Iterator<TraceEntry<SimWorkload>> =
- rawReaders
- .map { it.read() }
- .run {
- if (workload is CompositeWorkload) {
- this.zip(workload.workloads)
- } else {
- this.zip(listOf(workload))
- }
- }
- .flatMap {
- sampleWorkload(it.first, workload, it.second, seed)
- .sortedBy(TraceEntry<SimWorkload>::start)
- }
- .iterator()
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<SimWorkload> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
deleted file mode 100644
index b42951df..00000000
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.capelin.trace
-
-import mu.KotlinLogging
-import org.opendc.compute.workload.trace.TraceEntry
-import org.opendc.experiments.capelin.model.CompositeWorkload
-import org.opendc.experiments.capelin.model.SamplingStrategy
-import org.opendc.experiments.capelin.model.Workload
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-import kotlin.random.Random
-
-private val logger = KotlinLogging.logger {}
-
-/**
- * Sample the workload for the specified [run].
- */
-public fun sampleWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- return when {
- workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
- workload.samplingStrategy == SamplingStrategy.HPC ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false)
- workload.samplingStrategy == SamplingStrategy.HPC_LOAD ->
- sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true)
- else ->
- sampleRegularWorkload(trace, workload, workload, seed)
- }
-}
-
-/**
- * Sample a regular (non-HPC) workload.
- */
-public fun sampleRegularWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- subWorkload: Workload,
- seed: Int
-): List<TraceEntry<SimWorkload>> {
- val fraction = subWorkload.fraction
-
- val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- shuffled.sumOf { it.meta.getValue("total-load") as Double }
- }
- var currentLoad = 0.0
-
- for (entry in shuffled) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
-
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a HPC workload.
- */
-public fun sampleHpcWorkload(
- trace: List<TraceEntry<SimWorkload>>,
- workload: Workload,
- seed: Int,
- sampleOnLoad: Boolean
-): List<TraceEntry<SimWorkload>> {
- val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
- val random = Random(seed)
-
- val fraction = workload.fraction
- val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.name
- name.matches(pattern)
- }
-
- val hpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- val nonHpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<TraceEntry<SimWorkload>>()
- nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
- val totalLoad = if (workload is CompositeWorkload) {
- workload.totalLoad
- } else {
- trace.sumOf { it.meta.getValue("total-load") as Double }
- }
-
- logger.debug { "Total trace load: $totalLoad" }
- var hpcCount = 0
- var hpcLoad = 0.0
- var nonHpcCount = 0
- var nonHpcLoad = 0.0
-
- val res = mutableListOf<TraceEntry<SimWorkload>>()
-
- if (sampleOnLoad) {
- var currentLoad = 0.0
- for (entry in hpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- hpcLoad += entryLoad
- hpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
-
- for (entry in nonHpcSequence) {
- val entryLoad = entry.meta.getValue("total-load") as Double
- if ((currentLoad + entryLoad) / totalLoad > 1) {
- break
- }
-
- nonHpcLoad += entryLoad
- nonHpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
- } else {
- hpcSequence
- .take((fraction * trace.size).toInt())
- .forEach { entry ->
- hpcLoad += entry.meta.getValue("total-load") as Double
- hpcCount += 1
- res.add(entry)
- }
-
- nonHpcSequence
- .take(((1 - fraction) * trace.size).toInt())
- .forEach { entry ->
- nonHpcLoad += entry.meta.getValue("total-load") as Double
- nonHpcCount += 1
- res.add(entry)
- }
- }
-
- logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
- logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
- logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
-}
-
-/**
- * Sample a random trace entry.
- */
-private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
- val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
- return entry.copy(uid = uid)
-}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index c1386bfe..140a84db 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -31,18 +31,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.workload.ComputeWorkloadRunner
-import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.*
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.trace.RawParquetTraceReader
-import org.opendc.compute.workload.trace.TraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
-import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.topology.clusterTopology
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
@@ -68,6 +62,11 @@ class CapelinIntegrationTest {
private lateinit var computeScheduler: FilterScheduler
/**
+ * The [ComputeWorkloadLoader] responsible for loading the traces.
+ */
+ private lateinit var workloadLoader: ComputeWorkloadLoader
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
@@ -77,6 +76,7 @@ class CapelinIntegrationTest {
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
+ workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace"))
}
/**
@@ -84,24 +84,24 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val traceReader = createTestTraceReader()
- val simulator = ComputeWorkloadRunner(
+ val workload = createTestWorkload(1.0)
+ val runner = ComputeWorkloadRunner(
coroutineContext,
clock,
computeScheduler
)
val topology = createTopology()
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
try {
- simulator.apply(topology)
- simulator.run(traceReader)
+ runner.apply(topology)
+ runner.run(workload, 0)
} finally {
- simulator.close()
+ runner.close()
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -117,11 +117,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } },
- { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(221949826, monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(68421374, monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(947010, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(5.783711298639437E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -131,7 +131,7 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
+ val workload = createTestWorkload(0.25, seed)
val simulator = ComputeWorkloadRunner(
coroutineContext,
@@ -143,7 +143,7 @@ class CapelinIntegrationTest {
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -161,9 +161,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -173,9 +173,8 @@ class CapelinIntegrationTest {
*/
@Test
fun testInterference() = runBlockingSimulation {
- val seed = 1
- val traceReader = createTestTraceReader(0.25, seed)
-
+ val seed = 0
+ val workload = createTestWorkload(1.0, seed)
val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
val performanceInterferenceModel =
PerformanceInterferenceReader()
@@ -193,7 +192,7 @@ class CapelinIntegrationTest {
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -211,10 +210,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } }
+ { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(3378, monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -228,15 +227,15 @@ class CapelinIntegrationTest {
coroutineContext,
clock,
computeScheduler,
- grid5000(Duration.ofDays(7), seed)
+ grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
- val traceReader = createTestTraceReader(0.25, seed)
+ val workload = createTestWorkload(0.25, seed)
val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
try {
simulator.apply(topology)
- simulator.run(traceReader)
+ simulator.run(workload, seed.toLong())
} finally {
simulator.close()
metricReader.close()
@@ -254,23 +253,20 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(8640140, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(12100660, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(939456, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } }
+ { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } }
)
}
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
- return ParquetTraceReader(
- listOf(RawParquetTraceReader(File("src/test/resources/trace"))),
- Workload("test", fraction),
- seed
- )
+ private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> {
+ val source = trace("bitbrains-small").sampleByLoad(fraction)
+ return source.resolve(workloadLoader, Random(seed.toLong()))
}
/**
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
index ee76d38f..ee76d38f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
index 9b1cde13..9b1cde13 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
Binary files differ
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 48183d71..1b518fee 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -28,15 +28,12 @@ import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import kotlinx.coroutines.*
import mu.KotlinLogging
-import org.opendc.compute.workload.ComputeWorkloadRunner
-import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.*
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
-import org.opendc.compute.workload.trace.RawParquetTraceReader
import org.opendc.compute.workload.util.PerformanceInterferenceReader
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.util.createComputeScheduler
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
@@ -136,13 +133,9 @@ class RunnerCli : CliktCommand(name = "runner") {
logger.info { "Constructing performance interference model" }
- val traceDir = File(
- tracePath,
- scenario.trace.traceId
- )
- val traceReader = RawParquetTraceReader(traceDir)
+ val workloadLoader = ComputeWorkloadLoader(tracePath)
val interferenceGroups = let {
- val path = File(traceDir, "performance-interference-model.json")
+ val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json")
val operational = scenario.operationalPhenomena
val enabled = operational.performanceInterferenceEnabled
@@ -158,7 +151,7 @@ class RunnerCli : CliktCommand(name = "runner") {
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) }
- runRepeat(scenario, repeat, topology, traceReader, interferenceModel)
+ runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel)
}
}
@@ -174,7 +167,7 @@ class RunnerCli : CliktCommand(name = "runner") {
scenario: Scenario,
repeat: Int,
topology: Topology,
- traceReader: RawParquetTraceReader,
+ workloadLoader: ComputeWorkloadLoader,
interferenceModel: VmInterferenceModel?
): WebComputeMonitor.Result {
val monitor = WebComputeMonitor()
@@ -188,15 +181,11 @@ class RunnerCli : CliktCommand(name = "runner") {
val operational = scenario.operationalPhenomena
val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
+ val workload = Workload(workloadName, trace(workloadName).sampleByLoad(workloadFraction))
- val trace = ParquetTraceReader(
- listOf(traceReader),
- Workload(workloadName, workloadFraction),
- repeat
- )
val failureModel =
if (operational.failuresEnabled)
- grid5000(Duration.ofDays(7), repeat)
+ grid5000(Duration.ofDays(7))
else
null
@@ -214,7 +203,7 @@ class RunnerCli : CliktCommand(name = "runner") {
// Instantiate the topology onto the simulator
simulator.apply(topology)
// Run workload trace
- simulator.run(trace)
+ simulator.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
simulator.close()
metricReader.close()