From 91793636facead15192ccad156ffb0927573d055 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 4 Jun 2021 22:52:44 +0200 Subject: simulator: Add interface for resource interference This change introduces an interface for modelling performance variability due to resource interference in systems where resources are shared across multiple consumers. --- .../resources/SimAbstractResourceAggregator.kt | 4 +-- .../simulator/resources/SimResourceDistributor.kt | 6 +++- .../resources/SimResourceDistributorMaxMin.kt | 42 ++++++++++++++-------- .../resources/SimResourceProviderLogic.kt | 17 ++++----- .../simulator/resources/SimResourceSwitch.kt | 6 +++- .../resources/SimResourceSwitchExclusive.kt | 6 +++- .../simulator/resources/SimResourceSwitchMaxMin.kt | 16 ++++++--- .../resources/impl/SimResourceContextImpl.kt | 3 +- .../resources/interference/InterferenceDomain.kt | 19 ++++++++++ .../resources/interference/InterferenceKey.kt | 28 +++++++++++++++ 10 files changed, 113 insertions(+), 34 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 84217278..8a24b3e7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -116,8 +116,8 @@ public abstract class SimAbstractResourceAggregator( updateCounters(ctx, work) } - override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - return _inputConsumers.sumOf { it.remainingWork } + override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return work - _inputConsumers.sumOf { it.remainingWork } } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index 6bfbfc99..f384582f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.interference.InterferenceKey + /** * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. */ @@ -33,6 +35,8 @@ public interface SimResourceDistributor : SimResourceConsumer { /** * Create a new output for the distributor. + * + * @param key The key of the interference member to which the output belongs. */ - public fun newOutput(): SimResourceCloseableProvider + public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index d8fc8cb6..398797cf 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -22,15 +22,21 @@ package org.opendc.simulator.resources -import kotlin.math.max +import org.opendc.simulator.resources.interference.InterferenceDomain +import org.opendc.simulator.resources.interference.InterferenceKey import kotlin.math.min /** * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. + * + * @param interpreter The interpreter for managing the resource contexts. + * @param parent The parent resource system of the distributor. + * @param interferenceDomain The interference domain of the distributor. */ public class SimResourceDistributorMaxMin( private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null + private val parent: SimResourceSystem? = null, + private val interferenceDomain: InterferenceDomain? = null ) : SimResourceDistributor { override val outputs: Set get() = _outputs @@ -56,9 +62,14 @@ public class SimResourceDistributorMaxMin( */ private var totalAllocatedSpeed = 0.0 + /** + * The total requested speed for the output resources. + */ + private var totalRequestedSpeed = 0.0 + /* SimResourceDistributor */ - override fun newOutput(): SimResourceCloseableProvider { - val provider = Output(ctx?.capacity ?: 0.0) + override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { + val provider = Output(ctx?.capacity ?: 0.0, key) _outputs.add(provider) return provider } @@ -148,6 +159,7 @@ public class SimResourceDistributorMaxMin( assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } this.totalRequestedWork = totalRequestedWork + this.totalRequestedSpeed = totalRequestedSpeed this.totalAllocatedSpeed = capacity - availableSpeed val totalAllocatedWork = min( totalRequestedWork, @@ -169,7 +181,7 @@ public class SimResourceDistributorMaxMin( /** * An internal [SimResourceProvider] implementation for switch outputs. */ - private inner class Output(capacity: Double) : + private inner class Output(capacity: Double, private val key: InterferenceKey?) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceCloseableProvider, SimResourceProviderLogic, @@ -216,7 +228,6 @@ public class SimResourceDistributorMaxMin( check(!isClosed) { "Cannot re-use closed output" } activeOutputs += this - interpreter.batch { ctx.start() // Interrupt the input to re-schedule the resources @@ -262,19 +273,22 @@ public class SimResourceDistributorMaxMin( lastCommandTimestamp = ctx.clock.millis() } - override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0 - return if (work > 0.0) { - // Compute the fraction of compute time allocated to the output - val fraction = actualSpeed / totalAllocatedSpeed + // Compute the fraction of compute time allocated to the output + val fraction = actualSpeed / totalAllocatedSpeed - // Compute the work that was actually granted to the output. - val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction - max(0.0, work - processingAvailable) + // Compute the performance penalty due to resource interference + val perfScore = if (interferenceDomain != null) { + val load = totalAllocatedSpeed / requireNotNull(this@SimResourceDistributorMaxMin.ctx).capacity + interferenceDomain.apply(key, load) } else { - 0.0 + 1.0 } + + // Compute the work that was actually granted to the output. + return (totalRequestedWork - totalRemainingWork) * fraction * perfScore } /* Comparable */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index 5231ecf5..17045557 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import kotlin.math.max - /** * The logic of a resource provider. */ @@ -63,19 +61,18 @@ public interface SimResourceProviderLogic { public fun onFinish(ctx: SimResourceControllableContext) /** - * Get the remaining work to process after a resource consumption. + * Compute the amount of work that was consumed over the specified [duration]. * - * @param work The size of the resource consumption. - * @param speed The speed of consumption. + * @param work The total size of the resource consumption. + * @param speed The speed of the resource provider. * @param duration The duration from the start of the consumption until now. - * @return The amount of work remaining. + * @return The amount of work that was consumed by the resource provider. */ - public fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + public fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { return if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, work - processed) + return (duration / 1000.0) * speed } else { - 0.0 + work } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index f6e7b22f..d2aab634 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.interference.InterferenceKey + /** * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. */ @@ -43,8 +45,10 @@ public interface SimResourceSwitch : AutoCloseable { /** * Create a new output on the switch. + * + * @param key The key of the interference member to which the output belongs. */ - public fun newOutput(): SimResourceCloseableProvider + public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider /** * Add the specified [input] to the switch. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 4ff741ed..fbb541e5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.interference.InterferenceKey import java.util.ArrayDeque /** @@ -61,7 +62,10 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" } - override fun newOutput(): SimResourceCloseableProvider { + /** + * Add an output to the switch. + */ + override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 50d58798..ceb5a1a4 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -22,13 +22,21 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.interference.InterferenceDomain +import org.opendc.simulator.resources.interference.InterferenceKey + /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. + * + * @param interpreter The interpreter for managing the resource contexts. + * @param parent The parent resource system of the switch. + * @param interferenceDomain The interference domain of the switch. */ public class SimResourceSwitchMaxMin( interpreter: SimResourceInterpreter, - parent: SimResourceSystem? = null + parent: SimResourceSystem? = null, + interferenceDomain: InterferenceDomain? = null ) : SimResourceSwitch { /** * The output resource providers to which resource consumers can be attached. @@ -61,7 +69,7 @@ public class SimResourceSwitchMaxMin( /** * The distributor to distribute the aggregated resources. */ - private val distributor = SimResourceDistributorMaxMin(interpreter, parent) + private val distributor = SimResourceDistributorMaxMin(interpreter, parent, interferenceDomain) init { aggregator.startConsumer(distributor) @@ -70,10 +78,10 @@ public class SimResourceSwitchMaxMin( /** * Add an output to the switch. */ - override fun newOutput(): SimResourceCloseableProvider { + override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { check(!isClosed) { "Switch has been closed" } - return distributor.newOutput() + return distributor.newOutput(key) } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 90c7bc75..98fad068 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.resources.impl import org.opendc.simulator.resources.* import java.time.Clock +import kotlin.math.max import kotlin.math.min /** @@ -318,7 +319,7 @@ internal class SimResourceContextImpl( */ private fun computeRemainingWork(now: Long): Double { return if (_work > 0.0) - logic.getRemainingWork(this, _work, speed, now - _timestamp) + max(0.0, _work - logic.getConsumedWork(this, _work, speed, now - _timestamp)) else 0.0 } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt new file mode 100644 index 00000000..1066777f --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt @@ -0,0 +1,19 @@ +package org.opendc.simulator.resources.interference + +import org.opendc.simulator.resources.SimResourceConsumer + +/** + * An interference domain represents a system of resources where [resource consumers][SimResourceConsumer] may incur + * performance variability due to operating on the same resources and therefore causing interference. + */ +public interface InterferenceDomain { + /** + * Compute the performance score of a participant in this interference domain. + * + * @param key The participant to obtain the score of or `null` if the participant has no key. + * @param load The overall load on the interference domain. + * @return A score representing the performance score to be applied to the resource consumer, with 1 + * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public fun apply(key: InterferenceKey?, load: Double): Double +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt new file mode 100644 index 00000000..8b12e7b4 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.interference + +/** + * A key that uniquely identifies a participant of an interference domain. + */ +public interface InterferenceKey -- cgit v1.2.3 From be34a55c2c2fe94a6883c6b97d2abe4c43288e8a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 23 Jun 2021 16:54:31 +0200 Subject: format: Remove performance interference from trace readers This change updates the trace reader implementation to remove their dependency on the performance interference model. In a future commit, we will instead pass the performance interference model via the host/hypervisor. --- .../experiments/capelin/ExperimentHelpers.kt | 49 +- .../org/opendc/experiments/capelin/Portfolio.kt | 34 +- .../capelin/env/ClusterEnvironmentReader.kt | 122 ++++ .../capelin/trace/ParquetTraceReader.kt | 65 +++ .../capelin/trace/RawParquetTraceReader.kt | 137 +++++ .../capelin/trace/Sc20ParquetTraceReader.kt | 83 --- .../capelin/trace/Sc20RawParquetTraceReader.kt | 149 ----- .../trace/Sc20StreamingParquetTraceReader.kt | 283 ---------- .../capelin/trace/Sc20TraceConverter.kt | 621 --------------------- .../capelin/trace/StreamingParquetTraceReader.kt | 261 +++++++++ .../experiments/capelin/trace/TraceConverter.kt | 620 ++++++++++++++++++++ .../experiments/capelin/trace/VmPlacementReader.kt | 52 ++ .../experiments/capelin/CapelinIntegrationTest.kt | 13 +- .../experiments/energy21/EnergyExperiment.kt | 5 +- .../org/opendc/format/environment/sc20/Model.kt | 67 --- .../sc20/Sc20ClusterEnvironmentReader.kt | 120 ---- .../environment/sc20/Sc20EnvironmentReader.kt | 97 ---- .../trace/PerformanceInterferenceModelReader.kt | 37 -- .../kotlin/org/opendc/format/trace/TraceReader.kt | 4 +- .../org/opendc/format/trace/VmPlacementReader.kt | 37 -- .../trace/sc20/PerformanceInterferenceEntry.kt | 7 - .../sc20/Sc20PerformanceInterferenceReader.kt | 65 --- .../opendc/format/trace/sc20/Sc20TraceReader.kt | 181 ------ .../format/trace/sc20/Sc20VmPlacementReader.kt | 51 -- .../src/main/kotlin/org/opendc/runner/web/Main.kt | 28 +- 25 files changed, 1297 insertions(+), 1891 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 47f5f71e..06251dd3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -41,10 +41,8 @@ import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload @@ -53,7 +51,6 @@ import org.opendc.simulator.failures.FaultInjector import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.io.File import java.time.Clock import kotlin.coroutines.resume import kotlin.math.ln @@ -68,7 +65,7 @@ private val logger = KotlinLogging.logger {} /** * Construct the failure domain for the experiments. */ -public fun createFailureDomain( +fun createFailureDomain( coroutineScope: CoroutineScope, clock: Clock, seed: Int, @@ -100,7 +97,7 @@ public fun createFailureDomain( /** * Obtain the [FaultInjector] to use for the experiments. */ -public fun createFaultInjector( +fun createFaultInjector( coroutineScope: CoroutineScope, clock: Clock, random: Random, @@ -118,27 +115,10 @@ public fun createFaultInjector( ) } -/** - * Create the trace reader from which the VM workloads are read. - */ -public fun createTraceReader( - path: File, - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List, - seed: Int -): Sc20StreamingParquetTraceReader { - return Sc20StreamingParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) -} - /** * Construct the environment for a simulated compute service.. */ -public suspend fun withComputeService( +suspend fun withComputeService( clock: Clock, meterProvider: MeterProvider, environmentReader: EnvironmentReader, @@ -182,15 +162,13 @@ public suspend fun withComputeService( * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public suspend fun withMonitor( +suspend fun withMonitor( monitor: ExperimentMonitor, clock: Clock, metricProducer: MetricProducer, scheduler: ComputeService, block: suspend CoroutineScope.() -> Unit ): Unit = coroutineScope { - val monitorJobs = mutableSetOf() - // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -211,24 +189,23 @@ public suspend fun withMonitor( try { block(this) } finally { - monitorJobs.forEach(Job::cancel) reader.close() monitor.close() } } -public class ComputeMetrics { - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var unscheduledVms: Int = 0 - public var finishedVms: Int = 0 +class ComputeMetrics { + var submittedVms: Int = 0 + var queuedVms: Int = 0 + var runningVms: Int = 0 + var unscheduledVms: Int = 0 + var finishedVms: Int = 0 } /** * Collect the metrics of the compute service. */ -public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { +fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { val metrics = metricProducer.collectAllMetrics().associateBy { it.name } val res = ComputeMetrics() try { @@ -247,7 +224,7 @@ public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { /** * Process the trace. */ -public suspend fun processTrace( +suspend fun processTrace( clock: Clock, reader: TraceReader, scheduler: ComputeService, @@ -306,7 +283,7 @@ public suspend fun processTrace( /** * Create a [MeterProvider] instance for the experiment. */ -public fun createMeterProvider(clock: Clock): MeterProvider { +fun createMeterProvider(clock: Clock): MeterProvider { val powerSelector = InstrumentSelector.builder() .setInstrumentNameRegex("power\\.usage") .setInstrumentType(InstrumentType.VALUE_RECORDER) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index b70eefb2..460da303 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -32,29 +32,27 @@ import org.opendc.compute.service.scheduler.* import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.* +import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.PerformanceInterferenceModelReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation import java.io.File import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom /** * A portfolio represents a collection of scenarios are tested for the work. * * @param name The name of the portfolio. */ -public abstract class Portfolio(name: String) : Experiment(name) { +abstract class Portfolio(name: String) : Experiment(name) { /** * The logger for this portfolio instance. */ @@ -70,35 +68,30 @@ public abstract class Portfolio(name: String) : Experiment(name) { */ private val vmPlacements by anyOf(emptyMap()) - /** - * The path to the performance interference model. - */ - private val performanceInterferenceModel by anyOf(null) - /** * The topology to test. */ - public abstract val topology: Topology + abstract val topology: Topology /** * The workload to test. */ - public abstract val workload: Workload + abstract val workload: Workload /** * The operational phenomenas to consider. */ - public abstract val operationalPhenomena: OperationalPhenomena + abstract val operationalPhenomena: OperationalPhenomena /** * The allocation policies to consider. */ - public abstract val allocationPolicy: String + abstract val allocationPolicy: String /** * A map of trace readers. */ - private val traceReaders = ConcurrentHashMap() + private val traceReaders = ConcurrentHashMap() /** * Perform a single trial for this portfolio. @@ -106,7 +99,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val environment = Sc20ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) + val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) val chan = Channel(Channel.CONFLATED) val allocationPolicy = createComputeScheduler(seeder) @@ -122,14 +115,11 @@ public abstract class Portfolio(name: String) : Experiment(name) { val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } - Sc20RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) + RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - val performanceInterferenceModel = performanceInterferenceModel - ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder.asKotlinRandom()) ?: emptyMap() - val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val monitor = ParquetExperimentMonitor( File(config.getString("output-path")), diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt new file mode 100644 index 00000000..d73d14f5 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.env + +import org.opendc.format.environment.EnvironmentReader +import org.opendc.format.environment.MachineDef +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.LinearPowerModel +import java.io.File +import java.io.FileInputStream +import java.io.InputStream +import java.util.* + +/** + * A [EnvironmentReader] for the internal environment format. + * + * @param input The input stream describing the physical cluster. + */ +class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader { + /** + * Construct a [ClusterEnvironmentReader] for the specified [file]. + */ + constructor(file: File) : this(FileInputStream(file)) + + override fun read(): List { + var clusterIdCol = 0 + var speedCol = 0 + var numberOfHostsCol = 0 + var memoryPerHostCol = 0 + var coresPerHostCol = 0 + + var clusterIdx = 0 + var clusterId: String + var speed: Double + var numberOfHosts: Int + var memoryPerHost: Long + var coresPerHost: Int + + val nodes = mutableListOf() + val random = Random(0) + + input.bufferedReader().use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the file + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";") + + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + clusterIdCol = header["ClusterID"]!! + speedCol = header["Speed"]!! + numberOfHostsCol = header["numberOfHosts"]!! + memoryPerHostCol = header["memoryCapacityPerHost"]!! + coresPerHostCol = header["coreCountPerHost"]!! + return@forEachIndexed + } + + clusterIdx++ + clusterId = values[clusterIdCol].trim() + speed = values[speedCol].trim().toDouble() * 1000.0 + numberOfHosts = values[numberOfHostsCol].trim().toInt() + memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L + coresPerHost = values[coresPerHostCol].trim().toInt() + + val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) + val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) + + repeat(numberOfHosts) { + nodes.add( + MachineDef( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$it", + mapOf("cluster" to clusterId), + MachineModel( + List(coresPerHost) { coreId -> + ProcessingUnit(unknownProcessingNode, coreId, speed) + }, + listOf(unknownMemoryUnit) + ), + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + LinearPowerModel(350.0, idlePower = 200.0) + ) + ) + } + } + } + + return nodes + } + + override fun close() { + input.close() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt new file mode 100644 index 00000000..2ebe65ea --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import org.opendc.experiments.capelin.model.CompositeWorkload +import org.opendc.experiments.capelin.model.Workload +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimWorkload + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param rawReaders The raw trace readers to use.. + * @param workload The workload to use. + * @param seed The seed to use for workload sampling. + */ +class ParquetTraceReader( + rawReaders: List, + workload: Workload, + seed: Int +) : TraceReader { + /** + * The iterator over the actual trace. + */ + private val iterator: Iterator> = + rawReaders + .map { it.read() } + .run { + if (workload is CompositeWorkload) { + this.zip(workload.workloads) + } else { + this.zip(listOf(workload)) + } + } + .map { sampleWorkload(it.first, workload, it.second, seed) } + .flatten() + .iterator() + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt new file mode 100644 index 00000000..94193780 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import org.apache.avro.generic.GenericData +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.format.util.LocalParquetReader +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import java.io.File +import java.util.UUID + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param path The directory of the traces. + */ +class RawParquetTraceReader(private val path: File) { + /** + * Read the fragments into memory. + */ + private fun parseFragments(path: File): Map> { + val reader = LocalParquetReader(File(path, "trace.parquet")) + + val fragments = mutableMapOf>() + + return try { + while (true) { + val record = reader.read() ?: break + + val id = record["id"].toString() + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + + val fragment = SimTraceWorkload.Fragment( + duration, + cpuUsage, + cores + ) + + fragments.getOrPut(id) { mutableListOf() }.add(fragment) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(path: File, fragments: Map>): List> { + val metaReader = LocalParquetReader(File(path, "meta.parquet")) + + var counter = 0 + val entries = mutableListOf>() + + return try { + while (true) { + val record = metaReader.read() ?: break + + val id = record["id"].toString() + if (!fragments.containsKey(id)) { + continue + } + + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val vmFragments = fragments.getValue(id).asSequence() + val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs + val workload = SimTraceWorkload(vmFragments) + entries.add( + TraceEntry( + uid, id, submissionTime, workload, + mapOf( + "submit-time" to submissionTime, + "end-time" to endTime, + "total-load" to totalLoad, + "cores" to maxCores, + "required-memory" to requiredMemory, + "workload" to workload + ) + ) + ) + } + + entries + } catch (e: Exception) { + e.printStackTrace() + throw e + } finally { + metaReader.close() + } + } + + /** + * The entries in the trace. + */ + private val entries: List> + + init { + val fragments = parseFragments(path) + entries = parseMeta(path, fragments) + } + + /** + * Read the entries in the trace. + */ + fun read(): List> = entries +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt deleted file mode 100644 index 7f25137e..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.Workload -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimWorkload -import java.util.TreeSet - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param reader The internal trace reader to use. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - * @param run The run to which this reader belongs. - */ -public class Sc20ParquetTraceReader( - rawReaders: List, - performanceInterferenceModel: Map, - workload: Workload, - seed: Int -) : TraceReader { - /** - * The iterator over the actual trace. - */ - private val iterator: Iterator> = - rawReaders - .map { it.read() } - .run { - if (workload is CompositeWorkload) { - this.zip(workload.workloads) - } else { - this.zip(listOf(workload)) - } - } - .map { sampleWorkload(it.first, workload, it.second, seed) } - .flatten() - .run { - // Apply performance interference model - if (performanceInterferenceModel.isEmpty()) - this - else { - map { entry -> - val id = entry.name - val relevantPerformanceInterferenceModelItems = - performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - - entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems)) - } - } - } - .iterator() - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt deleted file mode 100644 index 54151c9f..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.util.UUID - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param path The directory of the traces. - */ -public class Sc20RawParquetTraceReader(private val path: File) { - /** - * Read the fragments into memory. - */ - private fun parseFragments(path: File): Map> { - val reader = LocalParquetReader(File(path, "trace.parquet")) - - val fragments = mutableMapOf>() - - return try { - while (true) { - val record = reader.read() ?: break - - val id = record["id"].toString() - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - - val fragment = SimTraceWorkload.Fragment( - duration, - cpuUsage, - cores - ) - - fragments.getOrPut(id) { mutableListOf() }.add(fragment) - } - - fragments - } finally { - reader.close() - } - } - - /** - * Read the metadata into a workload. - */ - private fun parseMeta(path: File, fragments: Map>): List> { - val metaReader = LocalParquetReader(File(path, "meta.parquet")) - - var counter = 0 - val entries = mutableListOf>() - - return try { - while (true) { - val record = metaReader.read() ?: break - - val id = record["id"].toString() - if (!fragments.containsKey(id)) { - continue - } - - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - - val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumOf { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val workload = SimTraceWorkload(vmFragments) - entries.add( - TraceEntry( - uid, id, submissionTime, workload, - mapOf( - "submit-time" to submissionTime, - "end-time" to endTime, - "total-load" to totalLoad, - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - } - - entries - } catch (e: Exception) { - e.printStackTrace() - throw e - } finally { - metaReader.close() - } - } - - /** - * The entries in the trace. - */ - private val entries: List> - - init { - val fragments = parseFragments(path) - entries = parseMeta(path, fragments) - } - - /** - * Read the entries in the trace. - */ - public fun read(): List> = entries - - /** - * Create a [TraceReader] instance. - */ - public fun createReader(): TraceReader { - return object : TraceReader, Iterator> by entries.iterator() { - override fun close() {} - } - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt deleted file mode 100644 index 6792c2ab..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalInputFile -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -public class Sc20StreamingParquetTraceReader( - traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel? = null, - selectedVms: List = emptyList(), - random: Random -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get( - FilterApi.userDefined( - FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - ) - ) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) - - /** - * The thread to read the records in. - */ - private val readerThread = thread(start = true, name = "sc20-reader") { - val reader = AvroParquetReader - .builder(LocalInputFile(File(traceFile, "trace.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - - val fragment = SimTraceWorkload.Fragment( - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf() - val entries = mutableMapOf() - val buffers = mutableMapOf>>() - - val metaReader = AvroParquetReader - .builder(LocalInputFile(File(traceFile, "meta.parquet"))) - .disableCompatibility() - .withFilter(filter) - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = selectedVms.ifEmpty { entries.keys } - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info("Processing VM $id") - - val internalBuffer = mutableListOf() - val externalBuffer = mutableListOf() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - var time = submissionTime - repeat@ while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - time += fragment.duration - if (time >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val relevantPerformanceInterferenceModelItems = - if (performanceInterferenceModel != null) - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) - else - null - val workload = SimTraceWorkload(fragments) - val meta = mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - - TraceEntry( - uid, id, submissionTime, workload, - if (performanceInterferenceModel != null) - meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any) - else - meta - ) - } - .sortedBy { it.start } - .toList() - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } -} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt deleted file mode 100644 index d0031a66..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.arguments.argument -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.options.convert -import com.github.ajalt.clikt.parameters.options.default -import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.option -import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.options.split -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.long -import me.tongfei.progressbar.ProgressBar -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.sc20.Sc20VmPlacementReader -import org.opendc.format.util.LocalOutputFile -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.Random -import kotlin.math.max -import kotlin.math.min - -/** - * Represents the command for converting traces - */ -public class TraceConverterCli : CliktCommand(name = "trace-converter") { - /** - * The directory where the trace should be stored. - */ - private val outputPath by option("-O", "--output", help = "path to store the trace") - .file(canBeFile = false, mustExist = false) - .defaultLazy { File("output") } - - /** - * The directory where the input trace is located. - */ - private val inputPath by argument("input", help = "path to the input trace") - .file(canBeFile = false) - - /** - * The input type of the trace. - */ - private val type by option("-t", "--type", help = "input type of trace").groupChoice( - "solvinity" to SolvinityConversion(), - "bitbrains" to BitbrainsConversion(), - "azure" to AzureConversion() - ) - - override fun run() { - val metaSchema = SchemaBuilder - .record("meta") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("submissionTime").type().longType().noDefault() - .name("endTime").type().longType().noDefault() - .name("maxCores").type().intType().noDefault() - .name("requiredMemory").type().longType().noDefault() - .endRecord() - val schema = SchemaBuilder - .record("trace") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("cores").type().intType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("flops").type().longType().noDefault() - .endRecord() - - val metaParquet = File(outputPath, "meta.parquet") - val traceParquet = File(outputPath, "trace.parquet") - - if (metaParquet.exists()) { - metaParquet.delete() - } - if (traceParquet.exists()) { - traceParquet.delete() - } - - val metaWriter = AvroParquetWriter.builder(LocalOutputFile(metaParquet)) - .withSchema(metaSchema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - try { - val type = type ?: throw IllegalArgumentException("Invalid trace conversion") - val allFragments = type.read(inputPath, metaSchema, metaWriter) - allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) - - for (fragment in allFragments) { - val record = GenericData.Record(schema) - record.put("id", fragment.id) - record.put("time", fragment.tick) - record.put("duration", fragment.duration) - record.put("cores", fragment.cores) - record.put("cpuUsage", fragment.usage) - record.put("flops", fragment.flops) - - writer.write(record) - } - } finally { - writer.close() - metaWriter.close() - } - } -} - -/** - * The supported trace conversions. - */ -public sealed class TraceConversion(name: String) : OptionGroup(name) { - /** - * Read the fragments of the trace. - */ - public abstract fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList -} - -public class SolvinityConversion : TraceConversion("Solvinity") { - private val clusters by option() - .split(",") - - private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") - .file(canBeDir = false) - .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } - .required() - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val clusters = clusters?.toSet() ?: emptySet() - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - // Identify start time of the entire trace - var minTimestamp = Long.MAX_VALUE - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach file@{ vmFile -> - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val values = line.split("\t") - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - if (timestamp < minTimestamp) { - minTimestamp = timestamp - } - return@file - } - } - } - } - - println("Start of trace at $minTimestamp") - - val allFragments = mutableListOf() - - val begin = 15 * 24 * 60 * 60 * 1000L - val end = 45 * 24 * 60 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split("\t") - - vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val timestamp = - (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp - if (begin > timestamp || timestamp > end) { - continue - } - - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - if (minTime in begin until end) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - } - - return allFragments - } -} - -/** - * Conversion of the Bitbrains public trace. - */ -public class BitbrainsConversion : TraceConversion("Bitbrains") { - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val timestampCol = 0 - val cpuUsageCol = 3 - val coreCol = 1 - val provisionedMemoryCol = 5 - val traceInterval = 5 * 60 * 1000L - - val allFragments = mutableListOf() - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .drop(1) - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(";\t") - - vmId = vmFile.name - - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - cores = values[coreCol].trim().toInt() - val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB - requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments - } -} - -/** - * Conversion of the Azure public VM trace. - */ -public class AzureConversion : TraceConversion("Azure") { - private val seed by option(help = "seed for trace sampling") - .long() - .default(0) - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val random = Random(seed) - val fraction = 0.01 - - // Read VM table - val vmIdTableCol = 0 - val coreTableCol = 9 - val provisionedMemoryTableCol = 10 - - var vmId: String - var cores: Int - var requiredMemory: Long - - val vmIds = mutableSetOf() - val vmIdToMetadata = mutableMapOf() - - BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> - reader.lineSequence() - .chunked(1024) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - // Sample only a fraction of the VMs - if (random.nextDouble() > fraction) { - continue - } - - val values = line.split(",") - - // Exclude VMs with a large number of cores (not specified exactly) - if (values[coreTableCol].contains(">")) { - continue - } - - vmId = values[vmIdTableCol].trim() - cores = values[coreTableCol].trim().toInt() - requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB - - vmIds.add(vmId) - vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) - } - } - } - - // Read VM metric reading files - val timestampCol = 0 - val vmIdCol = 1 - val cpuUsageCol = 4 - val traceInterval = 5 * 60 * 1000L - - val vmIdToFragments = mutableMapOf>() - val vmIdToLastFragment = mutableMapOf() - val allFragments = mutableListOf() - - for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { - val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") - var timestamp: Long - var cpuUsage: Double - - BufferedReader(FileReader(readingsFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(",") - vmId = values[vmIdCol].trim() - - // Ignore readings for VMs not in the sample - if (!vmIds.contains(vmId)) { - continue - } - - timestamp = values[timestampCol].trim().toLong() * 1000L - vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz - vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - val lastFragment = vmIdToLastFragment[vmId] - - vmIdToLastFragment[vmId] = - if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { - Fragment( - vmId, - lastFragment.tick, - lastFragment.flops + flops, - lastFragment.duration + traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - if (lastFragment != null) { - if (vmIdToFragments[vmId] == null) { - vmIdToFragments[vmId] = mutableListOf() - } - vmIdToFragments[vmId]!!.add(lastFragment) - allFragments.add(lastFragment) - } - fragment - } - } - } - } - } - - for (entry in vmIdToLastFragment) { - if (entry.value != null) { - if (vmIdToFragments[entry.key] == null) { - vmIdToFragments[entry.key] = mutableListOf() - } - vmIdToFragments[entry.key]!!.add(entry.value!!) - } - } - - println("Read ${vmIdToLastFragment.size} VMs") - - for (entry in vmIdToMetadata) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.key) - metaRecord.put("submissionTime", entry.value.minTime) - metaRecord.put("endTime", entry.value.maxTime) - println("${entry.value.minTime} - ${entry.value.maxTime}") - metaRecord.put("maxCores", entry.value.cores) - metaRecord.put("requiredMemory", entry.value.requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments - } -} - -public data class Fragment( - public val id: String, - public val tick: Long, - public val flops: Long, - public val duration: Long, - public val usage: Double, - public val cores: Int -) - -public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) - -/** - * A script to convert a trace in text format into a Parquet trace. - */ -public fun main(args: Array): Unit = TraceConverterCli().main(args) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt new file mode 100644 index 00000000..a3b45f47 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.io.api.Binary +import org.opendc.format.trace.TraceEntry +import org.opendc.format.trace.TraceReader +import org.opendc.format.util.LocalInputFile +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread + +private val logger = KotlinLogging.logger {} + +/** + * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. + * + * @param traceFile The directory of the traces. + * @param selectedVms The list of VMs to read from the trace. + */ +class StreamingParquetTraceReader(traceFile: File, selectedVms: List = emptyList()) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * The intermediate buffer to store the read records in. + */ + private val queue = ArrayBlockingQueue>(1024) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get( + FilterApi.userDefined( + FilterApi.binaryColumn("id"), + SelectedVmFilter( + TreeSet(selectedVms) + ) + ) + ) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) + + /** + * The thread to read the records in. + */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader + .builder(LocalInputFile(File(traceFile, "trace.parquet"))) + .disableCompatibility() + .withFilter(filter) + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + + val fragment = SimTraceWorkload.Fragment( + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map>>) { + if (!hasNext) { + return + } + + val fragments = mutableListOf>() + queue.drainTo(fragments) + + for ((id, fragment) in fragments) { + if (id == poison.first) { + hasNext = false + return + } + buffers[id]?.forEach { it.add(fragment) } + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. + */ + init { + val takenIds = mutableSetOf() + val entries = mutableMapOf() + val buffers = mutableMapOf>>() + + val metaReader = AvroParquetReader + .builder(LocalInputFile(File(traceFile, "meta.parquet"))) + .disableCompatibility() + .withFilter(filter) + .build() + + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + entries[id] = record + } + + metaReader.close() + + val selection = selectedVms.ifEmpty { entries.keys } + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + logger.info("Processing VM $id") + + val internalBuffer = mutableListOf() + val externalBuffer = mutableListOf() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence { + var time = submissionTime + repeat@ while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } + } + + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() + + for (fragment in internalBuffer) { + yield(fragment) + + time += fragment.duration + if (time >= endTime) { + break@repeat + } + } + + internalBuffer.clear() + } + + buffers.remove(id) + } + val workload = SimTraceWorkload(fragments) + val meta = mapOf( + "cores" to maxCores, + "required-memory" to requiredMemory, + "workload" to workload + ) + + TraceEntry(uid, id, submissionTime, workload, meta) + } + .sortedBy { it.start } + .toList() + .iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() { + readerThread.interrupt() + } + + private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt new file mode 100644 index 00000000..7cd1f159 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -0,0 +1,620 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.options.split +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.long +import me.tongfei.progressbar.ProgressBar +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.format.util.LocalOutputFile +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.Random +import kotlin.math.max +import kotlin.math.min + +/** + * Represents the command for converting traces + */ +class TraceConverterCli : CliktCommand(name = "trace-converter") { + /** + * The directory where the trace should be stored. + */ + private val outputPath by option("-O", "--output", help = "path to store the trace") + .file(canBeFile = false, mustExist = false) + .defaultLazy { File("output") } + + /** + * The directory where the input trace is located. + */ + private val inputPath by argument("input", help = "path to the input trace") + .file(canBeFile = false) + + /** + * The input type of the trace. + */ + private val type by option("-t", "--type", help = "input type of trace").groupChoice( + "solvinity" to SolvinityConversion(), + "bitbrains" to BitbrainsConversion(), + "azure" to AzureConversion() + ) + + override fun run() { + val metaSchema = SchemaBuilder + .record("meta") + .namespace("org.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("submissionTime").type().longType().noDefault() + .name("endTime").type().longType().noDefault() + .name("maxCores").type().intType().noDefault() + .name("requiredMemory").type().longType().noDefault() + .endRecord() + val schema = SchemaBuilder + .record("trace") + .namespace("org.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("cores").type().intType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("flops").type().longType().noDefault() + .endRecord() + + val metaParquet = File(outputPath, "meta.parquet") + val traceParquet = File(outputPath, "trace.parquet") + + if (metaParquet.exists()) { + metaParquet.delete() + } + if (traceParquet.exists()) { + traceParquet.delete() + } + + val metaWriter = AvroParquetWriter.builder(LocalOutputFile(metaParquet)) + .withSchema(metaSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + try { + val type = type ?: throw IllegalArgumentException("Invalid trace conversion") + val allFragments = type.read(inputPath, metaSchema, metaWriter) + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + } finally { + writer.close() + metaWriter.close() + } + } +} + +/** + * The supported trace conversions. + */ +sealed class TraceConversion(name: String) : OptionGroup(name) { + /** + * Read the fragments of the trace. + */ + abstract fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList +} + +class SolvinityConversion : TraceConversion("Solvinity") { + private val clusters by option() + .split(",") + + private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") + .file(canBeDir = false) + .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } } + .required() + + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val clusters = clusters?.toSet() ?: emptySet() + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + // Identify start time of the entire trace + var minTimestamp = Long.MAX_VALUE + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach file@{ vmFile -> + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + + val values = line.split("\t") + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + if (timestamp < minTimestamp) { + minTimestamp = timestamp + } + return@file + } + } + } + } + + println("Start of trace at $minTimestamp") + + val allFragments = mutableListOf() + + val begin = 15 * 24 * 60 * 60 * 1000L + val end = 45 * 24 * 60 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach { vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores: Int + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split("\t") + + vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + + val timestamp = + (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp + if (begin > timestamp || timestamp > end) { + continue + } + + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + if (minTime in begin until end) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + } + + return allFragments + } +} + +/** + * Conversion of the Bitbrains public trace. + */ +class BitbrainsConversion : TraceConversion("Bitbrains") { + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val timestampCol = 0 + val cpuUsageCol = 3 + val coreCol = 1 + val provisionedMemoryCol = 5 + val traceInterval = 5 * 60 * 1000L + + val allFragments = mutableListOf() + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach { vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores: Int + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .drop(1) + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(";\t") + + vmId = vmFile.name + + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + cores = values[coreCol].trim().toInt() + val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB + requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments + } +} + +/** + * Conversion of the Azure public VM trace. + */ +class AzureConversion : TraceConversion("Azure") { + private val seed by option(help = "seed for trace sampling") + .long() + .default(0) + + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val random = Random(seed) + val fraction = 0.01 + + // Read VM table + val vmIdTableCol = 0 + val coreTableCol = 9 + val provisionedMemoryTableCol = 10 + + var vmId: String + var cores: Int + var requiredMemory: Long + + val vmIds = mutableSetOf() + val vmIdToMetadata = mutableMapOf() + + BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> + reader.lineSequence() + .chunked(1024) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + // Sample only a fraction of the VMs + if (random.nextDouble() > fraction) { + continue + } + + val values = line.split(",") + + // Exclude VMs with a large number of cores (not specified exactly) + if (values[coreTableCol].contains(">")) { + continue + } + + vmId = values[vmIdTableCol].trim() + cores = values[coreTableCol].trim().toInt() + requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB + + vmIds.add(vmId) + vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) + } + } + } + + // Read VM metric reading files + val timestampCol = 0 + val vmIdCol = 1 + val cpuUsageCol = 4 + val traceInterval = 5 * 60 * 1000L + + val vmIdToFragments = mutableMapOf>() + val vmIdToLastFragment = mutableMapOf() + val allFragments = mutableListOf() + + for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { + val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") + var timestamp: Long + var cpuUsage: Double + + BufferedReader(FileReader(readingsFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(",") + vmId = values[vmIdCol].trim() + + // Ignore readings for VMs not in the sample + if (!vmIds.contains(vmId)) { + continue + } + + timestamp = values[timestampCol].trim().toLong() * 1000L + vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) + cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz + vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + val lastFragment = vmIdToLastFragment[vmId] + + vmIdToLastFragment[vmId] = + if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { + Fragment( + vmId, + lastFragment.tick, + lastFragment.flops + flops, + lastFragment.duration + traceInterval, + cpuUsage, + vmIdToMetadata[vmId]!!.cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + vmIdToMetadata[vmId]!!.cores + ) + if (lastFragment != null) { + if (vmIdToFragments[vmId] == null) { + vmIdToFragments[vmId] = mutableListOf() + } + vmIdToFragments[vmId]!!.add(lastFragment) + allFragments.add(lastFragment) + } + fragment + } + } + } + } + } + + for (entry in vmIdToLastFragment) { + if (entry.value != null) { + if (vmIdToFragments[entry.key] == null) { + vmIdToFragments[entry.key] = mutableListOf() + } + vmIdToFragments[entry.key]!!.add(entry.value!!) + } + } + + println("Read ${vmIdToLastFragment.size} VMs") + + for (entry in vmIdToMetadata) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", entry.key) + metaRecord.put("submissionTime", entry.value.minTime) + metaRecord.put("endTime", entry.value.maxTime) + println("${entry.value.minTime} - ${entry.value.maxTime}") + metaRecord.put("maxCores", entry.value.cores) + metaRecord.put("requiredMemory", entry.value.requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments + } +} + +data class Fragment( + val id: String, + val tick: Long, + val flops: Long, + val duration: Long, + val usage: Double, + val cores: Int +) + +class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long) + +/** + * A script to convert a trace in text format into a Parquet trace. + */ +fun main(args: Array): Unit = TraceConverterCli().main(args) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt new file mode 100644 index 00000000..fb641f1b --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import java.io.InputStream + +/** + * A parser for the JSON VM placement data files used for the SC20 paper. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +public class VmPlacementReader( + private val input: InputStream, + private val mapper: ObjectMapper = jacksonObjectMapper() +) : AutoCloseable { + /** + * Read the VM placements. + */ + fun read(): Map { + return mapper.readValue>(input) + .mapKeys { "vm__workload__${it.key}.txt" } + .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 + } + + override fun close() { + input.close() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 4b21b4f7..beaa798f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,12 +34,12 @@ import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher +import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation @@ -161,9 +161,8 @@ class CapelinIntegrationTest { * Obtain the trace reader for the test. */ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader { - return Sc20ParquetTraceReader( - listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), - emptyMap(), + return ParquetTraceReader( + listOf(RawParquetTraceReader(File("src/test/resources/trace"))), Workload("test", fraction), seed ) @@ -174,7 +173,7 @@ class CapelinIntegrationTest { */ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") - return Sc20ClusterEnvironmentReader(stream) + return ClusterEnvironmentReader(stream) } class TestExperimentReporter : ExperimentMonitor { diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index 28928dcb..8fc4f6b8 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -38,7 +38,7 @@ import org.opendc.compute.service.scheduler.weights.RandomWeigher import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader +import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider @@ -53,7 +53,6 @@ import org.opendc.simulator.resources.SimResourceInterpreter import java.io.File import java.time.Clock import java.util.* -import kotlin.random.asKotlinRandom /** * Experiments for the OpenDC project on Energy modeling. @@ -88,7 +87,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { val meterProvider: MeterProvider = createMeterProvider(clock) val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) - val trace = Sc20StreamingParquetTraceReader(File(config.getString("trace-path"), trace), random = Random(1).asKotlinRandom()) + val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt deleted file mode 100644 index 58af8453..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo - -/** - * A topology setup. - * - * @property name The name of the setup. - * @property rooms The rooms in the topology. - */ -internal data class Setup(val name: String, val rooms: List) - -/** - * A room in a topology. - * - * @property type The type of room in the topology. - * @property objects The objects in the room. - */ -internal data class Room(val type: String, val objects: List) - -/** - * An object in a [Room]. - * - * @property type The type of the room object. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) -internal sealed class RoomObject(val type: String) { - /** - * A rack in a server room. - * - * @property machines The machines in the rack. - */ - internal data class Rack(val machines: List) : RoomObject("RACK") -} - -/** - * A machine in the setup that consists of the specified CPU's represented as - * integer identifiers and ethernet speed. - * - * @property cpus The CPUs in the machine represented as integer identifiers. - * @property memories The memories in the machine represented as integer identifiers. - */ -internal data class Machine(val cpus: List, val memories: List) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt deleted file mode 100644 index 1efd2ddf..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel -import java.io.File -import java.io.FileInputStream -import java.io.InputStream -import java.util.* - -/** - * A [EnvironmentReader] for the internal environment format. - * - * @param environmentFile The file describing the physical cluster. - */ -public class Sc20ClusterEnvironmentReader( - private val input: InputStream -) : EnvironmentReader { - - public constructor(file: File) : this(FileInputStream(file)) - - public override fun read(): List { - var clusterIdCol = 0 - var speedCol = 0 - var numberOfHostsCol = 0 - var memoryPerHostCol = 0 - var coresPerHostCol = 0 - - var clusterIdx: Int = 0 - var clusterId: String - var speed: Double - var numberOfHosts: Int - var memoryPerHost: Long - var coresPerHost: Int - - val nodes = mutableListOf() - val random = Random(0) - - input.bufferedReader().use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the file - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(";") - - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - clusterIdCol = header["ClusterID"]!! - speedCol = header["Speed"]!! - numberOfHostsCol = header["numberOfHosts"]!! - memoryPerHostCol = header["memoryCapacityPerHost"]!! - coresPerHostCol = header["coreCountPerHost"]!! - return@forEachIndexed - } - - clusterIdx++ - clusterId = values[clusterIdCol].trim() - speed = values[speedCol].trim().toDouble() * 1000.0 - numberOfHosts = values[numberOfHostsCol].trim().toInt() - memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L - coresPerHost = values[coresPerHostCol].trim().toInt() - - val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) - val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) - - repeat(numberOfHosts) { - nodes.add( - MachineDef( - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$it", - mapOf("cluster" to clusterId), - MachineModel( - List(coresPerHost) { coreId -> - ProcessingUnit(unknownProcessingNode, coreId, speed) - }, - listOf(unknownMemoryUnit) - ), - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, idlePower = 200.0) - ) - ) - } - } - } - - return nodes - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt deleted file mode 100644 index 9b77702e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel -import java.io.InputStream -import java.util.* - -/** - * A parser for the JSON experiment setup files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { - /** - * The environment that was read from the file. - */ - private val setup: Setup = mapper.readValue(input) - - /** - * Read the environment. - */ - public override fun read(): List { - var counter = 0 - return setup.rooms.flatMap { room -> - room.objects.flatMap { roomObject -> - when (roomObject) { - is RoomObject.Rack -> { - roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } - } - 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - val memories = machine.memories.map { id -> - when (id) { - 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - MachineDef( - UUID(0L, counter++.toLong()), - "node-$counter", - emptyMap(), - MachineModel(cores, memories), - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, idlePower = 200.0) - ) - } - } - } - } - } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt deleted file mode 100644 index f30e64cf..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.io.Closeable -import kotlin.random.Random - -/** - * An interface for reading descriptions of performance interference models into memory. - */ -public interface PerformanceInterferenceModelReader : Closeable { - /** - * Construct a [PerformanceInterferenceModel]. - */ - public fun construct(random: Random): Map -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt index 7df1acd3..797a88d5 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt @@ -22,8 +22,6 @@ package org.opendc.format.trace -import java.io.Closeable - /** * An interface for reading workloads into memory. * @@ -31,4 +29,4 @@ import java.io.Closeable * * @param T The shape of the workloads supported by this reader. */ -public interface TraceReader : Iterator>, Closeable +public interface TraceReader : Iterator>, AutoCloseable diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt deleted file mode 100644 index 6861affe..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import java.io.Closeable - -/** - * An interface for reading VM placement data into memory. - */ -public interface VmPlacementReader : Closeable { - /** - * Construct a map of VMs to clusters. - */ - public fun construct(): Map -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt deleted file mode 100644 index 0da1f7c2..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt +++ /dev/null @@ -1,7 +0,0 @@ -package org.opendc.format.trace.sc20 - -internal data class PerformanceInterferenceEntry( - val vms: List, - val minServerLoad: Double, - val performanceScore: Double -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt deleted file mode 100644 index 4267737d..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.trace.PerformanceInterferenceModelReader -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.io.InputStream -import java.util.* -import kotlin.random.Random - -/** - * A parser for the JSON performance interference setup files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : - PerformanceInterferenceModelReader { - /** - * The computed value from the file. - */ - private val items: Map> - - init { - val entries: List = mapper.readValue(input) - val res = mutableMapOf>() - for (entry in entries) { - val item = PerformanceInterferenceModel.Item(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore) - for (workload in entry.vms) { - res.computeIfAbsent(workload) { TreeSet() }.add(item) - } - } - - items = res - } - - override fun construct(random: Random): Map { - return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt deleted file mode 100644 index 1eb4bac2..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.* -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -public class Sc20TraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List, - random: Random -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf>() - - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - val vms = if (selectedVms.isEmpty()) { - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - } else { - selectedVms.map { - File(traceDirectory, it) - } - } - - vms - .forEachIndexed { idx, vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var timestamp: Long - var cores = -1 - var minTime = Long.MAX_VALUE - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEach { line -> - val values = line.split(" ") - - vmId = vmFile.name - timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - } - } - - val flopsFragments = sequence { - var last: SimTraceWorkload.Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(" ") - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) { - val oldFragment = last!! - SimTraceWorkload.Fragment( - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - - if (last != null) { - yield(last!!) - } - } - } - - val uuid = UUID(0, idx.toLong()) - - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), - Random(random.nextInt()) - ) - val workload = SimTraceWorkload(flopsFragments.asSequence()) - entries[uuid] = TraceEntry( - uuid, - vmId, - minTime, - workload, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt deleted file mode 100644 index 61bdea60..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.trace.VmPlacementReader -import java.io.InputStream - -/** - * A parser for the JSON VM placement data files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20VmPlacementReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : - VmPlacementReader { - /** - * The environment that was read from the file. - */ - private val placements = mapper.readValue>(input) - - override fun construct(): Map { - return placements - .mapKeys { "vm__workload__${it.key}.txt" } - .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 - } - - override fun close() {} -} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt index 09f7de35..f5d0b65e 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -47,10 +47,9 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.sdk.toOtelClock import java.io.File @@ -167,19 +166,7 @@ public class RunnerCli : CliktCommand(name = "runner") { tracePath, scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) ) - val traceReader = Sc20RawParquetTraceReader(traceDir) - val performanceInterferenceReader = let { - val path = File(traceDir, "performance-interference-model.json") - val operational = scenario.get("operational", Document::class.java) - val enabled = operational.getBoolean("performanceInterferenceEnabled") - - if (!enabled || !path.exists()) { - return@let null - } - - path.inputStream().use { Sc20PerformanceInterferenceReader(it) } - } - + val traceReader = RawParquetTraceReader(traceDir) val targets = portfolio.get("targets", Document::class.java) val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = topologyParser.read(topologyId) @@ -187,7 +174,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val results = (0 until targets.getInteger("repeatsPerScenario")).map { logger.info { "Starting repeat $it" } withTimeout(runTimeout * 1000) { - runRepeat(scenario, it, environment, traceReader, performanceInterferenceReader) + runRepeat(scenario, it, environment, traceReader) } } @@ -203,8 +190,7 @@ public class RunnerCli : CliktCommand(name = "runner") { scenario: Document, repeat: Int, environment: EnvironmentReader, - traceReader: Sc20RawParquetTraceReader, - performanceInterferenceReader: Sc20PerformanceInterferenceReader? + traceReader: RawParquetTraceReader, ): WebExperimentMonitor.Result { val monitor = WebExperimentMonitor() @@ -267,10 +253,8 @@ public class RunnerCli : CliktCommand(name = "runner") { else -> throw IllegalArgumentException("Unknown policy $policyName") } - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( + val trace = ParquetTraceReader( listOf(traceReader), - performanceInterferenceModel, Workload(workloadName, workloadFraction), seed ) -- cgit v1.2.3 From e56967a29ac2b2d26cc085b1f3e27096dad6a170 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 24 Jun 2021 12:54:52 +0200 Subject: simulator: Re-implement performance interference model This change updates reimplements the performance interference model to work on top of the universal resource model in `opendc-simulator-resources`. This enables us to model interference and performance variability of other resources such as disk or network in the future. --- .../src/main/kotlin/kotlin-conventions.gradle.kts | 1 + .../kotlin/org/opendc/compute/simulator/SimHost.kt | 43 +----- .../org/opendc/compute/simulator/SimHostTest.kt | 19 ++- .../experiments/capelin/ExperimentHelpers.kt | 7 +- .../org/opendc/experiments/capelin/Portfolio.kt | 11 +- .../capelin/trace/ParquetTraceReader.kt | 8 +- .../capelin/trace/PerformanceInterferenceReader.kt | 65 ++++++++ .../experiments/capelin/trace/VmPlacementReader.kt | 6 +- .../experiments/capelin/CapelinIntegrationTest.kt | 2 +- .../trace/PerformanceInterferenceReaderTest.kt | 47 ++++++ .../src/test/resources/perf-interference.json | 22 +++ .../format/trace/bitbrains/BitbrainsTraceReader.kt | 15 +- .../interference/PerformanceInterferenceModel.kt | 134 ---------------- .../compute/kernel/SimAbstractHypervisor.kt | 30 ++-- .../compute/kernel/SimFairShareHypervisor.kt | 12 +- .../kernel/SimFairShareHypervisorProvider.kt | 12 +- .../simulator/compute/kernel/SimHypervisor.kt | 10 +- .../compute/kernel/SimHypervisorProvider.kt | 4 + .../compute/kernel/SimSpaceSharedHypervisor.kt | 2 +- .../kernel/SimSpaceSharedHypervisorProvider.kt | 4 + .../kernel/interference/VmInterferenceDomain.kt | 43 ++++++ .../kernel/interference/VmInterferenceGroup.kt | 44 ++++++ .../kernel/interference/VmInterferenceModel.kt | 170 +++++++++++++++++++++ .../org/opendc/simulator/compute/SimMachineTest.kt | 12 +- .../simulator/compute/kernel/SimHypervisorTest.kt | 61 ++++++++ .../src/main/kotlin/org/opendc/runner/web/Main.kt | 29 +++- .../service/WorkflowServiceIntegrationTest.kt | 6 +- 27 files changed, 575 insertions(+), 244 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt diff --git a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts index 7fda64a2..6e4cab89 100644 --- a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts @@ -40,4 +40,5 @@ java { tasks.withType().configureEach { kotlinOptions.jvmTarget = Libs.jvmTarget.toString() kotlinOptions.freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn" + kotlinOptions.freeCompilerArgs += "-Xjvm-default=all" } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index be7bc667..5ea577f3 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -31,17 +31,15 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimHypervisorProvider import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver -import org.opendc.simulator.compute.power.PowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.failures.FailureDomain import org.opendc.simulator.resources.SimResourceInterpreter @@ -61,24 +59,11 @@ public class SimHost( interpreter: SimResourceInterpreter, meter: Meter, hypervisor: SimHypervisorProvider, - scalingGovernor: ScalingGovernor, - scalingDriver: PowerDriver, + scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), + powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), + interferenceDomain: VmInterferenceDomain? = null ) : Host, FailureDomain, AutoCloseable { - - public constructor( - uid: UUID, - name: String, - model: MachineModel, - meta: Map, - context: CoroutineContext, - interpreter: SimResourceInterpreter, - meter: Meter, - hypervisor: SimHypervisorProvider, - powerModel: PowerModel = ConstantPowerModel(0.0), - mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - ) : this(uid, name, model, meta, context, interpreter, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper) - /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ @@ -102,13 +87,15 @@ public class SimHost( /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, scalingDriver) + public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, powerDriver) /** * The hypervisor to run multiple workloads. */ public val hypervisor: SimHypervisor = hypervisor.create( interpreter, + scalingGovernor = scalingGovernor, + interferenceDomain = interferenceDomain, listener = object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, @@ -260,7 +247,7 @@ public class SimHost( } require(canFit(server)) { "Server does not fit" } - val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) + val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel(), server.name)) guests[server] = guest _guests.add(1) @@ -317,23 +304,11 @@ public class SimHost( } private fun onGuestStart(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStart(vm.server.image.name) - } - } - _activeGuests.add(1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } private fun onGuestStop(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStop(vm.server.image.name) - } - } - _activeGuests.add(-1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } @@ -350,8 +325,6 @@ public class SimHost( * A virtual machine instance that the driver manages. */ private inner class Guest(val server: Server, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - var state: ServerState = ServerState.TERMINATED suspend fun start() { diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5414d042..5a6fb03d 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -33,11 +33,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider @@ -50,7 +46,7 @@ import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.util.UUID +import java.util.* import kotlin.coroutines.resume /** @@ -85,7 +81,16 @@ internal class SimHostTest { .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val virtDriver = SimHost( + uid = UUID.randomUUID(), + name = "test", + model = machineModel, + meta = emptyMap(), + coroutineContext, + interpreter, + meterProvider.get("opendc-compute-simulator"), + SimFairShareHypervisorProvider() + ) val duration = 5 * 60L val vmImageA = MockImage( UUID.randomUUID(), diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 06251dd3..9548253d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -44,6 +44,8 @@ import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector @@ -123,6 +125,7 @@ suspend fun withComputeService( meterProvider: MeterProvider, environmentReader: EnvironmentReader, scheduler: ComputeScheduler, + interferenceModel: VmInterferenceModel? = null, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val interpreter = SimResourceInterpreter(coroutineContext, clock) @@ -138,7 +141,8 @@ suspend fun withComputeService( interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), - def.powerModel + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() ) } @@ -161,7 +165,6 @@ suspend fun withComputeService( /** * Attach the specified monitor to the VM provisioner. */ -@OptIn(ExperimentalCoroutinesApi::class) suspend fun withMonitor( monitor: ExperimentMonitor, clock: Clock, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 460da303..cbb5bfd9 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -39,11 +39,14 @@ import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import java.io.File +import java.io.FileInputStream import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -119,6 +122,12 @@ abstract class Portfolio(name: String) : Experiment(name) { } } + val performanceInterferenceModel = if (operationalPhenomena.hasInterference) + PerformanceInterferenceReader(FileInputStream(config.getString("interference-model"))) + .use { VmInterferenceModel(it.read(), Random(seeder.nextLong())) } + else + null + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val monitor = ParquetExperimentMonitor( @@ -127,7 +136,7 @@ abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 2ebe65ea..5ad75565 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -31,11 +31,11 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A [TraceReader] for the internal VM workload trace format. * - * @param rawReaders The raw trace readers to use.. - * @param workload The workload to use. - * @param seed The seed to use for workload sampling. + * @param rawReaders The internal raw trace readers to use. + * @param workload The workload to read. + * @param seed The seed to use for sampling. */ -class ParquetTraceReader( +public class ParquetTraceReader( rawReaders: List, workload: Workload, seed: Int diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt new file mode 100644 index 00000000..a19f5699 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class PerformanceInterferenceReader( + private val input: InputStream, + private val mapper: ObjectMapper = jacksonObjectMapper() +) : AutoCloseable { + init { + mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) + } + + /** + * Read the performance interface model from the input. + */ + fun read(): List { + return mapper.readValue(input) + } + + override fun close() { + input.close() + } + + private data class GroupMixin( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set, + ) +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt index fb641f1b..7a1683f0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt @@ -28,7 +28,7 @@ import com.fasterxml.jackson.module.kotlin.readValue import java.io.InputStream /** - * A parser for the JSON VM placement data files used for the SC20 paper. + * A parser for the JSON VM placement data files used for the TPDS article on Capelin. * * @param input The input stream to read from. * @param mapper The Jackson object mapper to use. @@ -38,9 +38,9 @@ public class VmPlacementReader( private val mapper: ObjectMapper = jacksonObjectMapper() ) : AutoCloseable { /** - * Read the VM placements. + * Read the VM placements from the input. */ - fun read(): Map { + public fun read(): Map { return mapper.readValue>(input) .mapKeys { "vm__workload__${it.key}.txt" } .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index beaa798f..08e04ddf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -172,7 +172,7 @@ class CapelinIntegrationTest { * Obtain the environment reader for the test. */ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt")) return ClusterEnvironmentReader(stream) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt new file mode 100644 index 00000000..9b1513dc --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll + +/** + * Test suite for the [PerformanceInterferenceReader] class. + */ +class PerformanceInterferenceReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) + val reader = PerformanceInterferenceReader(input) + + val result = reader.use { reader.read() } + + assertAll( + { assertEquals(2, result.size) }, + { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, + { assertEquals(0.0, result[0].targetLoad, 0.001) }, + { assertEquals(0.8830158730158756, result[0].score, 0.001) } + ) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json new file mode 100644 index 00000000..1be5852b --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json @@ -0,0 +1,22 @@ +[ + { + "vms": [ + "vm_a", + "vm_c", + "vm_x", + "vm_y" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "vm_a", + "vm_b", + "vm_c", + "vm_d" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 769b2b13..aaf8a240 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -24,8 +24,6 @@ package org.opendc.format.trace.bitbrains import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader @@ -38,12 +36,8 @@ import kotlin.math.min * A [TraceReader] for the public VM workload trace format. * * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. */ -public class BitbrainsTraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader { +public class BitbrainsTraceReader(traceDirectory: File) : TraceReader { /** * The internal iterator to use for this reader. */ @@ -123,12 +117,6 @@ public class BitbrainsTraceReader( val uuid = UUID(0L, vmId) - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) } - .toSortedSet() - ) - val workload = SimTraceWorkload(flopsHistory.asSequence()) entries[vmId] = TraceEntry( uuid, @@ -136,7 +124,6 @@ public class BitbrainsTraceReader( startTime, workload, mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to cores, "required-memory" to requiredMemory, "workload" to workload diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt deleted file mode 100644 index 4c409887..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.interference - -import java.util.* -import kotlin.random.Random - -/** - * Meta-data key for the [PerformanceInterferenceModel] of an image. - */ -public const val IMAGE_PERF_INTERFERENCE_MODEL: String = "image:performance-interference" - -/** - * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. - * - * @param items The [PerformanceInterferenceModel.Item]s that make up this model. - */ -public class PerformanceInterferenceModel( - public val items: SortedSet, - private val random: Random = Random(0) -) { - private var intersectingItems: List = emptyList() - private val colocatedWorkloads = TreeMap() - - /** - * Indicate that a VM has started. - */ - public fun onStart(name: String) { - colocatedWorkloads.merge(name, 1, Int::plus) - intersectingItems = items.filter { item -> doesMatch(item) } - } - - /** - * Indicate that a VM has stopped. - */ - public fun onStop(name: String) { - colocatedWorkloads.computeIfPresent(name) { _, v -> (v - 1).takeUnless { it == 0 } } - intersectingItems = items.filter { item -> doesMatch(item) } - } - - /** - * Compute the performance interference based on the current server load. - */ - public fun apply(currentServerLoad: Double): Double { - if (intersectingItems.isEmpty()) { - return 1.0 - } - val score = intersectingItems - .firstOrNull { it.minServerLoad <= currentServerLoad } - - // Apply performance penalty to (on average) only one of the VMs - return if (score != null && random.nextInt(score.workloadNames.size) == 0) { - score.performanceScore - } else { - 1.0 - } - } - - private fun doesMatch(item: Item): Boolean { - var count = 0 - for ( - name in item.workloadNames.subSet( - colocatedWorkloads.firstKey(), - colocatedWorkloads.lastKey() + "\u0000" - ) - ) { - count += colocatedWorkloads.getOrDefault(name, 0) - if (count > 1) - return true - } - return false - } - - /** - * Model describing how a specific set of workloads causes performance variability for each workload. - * - * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. - * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. - * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no - * influence, <1 means that performance degrades, and >1 means that performance improves. - */ - public data class Item( - public val workloadNames: SortedSet, - public val minServerLoad: Double, - public val performanceScore: Double - ) : Comparable { - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as Item - - if (workloadNames != other.workloadNames) return false - - return true - } - - override fun hashCode(): Int = workloadNames.hashCode() - - override fun compareTo(other: Item): Int { - var cmp = performanceScore.compareTo(other.performanceScore) - if (cmp != 0) { - return cmp - } - - cmp = minServerLoad.compareTo(other.minServerLoad) - if (cmp != 0) { - return cmp - } - - return hashCode().compareTo(other.hashCode()) - } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index fb46dab4..d287312f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -23,9 +23,9 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* @@ -39,7 +39,8 @@ import org.opendc.simulator.resources.SimResourceSwitch */ public abstract class SimAbstractHypervisor( private val interpreter: SimResourceInterpreter, - private val scalingGovernor: ScalingGovernor? + private val scalingGovernor: ScalingGovernor? = null, + protected val interferenceDomain: VmInterferenceDomain? = null ) : SimHypervisor { /** * The machine on which the hypervisor runs. @@ -87,12 +88,9 @@ public abstract class SimAbstractHypervisor( return canFit(model, switch) } - override fun createMachine( - model: MachineModel, - performanceInterferenceModel: PerformanceInterferenceModel? - ): SimMachine { + override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine { require(canFit(model)) { "Machine does not fit" } - val vm = VirtualMachine(model, performanceInterferenceModel) + val vm = VirtualMachine(model, interferenceId) _vms.add(vm) return vm } @@ -116,17 +114,18 @@ public abstract class SimAbstractHypervisor( /** * A virtual machine running on the hypervisor. * - * @property model The machine model of the virtual machine. - * @property performanceInterferenceModel The performance interference model to utilize. + * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine( - model: MachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - ) : SimAbstractMachine(interpreter, parent = null, model) { + private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(interpreter, parent = null, model) { + /** + * The interference key of this virtual machine. + */ + private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) } + /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(switch.newOutput(), it) } + override val cpus = model.cpus.map { VCpu(switch.newOutput(interferenceKey), it) } override fun close() { super.close() @@ -136,6 +135,9 @@ public abstract class SimAbstractHypervisor( } _vms.remove(this) + if (interferenceKey != null) { + interferenceDomain?.leave(interferenceKey) + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 2ce51ea6..17130d34 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -22,8 +22,10 @@ package org.opendc.simulator.compute.kernel +import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceInterpreter @@ -32,20 +34,22 @@ import org.opendc.simulator.resources.SimResourceSwitchMaxMin import org.opendc.simulator.resources.SimResourceSystem /** - * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single - * [SimBareMetalMachine] concurrently using weighted fair sharing. + * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] + * concurrently using weighted fair sharing. * * @param interpreter The interpreter to manage the machine's resources. * @param parent The parent simulation system. * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. + * @param interferenceDomain The resource interference domain to which the hypervisor belongs. * @param listener The hypervisor listener to use. */ public class SimFairShareHypervisor( private val interpreter: SimResourceInterpreter, private val parent: SimResourceSystem? = null, scalingGovernor: ScalingGovernor? = null, + interferenceDomain: VmInterferenceDomain? = null, private val listener: SimHypervisor.Listener? = null -) : SimAbstractHypervisor(interpreter, scalingGovernor) { +) : SimAbstractHypervisor(interpreter, scalingGovernor, interferenceDomain) { override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean = true @@ -54,7 +58,7 @@ public class SimFairShareHypervisor( } private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem { - val switch = SimResourceSwitchMaxMin(interpreter, this) + val switch = SimResourceSwitchMaxMin(interpreter, this, interferenceDomain) override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index 542cd0d2..8d0592ec 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.kernel +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSystem @@ -34,6 +36,14 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override fun create( interpreter: SimResourceInterpreter, parent: SimResourceSystem?, + scalingGovernor: ScalingGovernor?, + interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? - ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener = listener) + ): SimHypervisor = SimFairShareHypervisor( + interpreter, + parent, + scalingGovernor = scalingGovernor, + interferenceDomain = interferenceDomain, + listener = listener + ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 40402f5c..e398ab36 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -23,13 +23,12 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload - * to a [SimBareMetalMachine]. + * to another [SimMachine]. */ public interface SimHypervisor : SimWorkload { /** @@ -46,12 +45,9 @@ public interface SimHypervisor : SimWorkload { * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. - * @param performanceInterferenceModel The performance interference model to use. + * @param interferenceId An identifier for the interference model. */ - public fun createMachine( - model: MachineModel, - performanceInterferenceModel: PerformanceInterferenceModel? = null - ): SimMachine + public fun createMachine(model: MachineModel, interferenceId: String? = null): SimMachine /** * Event listener for hypervisor events. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index cafd1ffc..b307a34d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.kernel +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSystem @@ -43,6 +45,8 @@ public interface SimHypervisorProvider { public fun create( interpreter: SimResourceInterpreter, parent: SimResourceSystem? = null, + scalingGovernor: ScalingGovernor? = null, + interferenceDomain: VmInterferenceDomain? = null, listener: SimHypervisor.Listener? = null ): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index 3ceebb9a..ac1c0250 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -31,7 +31,7 @@ import org.opendc.simulator.resources.SimResourceSwitchExclusive /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ -public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter, null) { +public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) { override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean { return switch.inputs.size - switch.outputs.size >= model.cpus.size } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index fb47d9e5..3906cb9a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.kernel +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSystem @@ -34,6 +36,8 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override fun create( interpreter: SimResourceInterpreter, parent: SimResourceSystem?, + scalingGovernor: ScalingGovernor?, + interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? ): SimHypervisor = SimSpaceSharedHypervisor(interpreter) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt new file mode 100644 index 00000000..1801fcd0 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +import org.opendc.simulator.resources.interference.InterferenceDomain +import org.opendc.simulator.resources.interference.InterferenceKey + +/** + * The interference domain of a hypervisor. + */ +public interface VmInterferenceDomain : InterferenceDomain { + /** + * Join this interference domain. + * + * @param id The identifier of the virtual machine. + */ + public fun join(id: String): InterferenceKey + + /** + * Leave this interference domain. + */ + public fun leave(key: InterferenceKey) +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt new file mode 100644 index 00000000..708ddede --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +/** + * A group of virtual machines that together can interfere when operating on the same resources, causing performance + * variability. + */ +public data class VmInterferenceGroup( + /** + * The minimum load of the host before the interference occurs. + */ + public val targetLoad: Double, + + /** + * A score in [0, 1] representing the performance variability as a result of resource interference. + */ + public val score: Double, + + /** + * The members of this interference group. + */ + public val members: Set +) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt new file mode 100644 index 00000000..c2e00c8e --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +import org.opendc.simulator.resources.interference.InterferenceKey +import java.util.* + +/** + * An interference model that models the resource interference between virtual machines on a host. + * + * @param groups The groups of virtual machines that interfere with each other. + * @param random The [Random] instance to select the affected virtual machines. + */ +public class VmInterferenceModel( + private val groups: List, + private val random: Random = Random(0) +) { + /** + * Construct a new [VmInterferenceDomain]. + */ + public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain { + /** + * The stateful groups of this domain. + */ + private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) } + + /** + * The set of keys active in this domain. + */ + private val keys = mutableSetOf() + + override fun join(id: String): InterferenceKey { + val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad }) + keys += key + return key + } + + override fun leave(key: InterferenceKey) { + if (key is InterferenceKeyImpl) { + keys -= key + key.leave() + } + } + + override fun apply(key: InterferenceKey?, load: Double): Double { + if (key == null || key !is InterferenceKeyImpl) { + return 1.0 + } + + val ctx = key.findGroup(load) + val group = ctx?.group + + // Apply performance penalty to (on average) only one of the VMs + return if (group != null && random.nextInt(group.members.size) == 0) { + group.score + } else { + 1.0 + } + } + + override fun toString(): String = "VmInterferenceDomain" + } + + /** + * An interference key. + * + * @param id The identifier of the member. + * @param groups The groups to which the key belongs. + */ + private inner class InterferenceKeyImpl(val id: String, private val groups: List) : InterferenceKey { + init { + for (group in groups) { + group.join(this) + } + } + + /** + * Find the active group that applies for the interference member. + */ + fun findGroup(load: Double): GroupContext? { + // Find the first active group whose target load is lower than the current load + val index = groups.binarySearchBy(load) { it.group.targetLoad } + val target = if (index >= 0) index else -(index + 1) + + // Check whether there are active groups ahead of the index + for (i in target until groups.size) { + val group = groups[i] + if (group.group.targetLoad > load) { + break + } else if (group.isActive) { + return group + } + } + + // Check whether there are active groups before the index + for (i in (target - 1) downTo 0) { + val group = groups[i] + if (group.isActive) { + return group + } + } + + return null + } + + /** + * Leave all the groups. + */ + fun leave() { + for (group in groups) { + group.leave(this) + } + } + } + + /** + * A group context is used to track the active keys per interference group. + */ + private inner class GroupContext(val group: VmInterferenceGroup) { + /** + * The active keys that are part of this group. + */ + private val keys = mutableSetOf() + + /** + * A flag to indicate that the group is active. + */ + val isActive + get() = keys.size > 1 + + /** + * Determine whether the specified [id] is part of this group. + */ + operator fun contains(id: String): Boolean = id in group.members + + /** + * Join this group with the specified [key]. + */ + fun join(key: InterferenceKeyImpl) { + keys += key + } + + /** + * Leave this group with the specified [key]. + */ + fun leave(key: InterferenceKeyImpl) { + keys -= key + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 892d5223..a6d955ca 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -24,12 +24,9 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertArrayEquals import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.model.* import org.opendc.simulator.compute.power.ConstantPowerModel @@ -157,8 +154,10 @@ class SimMachineTest { try { coroutineScope { launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - assertEquals(100.0, machine.psu.powerDraw) - assertEquals(100.0, source.powerDraw) + assertAll( + { assertEquals(100.0, machine.psu.powerDraw) }, + { assertEquals(100.0, source.powerDraw) } + ) } } finally { machine.close() @@ -284,6 +283,7 @@ class SimMachineTest { } } + @Test fun testDiskWriteUsage() = runBlockingSimulation { val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index 71d48a31..a61cba8d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -34,6 +34,8 @@ import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -223,4 +225,63 @@ internal class SimHypervisorTest { machine.close() } + + @Test + fun testInterference() = runBlockingSimulation { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val model = MachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + + val groups = listOf( + VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")), + VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")), + VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) + ) + val interferenceModel = VmInterferenceModel(groups) + + val platform = SimResourceInterpreter(coroutineContext, clock) + val machine = SimBareMetalMachine( + platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) + ) + val hypervisor = SimFairShareHypervisor(platform, interferenceDomain = interferenceModel.newDomain()) + + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + val workloadB = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) + ) + ) + + launch { + machine.run(hypervisor) + } + + coroutineScope { + launch { + val vm = hypervisor.createMachine(model, "a") + vm.run(workloadA) + vm.close() + } + val vm = hypervisor.createMachine(model, "b") + vm.run(workloadB) + vm.close() + } + + machine.close() + } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt index f5d0b65e..d0b97d90 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -48,12 +48,15 @@ import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.sdk.toOtelClock import java.io.File import kotlin.random.Random +import kotlin.random.asJavaRandom private val logger = KotlinLogging.logger {} @@ -61,7 +64,7 @@ private val logger = KotlinLogging.logger {} * Represents the CLI command for starting the OpenDC web runner. */ @OptIn(ExperimentalCoroutinesApi::class) -public class RunnerCli : CliktCommand(name = "runner") { +class RunnerCli : CliktCommand(name = "runner") { /** * The name of the database to use. */ @@ -167,14 +170,27 @@ public class RunnerCli : CliktCommand(name = "runner") { scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) ) val traceReader = RawParquetTraceReader(traceDir) + val interferenceGroups = let { + val path = File(traceDir, "performance-interference-model.json") + val operational = scenario.get("operational", Document::class.java) + val enabled = operational.getBoolean("performanceInterferenceEnabled") + + if (!enabled || !path.exists()) { + return@let null + } + + PerformanceInterferenceReader(path.inputStream()).use { reader -> reader.read() } + } + val targets = portfolio.get("targets", Document::class.java) val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = topologyParser.read(topologyId) - val results = (0 until targets.getInteger("repeatsPerScenario")).map { - logger.info { "Starting repeat $it" } + val results = (0 until targets.getInteger("repeatsPerScenario")).map { repeat -> + logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - runRepeat(scenario, it, environment, traceReader) + val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) } + runRepeat(scenario, repeat, environment, traceReader, interferenceModel) } } @@ -191,6 +207,7 @@ public class RunnerCli : CliktCommand(name = "runner") { repeat: Int, environment: EnvironmentReader, traceReader: RawParquetTraceReader, + interferenceModel: VmInterferenceModel? ): WebExperimentMonitor.Result { val monitor = WebExperimentMonitor() @@ -260,7 +277,7 @@ public class RunnerCli : CliktCommand(name = "runner") { ) val failureFrequency = if (operational.getBoolean("failuresEnabled", false)) 24.0 * 7 else 0.0 - withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler -> val failureDomain = if (failureFrequency > 0) { logger.debug { "ENABLING failures" } createFailureDomain( @@ -361,4 +378,4 @@ public class RunnerCli : CliktCommand(name = "runner") { /** * Main entry point of the runner. */ -public fun main(args: Array): Unit = RunnerCli().main(args) +fun main(args: Array): Unit = RunnerCli().main(args) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index 413112af..38c774a9 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -25,7 +25,6 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -57,7 +56,6 @@ import kotlin.math.max * Integration test suite for the [WorkflowServiceImpl]. */ @DisplayName("WorkflowServiceImpl") -@OptIn(ExperimentalCoroutinesApi::class) internal class WorkflowServiceIntegrationTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. @@ -70,7 +68,7 @@ internal class WorkflowServiceIntegrationTest { .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json"))) .use { it.read() } .map { def -> SimHost( @@ -106,7 +104,7 @@ internal class WorkflowServiceIntegrationTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) + val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf"))) var offset = Long.MIN_VALUE coroutineScope { -- cgit v1.2.3