diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-24 14:29:29 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-06-24 14:29:29 +0200 |
| commit | 36cb3c0cf642990a7b087a56d627a0de4fe2e71f (patch) | |
| tree | 67c09fa437bc9b1f37f23b80b970b6aa686ad818 /opendc-experiments/opendc-experiments-capelin | |
| parent | a29a61334adb8432c69800b19508eca4eff4bfd1 (diff) | |
| parent | e56967a29ac2b2d26cc085b1f3e27096dad6a170 (diff) | |
simulator: Support perf interference in uniform resource model
This pull request re-implements the performance interference model to integrate
with the uniform resource model in OpenDC. This forms the basis for other forms
of resource interference (e.g., network or disk).
* Add interface for resource interference in uniform resource
model (`opendc-simulator-resources`)
* Remove dependency on performance interference model from trace readers
* Re-implement the performance interference model on top of the interface
in the uniform resource model.
**Breaking API Changes**
* The original performance interference model classes are removed
* The SC20 trace and environment related readers have moved to the Capelin experiments module.
* Changes to the interfaces in `opendc-format`.
Implements #103
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
12 files changed, 381 insertions, 148 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 47f5f71e..9548253d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -41,11 +41,11 @@ import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector @@ -53,7 +53,6 @@ import org.opendc.simulator.failures.FaultInjector import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.io.File import java.time.Clock import kotlin.coroutines.resume import kotlin.math.ln @@ -68,7 +67,7 @@ private val logger = KotlinLogging.logger {} /** * Construct the failure domain for the experiments. */ -public fun createFailureDomain( +fun createFailureDomain( coroutineScope: CoroutineScope, clock: Clock, seed: Int, @@ -100,7 +99,7 @@ public fun createFailureDomain( /** * Obtain the [FaultInjector] to use for the experiments. */ -public fun createFaultInjector( +fun createFaultInjector( coroutineScope: CoroutineScope, clock: Clock, random: Random, @@ -119,30 +118,14 @@ public fun createFaultInjector( } /** - * Create the trace reader from which the VM workloads are read. - */ -public fun createTraceReader( - path: File, - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List<String>, - seed: Int -): Sc20StreamingParquetTraceReader { - return Sc20StreamingParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) -} - -/** * Construct the environment for a simulated compute service.. */ -public suspend fun withComputeService( +suspend fun withComputeService( clock: Clock, meterProvider: MeterProvider, environmentReader: EnvironmentReader, scheduler: ComputeScheduler, + interferenceModel: VmInterferenceModel? = null, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val interpreter = SimResourceInterpreter(coroutineContext, clock) @@ -158,7 +141,8 @@ public suspend fun withComputeService( interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), - def.powerModel + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() ) } @@ -181,16 +165,13 @@ public suspend fun withComputeService( /** * Attach the specified monitor to the VM provisioner. */ -@OptIn(ExperimentalCoroutinesApi::class) -public suspend fun withMonitor( +suspend fun withMonitor( monitor: ExperimentMonitor, clock: Clock, metricProducer: MetricProducer, scheduler: ComputeService, block: suspend CoroutineScope.() -> Unit ): Unit = coroutineScope { - val monitorJobs = mutableSetOf<Job>() - // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -211,24 +192,23 @@ public suspend fun withMonitor( try { block(this) } finally { - monitorJobs.forEach(Job::cancel) reader.close() monitor.close() } } -public class ComputeMetrics { - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var unscheduledVms: Int = 0 - public var finishedVms: Int = 0 +class ComputeMetrics { + var submittedVms: Int = 0 + var queuedVms: Int = 0 + var runningVms: Int = 0 + var unscheduledVms: Int = 0 + var finishedVms: Int = 0 } /** * Collect the metrics of the compute service. */ -public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { +fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { val metrics = metricProducer.collectAllMetrics().associateBy { it.name } val res = ComputeMetrics() try { @@ -247,7 +227,7 @@ public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { /** * Process the trace. */ -public suspend fun processTrace( +suspend fun processTrace( clock: Clock, reader: TraceReader<SimWorkload>, scheduler: ComputeService, @@ -306,7 +286,7 @@ public suspend fun processTrace( /** * Create a [MeterProvider] instance for the experiment. */ -public fun createMeterProvider(clock: Clock): MeterProvider { +fun createMeterProvider(clock: Clock): MeterProvider { val powerSelector = InstrumentSelector.builder() .setInstrumentNameRegex("power\\.usage") .setInstrumentType(InstrumentType.VALUE_RECORDER) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index b70eefb2..cbb5bfd9 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -32,29 +32,30 @@ import org.opendc.compute.service.scheduler.* import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.* +import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.PerformanceInterferenceModelReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import java.io.File +import java.io.FileInputStream import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom /** * A portfolio represents a collection of scenarios are tested for the work. * * @param name The name of the portfolio. */ -public abstract class Portfolio(name: String) : Experiment(name) { +abstract class Portfolio(name: String) : Experiment(name) { /** * The logger for this portfolio instance. */ @@ -71,34 +72,29 @@ public abstract class Portfolio(name: String) : Experiment(name) { private val vmPlacements by anyOf(emptyMap<String, String>()) /** - * The path to the performance interference model. - */ - private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null) - - /** * The topology to test. */ - public abstract val topology: Topology + abstract val topology: Topology /** * The workload to test. */ - public abstract val workload: Workload + abstract val workload: Workload /** * The operational phenomenas to consider. */ - public abstract val operationalPhenomena: OperationalPhenomena + abstract val operationalPhenomena: OperationalPhenomena /** * The allocation policies to consider. */ - public abstract val allocationPolicy: String + abstract val allocationPolicy: String /** * A map of trace readers. */ - private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>() + private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>() /** * Perform a single trial for this portfolio. @@ -106,7 +102,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val environment = Sc20ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) + val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = createComputeScheduler(seeder) @@ -122,14 +118,17 @@ public abstract class Portfolio(name: String) : Experiment(name) { val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } - Sc20RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) + RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - val performanceInterferenceModel = performanceInterferenceModel - ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder.asKotlinRandom()) ?: emptyMap() - val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) + val performanceInterferenceModel = if (operationalPhenomena.hasInterference) + PerformanceInterferenceReader(FileInputStream(config.getString("interference-model"))) + .use { VmInterferenceModel(it.read(), Random(seeder.nextLong())) } + else + null + + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val monitor = ParquetExperimentMonitor( File(config.getString("output-path")), @@ -137,7 +136,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt new file mode 100644 index 00000000..d73d14f5 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.env + +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.environment.MachineDef +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.LinearPowerModel +import java.io.File +import java.io.FileInputStream +import java.io.InputStream +import java.util.* + +/** + * A [EnvironmentReader] for the internal environment format. + * + * @param input The input stream describing the physical cluster. + */ +class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader { + /** + * Construct a [ClusterEnvironmentReader] for the specified [file]. + */ + constructor(file: File) : this(FileInputStream(file)) + + override fun read(): List<MachineDef> { + var clusterIdCol = 0 + var speedCol = 0 + var numberOfHostsCol = 0 + var memoryPerHostCol = 0 + var coresPerHostCol = 0 + + var clusterIdx = 0 + var clusterId: String + var speed: Double + var numberOfHosts: Int + var memoryPerHost: Long + var coresPerHost: Int + + val nodes = mutableListOf<MachineDef>() + val random = Random(0) + + input.bufferedReader().use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the file + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";") + + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + clusterIdCol = header["ClusterID"]!! + speedCol = header["Speed"]!! + numberOfHostsCol = header["numberOfHosts"]!! + memoryPerHostCol = header["memoryCapacityPerHost"]!! + coresPerHostCol = header["coreCountPerHost"]!! + return@forEachIndexed + } + + clusterIdx++ + clusterId = values[clusterIdCol].trim() + speed = values[speedCol].trim().toDouble() * 1000.0 + numberOfHosts = values[numberOfHostsCol].trim().toInt() + memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L + coresPerHost = values[coresPerHostCol].trim().toInt() + + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + + repeat(numberOfHosts) { + nodes.add( + MachineDef( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$it", + mapOf("cluster" to clusterId), + MachineModel( + List(coresPerHost) { coreId -> + ProcessingUnit(unknownProcessingNode, coreId, speed) + }, + listOf(unknownMemoryUnit) + ), + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + LinearPowerModel(350.0, idlePower = 200.0) + ) + ) + } + } + } + + return nodes + } + + override fun close() { + input.close() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 7f25137e..5ad75565 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -26,21 +26,17 @@ import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload -import java.util.TreeSet /** * A [TraceReader] for the internal VM workload trace format. * - * @param reader The internal trace reader to use. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - * @param run The run to which this reader belongs. + * @param rawReaders The internal raw trace readers to use. + * @param workload The workload to read. + * @param seed The seed to use for sampling. */ -public class Sc20ParquetTraceReader( - rawReaders: List<Sc20RawParquetTraceReader>, - performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, +public class ParquetTraceReader( + rawReaders: List<RawParquetTraceReader>, workload: Workload, seed: Int ) : TraceReader<SimWorkload> { @@ -59,20 +55,6 @@ public class Sc20ParquetTraceReader( } .map { sampleWorkload(it.first, workload, it.second, seed) } .flatten() - .run { - // Apply performance interference model - if (performanceInterferenceModel.isEmpty()) - this - else { - map { entry -> - val id = entry.name - val relevantPerformanceInterferenceModelItems = - performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - - entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems)) - } - } - } .iterator() override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt new file mode 100644 index 00000000..a19f5699 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class PerformanceInterferenceReader( + private val input: InputStream, + private val mapper: ObjectMapper = jacksonObjectMapper() +) : AutoCloseable { + init { + mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) + } + + /** + * Read the performance interface model from the input. + */ + fun read(): List<VmInterferenceGroup> { + return mapper.readValue(input) + } + + override fun close() { + input.close() + } + + private data class GroupMixin( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set<String>, + ) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 54151c9f..94193780 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin.trace -import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -32,14 +31,12 @@ import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.util.UUID -private val logger = KotlinLogging.logger {} - /** * A [TraceReader] for the internal VM workload trace format. * * @param path The directory of the traces. */ -public class Sc20RawParquetTraceReader(private val path: File) { +class RawParquetTraceReader(private val path: File) { /** * Read the fragments into memory. */ @@ -136,14 +133,5 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<SimWorkload>> = entries - - /** - * Create a [TraceReader] instance. - */ - public fun createReader(): TraceReader<SimWorkload> { - return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() { - override fun close() {} - } - } + fun read(): List<TraceEntry<SimWorkload>> = entries } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index 6792c2ab..a3b45f47 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -33,8 +33,6 @@ import org.apache.parquet.io.api.Binary import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.format.util.LocalInputFile -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.io.File @@ -44,7 +42,6 @@ import java.util.TreeSet import java.util.UUID import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread -import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -52,14 +49,9 @@ private val logger = KotlinLogging.logger {} * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. * * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + * @param selectedVms The list of VMs to read from the trace. */ -public class Sc20StreamingParquetTraceReader( - traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel? = null, - selectedVms: List<String> = emptyList(), - random: Random -) : TraceReader<SimWorkload> { +class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ @@ -227,14 +219,6 @@ public class Sc20StreamingParquetTraceReader( buffers.remove(id) } - val relevantPerformanceInterferenceModelItems = - if (performanceInterferenceModel != null) - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) - else - null val workload = SimTraceWorkload(fragments) val meta = mapOf( "cores" to maxCores, @@ -242,13 +226,7 @@ public class Sc20StreamingParquetTraceReader( "workload" to workload ) - TraceEntry( - uid, id, submissionTime, workload, - if (performanceInterferenceModel != null) - meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any) - else - meta - ) + TraceEntry(uid, id, submissionTime, workload, meta) } .sortedBy { it.start } .toList() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index d0031a66..7cd1f159 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -41,7 +41,6 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.sc20.Sc20VmPlacementReader import org.opendc.format.util.LocalOutputFile import java.io.BufferedReader import java.io.File @@ -53,7 +52,7 @@ import kotlin.math.min /** * Represents the command for converting traces */ -public class TraceConverterCli : CliktCommand(name = "trace-converter") { +class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The directory where the trace should be stored. */ @@ -149,24 +148,24 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The supported trace conversions. */ -public sealed class TraceConversion(name: String) : OptionGroup(name) { +sealed class TraceConversion(name: String) : OptionGroup(name) { /** * Read the fragments of the trace. */ - public abstract fun read( + abstract fun read( traceDirectory: File, metaSchema: Schema, metaWriter: ParquetWriter<GenericData.Record> ): MutableList<Fragment> } -public class SolvinityConversion : TraceConversion("Solvinity") { +class SolvinityConversion : TraceConversion("Solvinity") { private val clusters by option() .split(",") private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") .file(canBeDir = false) - .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } + .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } } .required() override fun read( @@ -335,7 +334,7 @@ public class SolvinityConversion : TraceConversion("Solvinity") { /** * Conversion of the Bitbrains public trace. */ -public class BitbrainsConversion : TraceConversion("Bitbrains") { +class BitbrainsConversion : TraceConversion("Bitbrains") { override fun read( traceDirectory: File, metaSchema: Schema, @@ -447,7 +446,7 @@ public class BitbrainsConversion : TraceConversion("Bitbrains") { /** * Conversion of the Azure public VM trace. */ -public class AzureConversion : TraceConversion("Azure") { +class AzureConversion : TraceConversion("Azure") { private val seed by option(help = "seed for trace sampling") .long() .default(0) @@ -604,18 +603,18 @@ public class AzureConversion : TraceConversion("Azure") { } } -public data class Fragment( - public val id: String, - public val tick: Long, - public val flops: Long, - public val duration: Long, - public val usage: Double, - public val cores: Int +data class Fragment( + val id: String, + val tick: Long, + val flops: Long, + val duration: Long, + val usage: Double, + val cores: Int ) -public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) +class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long) /** * A script to convert a trace in text format into a Parquet trace. */ -public fun main(args: Array<String>): Unit = TraceConverterCli().main(args) +fun main(args: Array<String>): Unit = TraceConverterCli().main(args) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt new file mode 100644 index 00000000..7a1683f0 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import java.io.InputStream + +/** + * A parser for the JSON VM placement data files used for the TPDS article on Capelin. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +public class VmPlacementReader( + private val input: InputStream, + private val mapper: ObjectMapper = jacksonObjectMapper() +) : AutoCloseable { + /** + * Read the VM placements from the input. + */ + public fun read(): Map<String, String> { + return mapper.readValue<Map<String, String>>(input) + .mapKeys { "vm__workload__${it.key}.txt" } + .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 + } + + override fun close() { + input.close() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 4b21b4f7..08e04ddf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,12 +34,12 @@ import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher +import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation @@ -161,9 +161,8 @@ class CapelinIntegrationTest { * Obtain the trace reader for the test. */ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { - return Sc20ParquetTraceReader( - listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), - emptyMap(), + return ParquetTraceReader( + listOf(RawParquetTraceReader(File("src/test/resources/trace"))), Workload("test", fraction), seed ) @@ -173,8 +172,8 @@ class CapelinIntegrationTest { * Obtain the environment reader for the test. */ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") - return Sc20ClusterEnvironmentReader(stream) + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt")) + return ClusterEnvironmentReader(stream) } class TestExperimentReporter : ExperimentMonitor { diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt new file mode 100644 index 00000000..9b1513dc --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll + +/** + * Test suite for the [PerformanceInterferenceReader] class. + */ +class PerformanceInterferenceReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) + val reader = PerformanceInterferenceReader(input) + + val result = reader.use { reader.read() } + + assertAll( + { assertEquals(2, result.size) }, + { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, + { assertEquals(0.0, result[0].targetLoad, 0.001) }, + { assertEquals(0.8830158730158756, result[0].score, 0.001) } + ) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json new file mode 100644 index 00000000..1be5852b --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json @@ -0,0 +1,22 @@ +[ + { + "vms": [ + "vm_a", + "vm_c", + "vm_x", + "vm_y" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "vm_a", + "vm_b", + "vm_c", + "vm_d" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] |
