diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-16 16:52:00 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-19 14:02:51 +0200 |
| commit | b14df2a0924774c5aed15cedeb1027abf8ee5361 (patch) | |
| tree | d93171ad63477b97e160d3a10e3216c3de1b5ff5 | |
| parent | 29e52ae17bd567070b52e0bcd3c254ee7491189a (diff) | |
refactor(capelin): Make workload sampling model extensible
This change updates the workload sampling implementation to be more
flexible in the way the workload is constructed. Users can now sample
multiple workloads at the same time using multiple samplers and use them
as a single workload to simulate.
28 files changed, 558 insertions, 767 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt index 87903623..fcd9dd7e 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/failure/StochasticVictimSelector.kt @@ -24,8 +24,8 @@ package org.opendc.compute.simulator.failure import org.apache.commons.math3.distribution.RealDistribution import org.opendc.compute.simulator.SimHost +import java.util.* import kotlin.math.roundToInt -import kotlin.random.Random /** * A [VictimSelector] that stochastically selects a set of hosts to be failed. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index b0795d61..78002c2f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -20,13 +20,16 @@ * SOFTWARE. */ -package org.opendc.compute.workload.trace +package org.opendc.compute.workload + +import java.util.* /** - * An interface for reading workloads into memory. - * - * This interface must guarantee that the entries are delivered in order of submission time. - * - * @param T The shape of the workloads supported by this reader. + * An interface that describes how a workload is resolved. */ -public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable +public interface ComputeWorkload { + /** + * Resolve the workload into a list of [VirtualMachine]s to simulate. + */ + public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index ae20482d..46176609 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/RawParquetTraceReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -20,30 +20,42 @@ * SOFTWARE. */ -package org.opendc.compute.workload.trace +package org.opendc.compute.workload +import mu.KotlinLogging import org.opendc.compute.workload.trace.bp.BPTraceFormat import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.trace.* import java.io.File -import java.util.UUID +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import kotlin.math.roundToLong /** - * A [TraceReader] for the internal VM workload trace format. + * A helper class for loading compute workload traces into memory. * - * @param path The directory of the traces. + * @param baseDir The directory containing the traces. */ -public class RawParquetTraceReader(private val path: File) { +public class ComputeWorkloadLoader(private val baseDir: File) { /** - * The [Trace] that represents this trace. + * The logger for this instance. */ - private val trace = BPTraceFormat().open(path.toURI().toURL()) + private val logger = KotlinLogging.logger {} + + /** + * The [BPTraceFormat] instance to load the traces + */ + private val format = BPTraceFormat() + + /** + * The cache of workloads. + */ + private val cache = ConcurrentHashMap<String, List<VirtualMachine>>() /** * Read the fragments into memory. */ - private fun parseFragments(): Map<String, List<SimTraceWorkload.Fragment>> { + private fun parseFragments(trace: Trace): Map<String, List<SimTraceWorkload.Fragment>> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>() @@ -75,11 +87,11 @@ public class RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { + private fun parseMeta(trace: Trace, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<VirtualMachine> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() var counter = 0 - val entries = mutableListOf<TraceEntry<SimWorkload>>() + val entries = mutableListOf<VirtualMachine>() return try { while (reader.nextRow()) { @@ -96,23 +108,25 @@ public class RawParquetTraceReader(private val path: File) { val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val workload = SimTraceWorkload(vmFragments) + val totalLoad = vmFragments.sumOf { (it.usage * it.duration) / 1000.0 } // avg MHz * duration = MFLOPs + entries.add( - TraceEntry( - uid, id, submissionTime.toEpochMilli(), workload, - mapOf( - "submit-time" to submissionTime.toEpochMilli(), - "end-time" to endTime.toEpochMilli(), - "total-load" to totalLoad, - "cores" to maxCores, - "required-memory" to requiredMemory.toLong(), - "workload" to workload - ) + VirtualMachine( + uid, + id, + maxCores, + requiredMemory.roundToLong(), + totalLoad, + submissionTime, + endTime, + vmFragments ) ) } + // Make sure the virtual machines are ordered by start time + entries.sortBy { it.startTime } + entries } catch (e: Exception) { e.printStackTrace() @@ -123,17 +137,24 @@ public class RawParquetTraceReader(private val path: File) { } /** - * The entries in the trace. + * Load the trace with the specified [name]. */ - private val entries: List<TraceEntry<SimWorkload>> + public fun get(name: String): List<VirtualMachine> { + return cache.computeIfAbsent(name) { + val path = baseDir.resolve(it) + + logger.info { "Loading trace $it at $path" } - init { - val fragments = parseFragments() - entries = parseMeta(fragments) + val trace = format.open(path.toURI().toURL()) + val fragments = parseFragments(trace) + parseMeta(trace, fragments) + } } /** - * Read the entries in the trace. + * Clear the workload cache. */ - public fun read(): List<TraceEntry<SimWorkload>> = entries + public fun reset() { + cache.clear() + } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt index 6da0f49a..ed45bd8a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -34,14 +34,13 @@ import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.compute.workload.topology.HostSpec -import org.opendc.compute.workload.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.compute.* import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock +import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -90,10 +89,11 @@ public class ComputeWorkloadRunner( } /** - * Run a simulation of the [ComputeService] by replaying the workload trace given by [reader]. + * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. */ - public suspend fun run(reader: TraceReader<SimWorkload>) { - val injector = failureModel?.createInjector(context, clock, service) + public suspend fun run(trace: List<VirtualMachine>, seed: Long) { + val random = Random(seed) + val injector = failureModel?.createInjector(context, clock, service, random) val client = service.newClient() // Create new image for the virtual machine @@ -106,35 +106,36 @@ public class ComputeWorkloadRunner( var offset = Long.MIN_VALUE - while (reader.hasNext()) { - val entry = reader.next() + for (entry in trace.sortedBy { it.startTime }) { + val now = clock.millis() + val start = entry.startTime.toEpochMilli() if (offset < 0) { - offset = entry.start - clock.millis() + offset = start - now } // Make sure the trace entries are ordered by submission time - assert(entry.start - offset >= 0) { "Invalid trace order" } - delay(max(0, (entry.start - offset) - clock.millis())) + assert(start - offset >= 0) { "Invalid trace order" } + delay(max(0, (start - offset) - now)) launch { val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + val workload = SimTraceWorkload(entry.trace, workloadOffset) val server = client.newServer( entry.name, image, client.newFlavor( entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long + entry.cpuCount, + entry.memCapacity ), - meta = entry.meta + mapOf("workload" to workload) + meta = mapOf("workload" to workload) ) // Wait for the server reach its end time - val endTime = entry.meta["end-time"] as Long - delay(endTime + workloadOffset - clock.millis() + 1) + val endTime = entry.stopTime.toEpochMilli() + delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) // Delete the server after reaching the end-time of the virtual machine server.delete() @@ -145,7 +146,6 @@ public class ComputeWorkloadRunner( yield() } finally { injector?.close() - reader.close() client.close() } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt new file mode 100644 index 00000000..f58ce587 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("ComputeWorkloads") +package org.opendc.compute.workload + +import org.opendc.compute.workload.internal.CompositeComputeWorkload +import org.opendc.compute.workload.internal.HpcSampledComputeWorkload +import org.opendc.compute.workload.internal.LoadSampledComputeWorkload +import org.opendc.compute.workload.internal.TraceComputeWorkload + +/** + * Construct a workload from a trace. + */ +public fun trace(name: String): ComputeWorkload = TraceComputeWorkload(name) + +/** + * Construct a composite workload with the specified fractions. + */ +public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload { + return CompositeComputeWorkload(pairs.toMap()) +} + +/** + * Sample a workload by a [fraction] of the total load. + */ +public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload { + return LoadSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC VMs (count) + */ +public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction) +} + +/** + * Sample a workload by a [fraction] of the HPC load + */ +public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload { + return HpcSampledComputeWorkload(this, fraction, sampleLoad = true) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt index 43dd8321..4d9ef15d 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt @@ -25,6 +25,7 @@ package org.opendc.compute.workload import org.opendc.compute.service.ComputeService import org.opendc.compute.simulator.failure.HostFaultInjector import java.time.Clock +import java.util.* import kotlin.coroutines.CoroutineContext /** @@ -34,5 +35,5 @@ public interface FailureModel { /** * Construct a [HostFaultInjector] for the specified [service]. */ - public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService): HostFaultInjector + public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt index 55c61be1..be7120b9 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt @@ -32,9 +32,9 @@ import org.opendc.compute.simulator.failure.StartStopHostFault import org.opendc.compute.simulator.failure.StochasticVictimSelector import java.time.Clock import java.time.Duration +import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.ln -import kotlin.random.Random /** * Obtain a [FailureModel] based on the GRID'5000 failure trace. @@ -42,14 +42,15 @@ import kotlin.random.Random * This fault injector uses parameters from the GRID'5000 failure trace as described in * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009. */ -public fun grid5000(failureInterval: Duration, seed: Int): FailureModel { +public fun grid5000(failureInterval: Duration): FailureModel { return object : FailureModel { override fun createInjector( context: CoroutineContext, clock: Clock, - service: ComputeService + service: ComputeService, + random: Random ): HostFaultInjector { - val rng = Well19937c(seed) + val rng = Well19937c(random.nextLong()) val hosts = service.hosts.map { it as SimHost }.toSet() // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 @@ -59,7 +60,7 @@ public fun grid5000(failureInterval: Duration, seed: Int): FailureModel { clock, hosts, iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03), - selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random), fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) ) } diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 0c716183..40484b68 100644 --- a/opendc-experiments/opendc-experiments-radice/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -20,28 +20,30 @@ * SOFTWARE. */ -description = "Experiments for the Risk Analysis work" +package org.opendc.compute.workload -/* Build configuration */ -plugins { - `experiment-conventions` - `testing-conventions` -} +import org.opendc.simulator.compute.workload.SimTraceWorkload +import java.time.Instant +import java.util.* -dependencies { - api(platform(projects.opendcPlatform)) - api(projects.opendcHarness.opendcHarnessApi) - implementation(projects.opendcFormat) - implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - - implementation(libs.kotlin.logging) - implementation(libs.config) - implementation(libs.progressbar) - implementation(libs.clikt) - - implementation(libs.parquet) - testImplementation(libs.log4j.slf4j) -} +/** + * A virtual machine workload. + * + * @param uid The unique identifier of the virtual machine. + * @param name The name of the virtual machine. + * @param cpuCount The number of vCPUs in the VM. + * @param memCapacity The provisioned memory for the VM. + * @param startTime The start time of the VM. + * @param stopTime The stop time of the VM. + * @param trace The trace fragments that belong to this VM. + */ +public data class VirtualMachine( + val uid: UUID, + val name: String, + val cpuCount: Int, + val memCapacity: Long, + val totalLoad: Double, + val startTime: Instant, + val stopTime: Instant, + val trace: Sequence<SimTraceWorkload.Fragment>, +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt new file mode 100644 index 00000000..9b2bec55 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads. + */ +internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } + + val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } + + val res = mutableListOf<VirtualMachine>() + + for ((fraction, vms) in traces) { + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + } + + val vmCount = traces.sumOf { (_, vms) -> vms.size } + logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt new file mode 100644 index 00000000..52f4c672 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that samples HPC VMs in the workload. + * + * @param fraction The fraction of load/virtual machines to sample + * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs. + */ +internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + /** + * The pattern to match compute nodes in the workload. + */ + private val pattern = Regex("^(ComputeNode|cn).*") + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + + val (hpc, nonHpc) = vms.partition { entry -> + val name = entry.name + name.matches(pattern) + } + + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + hpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf<VirtualMachine>() + nonHpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } + + val totalLoad = vms.sumOf { it.totalLoad } + + logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 + + val res = mutableListOf<VirtualMachine>() + + if (sampleLoad) { + var currentLoad = 0.0 + for (entry in hpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + hpcLoad += entryLoad + hpcCount += 1 + currentLoad += entryLoad + res += entry + } + + for (entry in nonHpcSequence) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > 1) { + break + } + + nonHpcLoad += entryLoad + nonHpcCount += 1 + currentLoad += entryLoad + res += entry + } + } else { + hpcSequence + .take((fraction * vms.size).toInt()) + .forEach { entry -> + hpcLoad += entry.totalLoad + hpcCount += 1 + res.add(entry) + } + + nonHpcSequence + .take(((1 - fraction) * vms.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.totalLoad + nonHpcCount += 1 + res.add(entry) + } + } + + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } + + /** + * Sample a random trace entry. + */ + private fun sample(entry: VirtualMachine, i: Int): VirtualMachine { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt new file mode 100644 index 00000000..ef6de729 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.internal + +import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* + +/** + * A [ComputeWorkload] that is sampled based on total load. + */ +internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload { + /** + * The logging instance of this class. + */ + private val logger = KotlinLogging.logger {} + + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + val vms = source.resolve(loader, random) + val res = mutableListOf<VirtualMachine>() + + val totalLoad = vms.sumOf { it.totalLoad } + var currentLoad = 0.0 + + for (entry in vms) { + val entryLoad = entry.totalLoad + if ((currentLoad + entryLoad) / totalLoad > fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + + return res + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index bfa2d051..d657ff01 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceEntry.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -20,23 +20,18 @@ * SOFTWARE. */ -package org.opendc.compute.workload.trace +package org.opendc.compute.workload.internal -import java.util.UUID +import org.opendc.compute.workload.ComputeWorkload +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine +import java.util.* /** - * An entry in a workload trace. - * - * @param uid The unique identifier of the entry. - * @param name The name of the entry. - * @param start The start time of the workload. - * @param workload The workload of the entry. - * @param meta The meta-data associated with the workload. + * A [ComputeWorkload] from a trace. */ -public data class TraceEntry<out T>( - val uid: UUID, - val name: String, - val start: Long, - val workload: T, - val meta: Map<String, Any> -) +internal class TraceComputeWorkload(val name: String) : ComputeWorkload { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> { + return loader.get(name) + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt deleted file mode 100644 index 36cd0a7d..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/StreamingParquetTraceReader.kt +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.trace.util.parquet.LocalInputFile -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param selectedVms The list of VMs to read from the trace. - */ -public class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> { - private val logger = KotlinLogging.logger {} - - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get( - FilterApi.userDefined( - FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - ) - ) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0L, 0, 0.0, 0)) - - /** - * The thread to read the records in. - */ - private val readerThread = thread(start = true, name = "sc20-reader") { - val reader = AvroParquetReader - .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val time = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - - val fragment = SimTraceWorkload.Fragment( - time, - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf<UUID>() - val entries = mutableMapOf<String, GenericData.Record>() - val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() - - val metaReader = AvroParquetReader - .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = selectedVms.ifEmpty { entries.keys } - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info { "Processing VM $id" } - - val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>() - val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - var time = submissionTime - repeat@ while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - time += fragment.duration - if (time >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val workload = SimTraceWorkload(fragments) - val meta = mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - - TraceEntry(uid, id, submissionTime, workload, meta) - } - .sortedBy { it.start } - .toList() - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics<Binary>): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt index faabe5cb..31e8f961 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt @@ -22,7 +22,8 @@ package org.opendc.experiments.capelin -import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.compute.workload.composite +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -42,30 +43,25 @@ public class CompositeWorkloadPortfolio : Portfolio("composite-workload") { ) override val workload: Workload by anyOf( - CompositeWorkload( + Workload( "all-azure", - listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), - totalSampleLoad + composite(trace("solvinity-short") to 0.0, trace("azure") to 1.0) ), - CompositeWorkload( + Workload( "solvinity-25-azure-75", - listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), - totalSampleLoad + composite(trace("solvinity-short") to 0.25, trace("azure") to 0.75) ), - CompositeWorkload( + Workload( "solvinity-50-azure-50", - listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), - totalSampleLoad + composite(trace("solvinity-short") to 0.5, trace("azure") to 0.5) ), - CompositeWorkload( + Workload( "solvinity-75-azure-25", - listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), - totalSampleLoad + composite(trace("solvinity-short") to 0.75, trace("azure") to 0.25) ), - CompositeWorkload( + Workload( "all-solvinity", - listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), - totalSampleLoad + composite(trace("solvinity-short") to 1.0, trace("azure") to 0.0) ) ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt index e1cf8517..cd093e6c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -44,10 +46,10 @@ public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt index a995e467..73e59a58 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt @@ -22,8 +22,10 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByHpc +import org.opendc.compute.workload.sampleByHpcLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.harness.dsl.anyOf @@ -40,13 +42,13 @@ public class MoreHpcPortfolio : Portfolio("more_hpc") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD) + Workload("solvinity", trace("solvinity").sampleByHpc(0.0)), + Workload("solvinity", trace("solvinity").sampleByHpc(0.25)), + Workload("solvinity", trace("solvinity").sampleByHpc(0.5)), + Workload("solvinity", trace("solvinity").sampleByHpc(1.0)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByHpcLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt index 49559e0e..9d5717bb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -40,10 +42,10 @@ public class MoreVelocityPortfolio : Portfolio("more_velocity") { ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt index 1aac4f9e..7ab586b3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt @@ -22,6 +22,8 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.sampleByLoad +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,10 +38,10 @@ public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") ) override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity").sampleByLoad(0.1)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.25)), + Workload("solvinity", trace("solvinity").sampleByLoad(0.5)), + Workload("solvinity", trace("solvinity").sampleByLoad(1.0)) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 02811d83..630b76c4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -24,18 +24,16 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory import mu.KotlinLogging +import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.ComputeWorkloadRunner import org.opendc.compute.workload.export.parquet.ParquetExportMonitor import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.trace.RawParquetTraceReader import org.opendc.compute.workload.util.PerformanceInterferenceReader -import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf @@ -47,7 +45,6 @@ import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration import java.util.* -import java.util.concurrent.ConcurrentHashMap import kotlin.math.roundToLong /** @@ -92,9 +89,9 @@ abstract class Portfolio(name: String) : Experiment(name) { abstract val allocationPolicy: String /** - * A map of trace readers. + * A helper class to load workload traces. */ - private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>() + private val workloadLoader = ComputeWorkloadLoader(File(config.getString("trace-path"))) /** * Perform a single trial for this portfolio. @@ -102,19 +99,6 @@ abstract class Portfolio(name: String) : Experiment(name) { override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val workload = workload - val workloadNames = if (workload is CompositeWorkload) { - workload.workloads.map { it.name } - } else { - listOf(workload.name) - } - val rawReaders = workloadNames.map { workloadName -> - traceReaders.computeIfAbsent(workloadName) { - logger.info { "Loading trace $workloadName" } - RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) - } - } - val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val performanceInterferenceModel = if (operationalPhenomena.hasInterference) PerformanceInterferenceReader() .read(File(config.getString("interference-model"))) @@ -125,7 +109,7 @@ abstract class Portfolio(name: String) : Experiment(name) { val computeScheduler = createComputeScheduler(allocationPolicy, seeder, vmPlacements) val failureModel = if (operationalPhenomena.failureFrequency > 0) - grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()), seeder.nextInt()) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null val runner = ComputeWorkloadRunner( @@ -149,7 +133,7 @@ abstract class Portfolio(name: String) : Experiment(name) { runner.apply(topology) // Run the workload trace - runner.run(trace) + runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { runner.close() metricReader.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt index b6d3b30c..17ec48d4 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,7 +37,7 @@ public class ReplayPortfolio : Portfolio("replay") { ) override val workload: Workload by anyOf( - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity")) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt index 90840db8..98eb989d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin +import org.opendc.compute.workload.trace import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload @@ -36,7 +37,7 @@ public class TestPortfolio : Portfolio("test") { ) override val workload: Workload by anyOf( - Workload("solvinity", 1.0) + Workload("solvinity", trace("solvinity")) ) override val operationalPhenomena: OperationalPhenomena by anyOf( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt index c4ddd158..a2e71243 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt @@ -22,23 +22,12 @@ package org.opendc.experiments.capelin.model -public enum class SamplingStrategy { - REGULAR, - HPC, - HPC_LOAD -} +import org.opendc.compute.workload.ComputeWorkload /** - * A workload that is considered for a scenario. - */ -public open class Workload( - public open val name: String, - public val fraction: Double, - public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR -) - -/** - * A workload that is composed of multiple workloads. + * A single workload originating from a trace. + * + * @param name the name of the workload. + * @param source The source of the workload data. */ -public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) : - Workload(name, -1.0) +data class Workload(val name: String, val source: ComputeWorkload) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt deleted file mode 100644 index 498636ba..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.opendc.compute.workload.trace.RawParquetTraceReader -import org.opendc.compute.workload.trace.TraceEntry -import org.opendc.compute.workload.trace.TraceReader -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.Workload -import org.opendc.simulator.compute.workload.SimWorkload - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param rawReaders The internal raw trace readers to use. - * @param workload The workload to read. - * @param seed The seed to use for sampling. - */ -public class ParquetTraceReader( - rawReaders: List<RawParquetTraceReader>, - workload: Workload, - seed: Int -) : TraceReader<SimWorkload> { - /** - * The iterator over the actual trace. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> = - rawReaders - .map { it.read() } - .run { - if (workload is CompositeWorkload) { - this.zip(workload.workloads) - } else { - this.zip(listOf(workload)) - } - } - .flatMap { - sampleWorkload(it.first, workload, it.second, seed) - .sortedBy(TraceEntry<SimWorkload>::start) - } - .iterator() - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt deleted file mode 100644 index b42951df..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.opendc.compute.workload.trace.TraceEntry -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.SamplingStrategy -import org.opendc.experiments.capelin.model.Workload -import org.opendc.simulator.compute.workload.SimWorkload -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Sample the workload for the specified [run]. - */ -public fun sampleWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List<TraceEntry<SimWorkload>> { - return when { - workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) - workload.samplingStrategy == SamplingStrategy.HPC -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false) - workload.samplingStrategy == SamplingStrategy.HPC_LOAD -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true) - else -> - sampleRegularWorkload(trace, workload, workload, seed) - } -} - -/** - * Sample a regular (non-HPC) workload. - */ -public fun sampleRegularWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List<TraceEntry<SimWorkload>> { - val fraction = subWorkload.fraction - - val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<SimWorkload>>() - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - shuffled.sumOf { it.meta.getValue("total-load") as Double } - } - var currentLoad = 0.0 - - for (entry in shuffled) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a HPC workload. - */ -public fun sampleHpcWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - seed: Int, - sampleOnLoad: Boolean -): List<TraceEntry<SimWorkload>> { - val pattern = Regex("^vm__workload__(ComputeNode|cn).*") - val random = Random(seed) - - val fraction = workload.fraction - val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.name - name.matches(pattern) - } - - val hpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<TraceEntry<SimWorkload>>() - hpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - val nonHpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<TraceEntry<SimWorkload>>() - nonHpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } - - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - trace.sumOf { it.meta.getValue("total-load") as Double } - } - - logger.debug { "Total trace load: $totalLoad" } - var hpcCount = 0 - var hpcLoad = 0.0 - var nonHpcCount = 0 - var nonHpcLoad = 0.0 - - val res = mutableListOf<TraceEntry<SimWorkload>>() - - if (sampleOnLoad) { - var currentLoad = 0.0 - for (entry in hpcSequence) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - hpcLoad += entryLoad - hpcCount += 1 - currentLoad += entryLoad - res += entry - } - - for (entry in nonHpcSequence) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > 1) { - break - } - - nonHpcLoad += entryLoad - nonHpcCount += 1 - currentLoad += entryLoad - res += entry - } - } else { - hpcSequence - .take((fraction * trace.size).toInt()) - .forEach { entry -> - hpcLoad += entry.meta.getValue("total-load") as Double - hpcCount += 1 - res.add(entry) - } - - nonHpcSequence - .take(((1 - fraction) * trace.size).toInt()) - .forEach { entry -> - nonHpcLoad += entry.meta.getValue("total-load") as Double - nonHpcCount += 1 - res.add(entry) - } - } - - logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } - logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a random trace entry. - */ -private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> { - val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) - return entry.copy(uid = uid) -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index c1386bfe..140a84db 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -31,18 +31,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.workload.ComputeWorkloadRunner -import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.trace.RawParquetTraceReader -import org.opendc.compute.workload.trace.TraceReader import org.opendc.compute.workload.util.PerformanceInterferenceReader -import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor @@ -68,6 +62,11 @@ class CapelinIntegrationTest { private lateinit var computeScheduler: FilterScheduler /** + * The [ComputeWorkloadLoader] responsible for loading the traces. + */ + private lateinit var workloadLoader: ComputeWorkloadLoader + + /** * Setup the experimental environment. */ @BeforeEach @@ -77,6 +76,7 @@ class CapelinIntegrationTest { filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) ) + workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace")) } /** @@ -84,24 +84,24 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val traceReader = createTestTraceReader() - val simulator = ComputeWorkloadRunner( + val workload = createTestWorkload(1.0) + val runner = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler ) val topology = createTopology() - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) try { - simulator.apply(topology) - simulator.run(traceReader) + runner.apply(topology) + runner.run(workload, 0) } finally { - simulator.close() + runner.close() metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -117,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(221949826, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(68421374, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(947010, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(5.783711298639437E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -131,7 +131,7 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val traceReader = createTestTraceReader(0.25, seed) + val workload = createTestWorkload(0.25, seed) val simulator = ComputeWorkloadRunner( coroutineContext, @@ -143,7 +143,7 @@ class CapelinIntegrationTest { try { simulator.apply(topology) - simulator.run(traceReader) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -161,9 +161,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -173,9 +173,8 @@ class CapelinIntegrationTest { */ @Test fun testInterference() = runBlockingSimulation { - val seed = 1 - val traceReader = createTestTraceReader(0.25, seed) - + val seed = 0 + val workload = createTestWorkload(1.0, seed) val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = PerformanceInterferenceReader() @@ -193,7 +192,7 @@ class CapelinIntegrationTest { try { simulator.apply(topology) - simulator.run(traceReader) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -211,10 +210,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(3378, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -228,15 +227,15 @@ class CapelinIntegrationTest { coroutineContext, clock, computeScheduler, - grid5000(Duration.ofDays(7), seed) + grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") - val traceReader = createTestTraceReader(0.25, seed) + val workload = createTestWorkload(0.25, seed) val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) try { simulator.apply(topology) - simulator.run(traceReader) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -254,23 +253,20 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(8640140, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(12100660, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(939456, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } } + { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } } ) } /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { - return ParquetTraceReader( - listOf(RawParquetTraceReader(File("src/test/resources/trace"))), - Workload("test", fraction), - seed - ) + private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> { + val source = trace("bitbrains-small").sampleByLoad(fraction) + return source.resolve(workloadLoader, Random(seed.toLong())) } /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet Binary files differindex ee76d38f..ee76d38f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet Binary files differindex 9b1cde13..9b1cde13 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 48183d71..1b518fee 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -28,15 +28,12 @@ import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import kotlinx.coroutines.* import mu.KotlinLogging -import org.opendc.compute.workload.ComputeWorkloadRunner -import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.trace.RawParquetTraceReader import org.opendc.compute.workload.util.PerformanceInterferenceReader import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.util.createComputeScheduler import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel @@ -136,13 +133,9 @@ class RunnerCli : CliktCommand(name = "runner") { logger.info { "Constructing performance interference model" } - val traceDir = File( - tracePath, - scenario.trace.traceId - ) - val traceReader = RawParquetTraceReader(traceDir) + val workloadLoader = ComputeWorkloadLoader(tracePath) val interferenceGroups = let { - val path = File(traceDir, "performance-interference-model.json") + val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json") val operational = scenario.operationalPhenomena val enabled = operational.performanceInterferenceEnabled @@ -158,7 +151,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } - runRepeat(scenario, repeat, topology, traceReader, interferenceModel) + runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel) } } @@ -174,7 +167,7 @@ class RunnerCli : CliktCommand(name = "runner") { scenario: Scenario, repeat: Int, topology: Topology, - traceReader: RawParquetTraceReader, + workloadLoader: ComputeWorkloadLoader, interferenceModel: VmInterferenceModel? ): WebComputeMonitor.Result { val monitor = WebComputeMonitor() @@ -188,15 +181,11 @@ class RunnerCli : CliktCommand(name = "runner") { val operational = scenario.operationalPhenomena val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) + val workload = Workload(workloadName, trace(workloadName).sampleByLoad(workloadFraction)) - val trace = ParquetTraceReader( - listOf(traceReader), - Workload(workloadName, workloadFraction), - repeat - ) val failureModel = if (operational.failuresEnabled) - grid5000(Duration.ofDays(7), repeat) + grid5000(Duration.ofDays(7)) else null @@ -214,7 +203,7 @@ class RunnerCli : CliktCommand(name = "runner") { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(trace) + simulator.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { simulator.close() metricReader.close() |
