diff options
47 files changed, 671 insertions, 855 deletions
diff --git a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts index 7fda64a2..6e4cab89 100644 --- a/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/kotlin-conventions.gradle.kts @@ -40,4 +40,5 @@ java { tasks.withType<KotlinCompile>().configureEach { kotlinOptions.jvmTarget = Libs.jvmTarget.toString() kotlinOptions.freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn" + kotlinOptions.freeCompilerArgs += "-Xjvm-default=all" } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index be7bc667..5ea577f3 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -31,17 +31,15 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimHypervisorProvider import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver -import org.opendc.simulator.compute.power.PowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.failures.FailureDomain import org.opendc.simulator.resources.SimResourceInterpreter @@ -61,24 +59,11 @@ public class SimHost( interpreter: SimResourceInterpreter, meter: Meter, hypervisor: SimHypervisorProvider, - scalingGovernor: ScalingGovernor, - scalingDriver: PowerDriver, + scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), + powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), + interferenceDomain: VmInterferenceDomain? = null ) : Host, FailureDomain, AutoCloseable { - - public constructor( - uid: UUID, - name: String, - model: MachineModel, - meta: Map<String, Any>, - context: CoroutineContext, - interpreter: SimResourceInterpreter, - meter: Meter, - hypervisor: SimHypervisorProvider, - powerModel: PowerModel = ConstantPowerModel(0.0), - mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - ) : this(uid, name, model, meta, context, interpreter, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper) - /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ @@ -102,13 +87,15 @@ public class SimHost( /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, scalingDriver) + public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, powerDriver) /** * The hypervisor to run multiple workloads. */ public val hypervisor: SimHypervisor = hypervisor.create( interpreter, + scalingGovernor = scalingGovernor, + interferenceDomain = interferenceDomain, listener = object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, @@ -260,7 +247,7 @@ public class SimHost( } require(canFit(server)) { "Server does not fit" } - val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) + val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel(), server.name)) guests[server] = guest _guests.add(1) @@ -317,23 +304,11 @@ public class SimHost( } private fun onGuestStart(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStart(vm.server.image.name) - } - } - _activeGuests.add(1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } private fun onGuestStop(vm: Guest) { - guests.forEach { (_, guest) -> - if (guest.state == ServerState.RUNNING) { - vm.performanceInterferenceModel?.onStop(vm.server.image.name) - } - } - _activeGuests.add(-1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } @@ -350,8 +325,6 @@ public class SimHost( * A virtual machine instance that the driver manages. */ private inner class Guest(val server: Server, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - var state: ServerState = ServerState.TERMINATED suspend fun start() { diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5414d042..5a6fb03d 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -33,11 +33,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider @@ -50,7 +46,7 @@ import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.util.UUID +import java.util.* import kotlin.coroutines.resume /** @@ -85,7 +81,16 @@ internal class SimHostTest { .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val virtDriver = SimHost( + uid = UUID.randomUUID(), + name = "test", + model = machineModel, + meta = emptyMap(), + coroutineContext, + interpreter, + meterProvider.get("opendc-compute-simulator"), + SimFairShareHypervisorProvider() + ) val duration = 5 * 60L val vmImageA = MockImage( UUID.randomUUID(), diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 47f5f71e..9548253d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -41,11 +41,11 @@ import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector @@ -53,7 +53,6 @@ import org.opendc.simulator.failures.FaultInjector import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock -import java.io.File import java.time.Clock import kotlin.coroutines.resume import kotlin.math.ln @@ -68,7 +67,7 @@ private val logger = KotlinLogging.logger {} /** * Construct the failure domain for the experiments. */ -public fun createFailureDomain( +fun createFailureDomain( coroutineScope: CoroutineScope, clock: Clock, seed: Int, @@ -100,7 +99,7 @@ public fun createFailureDomain( /** * Obtain the [FaultInjector] to use for the experiments. */ -public fun createFaultInjector( +fun createFaultInjector( coroutineScope: CoroutineScope, clock: Clock, random: Random, @@ -119,30 +118,14 @@ public fun createFaultInjector( } /** - * Create the trace reader from which the VM workloads are read. - */ -public fun createTraceReader( - path: File, - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List<String>, - seed: Int -): Sc20StreamingParquetTraceReader { - return Sc20StreamingParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) -} - -/** * Construct the environment for a simulated compute service.. */ -public suspend fun withComputeService( +suspend fun withComputeService( clock: Clock, meterProvider: MeterProvider, environmentReader: EnvironmentReader, scheduler: ComputeScheduler, + interferenceModel: VmInterferenceModel? = null, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val interpreter = SimResourceInterpreter(coroutineContext, clock) @@ -158,7 +141,8 @@ public suspend fun withComputeService( interpreter, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), - def.powerModel + powerDriver = SimplePowerDriver(def.powerModel), + interferenceDomain = interferenceModel?.newDomain() ) } @@ -181,16 +165,13 @@ public suspend fun withComputeService( /** * Attach the specified monitor to the VM provisioner. */ -@OptIn(ExperimentalCoroutinesApi::class) -public suspend fun withMonitor( +suspend fun withMonitor( monitor: ExperimentMonitor, clock: Clock, metricProducer: MetricProducer, scheduler: ComputeService, block: suspend CoroutineScope.() -> Unit ): Unit = coroutineScope { - val monitorJobs = mutableSetOf<Job>() - // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -211,24 +192,23 @@ public suspend fun withMonitor( try { block(this) } finally { - monitorJobs.forEach(Job::cancel) reader.close() monitor.close() } } -public class ComputeMetrics { - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var unscheduledVms: Int = 0 - public var finishedVms: Int = 0 +class ComputeMetrics { + var submittedVms: Int = 0 + var queuedVms: Int = 0 + var runningVms: Int = 0 + var unscheduledVms: Int = 0 + var finishedVms: Int = 0 } /** * Collect the metrics of the compute service. */ -public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { +fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { val metrics = metricProducer.collectAllMetrics().associateBy { it.name } val res = ComputeMetrics() try { @@ -247,7 +227,7 @@ public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { /** * Process the trace. */ -public suspend fun processTrace( +suspend fun processTrace( clock: Clock, reader: TraceReader<SimWorkload>, scheduler: ComputeService, @@ -306,7 +286,7 @@ public suspend fun processTrace( /** * Create a [MeterProvider] instance for the experiment. */ -public fun createMeterProvider(clock: Clock): MeterProvider { +fun createMeterProvider(clock: Clock): MeterProvider { val powerSelector = InstrumentSelector.builder() .setInstrumentNameRegex("power\\.usage") .setInstrumentType(InstrumentType.VALUE_RECORDER) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index b70eefb2..cbb5bfd9 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -32,29 +32,30 @@ import org.opendc.compute.service.scheduler.* import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.* +import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.PerformanceInterferenceModelReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import java.io.File +import java.io.FileInputStream import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom /** * A portfolio represents a collection of scenarios are tested for the work. * * @param name The name of the portfolio. */ -public abstract class Portfolio(name: String) : Experiment(name) { +abstract class Portfolio(name: String) : Experiment(name) { /** * The logger for this portfolio instance. */ @@ -71,34 +72,29 @@ public abstract class Portfolio(name: String) : Experiment(name) { private val vmPlacements by anyOf(emptyMap<String, String>()) /** - * The path to the performance interference model. - */ - private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null) - - /** * The topology to test. */ - public abstract val topology: Topology + abstract val topology: Topology /** * The workload to test. */ - public abstract val workload: Workload + abstract val workload: Workload /** * The operational phenomenas to consider. */ - public abstract val operationalPhenomena: OperationalPhenomena + abstract val operationalPhenomena: OperationalPhenomena /** * The allocation policies to consider. */ - public abstract val allocationPolicy: String + abstract val allocationPolicy: String /** * A map of trace readers. */ - private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>() + private val traceReaders = ConcurrentHashMap<String, RawParquetTraceReader>() /** * Perform a single trial for this portfolio. @@ -106,7 +102,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingSimulation { val seeder = Random(repeat.toLong()) - val environment = Sc20ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) + val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = createComputeScheduler(seeder) @@ -122,14 +118,17 @@ public abstract class Portfolio(name: String) : Experiment(name) { val rawReaders = workloadNames.map { workloadName -> traceReaders.computeIfAbsent(workloadName) { logger.info { "Loading trace $workloadName" } - Sc20RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) + RawParquetTraceReader(File(config.getString("trace-path"), workloadName)) } } - val performanceInterferenceModel = performanceInterferenceModel - ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder.asKotlinRandom()) ?: emptyMap() - val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) + val performanceInterferenceModel = if (operationalPhenomena.hasInterference) + PerformanceInterferenceReader(FileInputStream(config.getString("interference-model"))) + .use { VmInterferenceModel(it.read(), Random(seeder.nextLong())) } + else + null + + val trace = ParquetTraceReader(rawReaders, workload, seeder.nextInt()) val monitor = ParquetExperimentMonitor( File(config.getString("output-path")), @@ -137,7 +136,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt index 1efd2ddf..d73d14f5 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/env/ClusterEnvironmentReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.format.environment.sc20 +package org.opendc.experiments.capelin.env import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef @@ -37,22 +37,22 @@ import java.util.* /** * A [EnvironmentReader] for the internal environment format. * - * @param environmentFile The file describing the physical cluster. + * @param input The input stream describing the physical cluster. */ -public class Sc20ClusterEnvironmentReader( - private val input: InputStream -) : EnvironmentReader { +class ClusterEnvironmentReader(private val input: InputStream) : EnvironmentReader { + /** + * Construct a [ClusterEnvironmentReader] for the specified [file]. + */ + constructor(file: File) : this(FileInputStream(file)) - public constructor(file: File) : this(FileInputStream(file)) - - public override fun read(): List<MachineDef> { + override fun read(): List<MachineDef> { var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 var memoryPerHostCol = 0 var coresPerHostCol = 0 - var clusterIdx: Int = 0 + var clusterIdx = 0 var clusterId: String var speed: Double var numberOfHosts: Int @@ -116,5 +116,7 @@ public class Sc20ClusterEnvironmentReader( return nodes } - override fun close() {} + override fun close() { + input.close() + } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 7f25137e..5ad75565 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -26,21 +26,17 @@ import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload -import java.util.TreeSet /** * A [TraceReader] for the internal VM workload trace format. * - * @param reader The internal trace reader to use. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - * @param run The run to which this reader belongs. + * @param rawReaders The internal raw trace readers to use. + * @param workload The workload to read. + * @param seed The seed to use for sampling. */ -public class Sc20ParquetTraceReader( - rawReaders: List<Sc20RawParquetTraceReader>, - performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, +public class ParquetTraceReader( + rawReaders: List<RawParquetTraceReader>, workload: Workload, seed: Int ) : TraceReader<SimWorkload> { @@ -59,20 +55,6 @@ public class Sc20ParquetTraceReader( } .map { sampleWorkload(it.first, workload, it.second, seed) } .flatten() - .run { - // Apply performance interference model - if (performanceInterferenceModel.isEmpty()) - this - else { - map { entry -> - val id = entry.name - val relevantPerformanceInterferenceModelItems = - performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - - entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems)) - } - } - } .iterator() override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt index 4267737d..a19f5699 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,46 +20,46 @@ * SOFTWARE. */ -package org.opendc.format.trace.sc20 +package org.opendc.experiments.capelin.trace +import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.trace.PerformanceInterferenceModelReader -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup import java.io.InputStream -import java.util.* -import kotlin.random.Random /** - * A parser for the JSON performance interference setup files used for the SC20 paper. + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. * * @param input The input stream to read from. * @param mapper The Jackson object mapper to use. */ -public class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : - PerformanceInterferenceModelReader { - /** - * The computed value from the file. - */ - private val items: Map<String, TreeSet<PerformanceInterferenceModel.Item>> - +class PerformanceInterferenceReader( + private val input: InputStream, + private val mapper: ObjectMapper = jacksonObjectMapper() +) : AutoCloseable { init { - val entries: List<PerformanceInterferenceEntry> = mapper.readValue(input) - val res = mutableMapOf<String, TreeSet<PerformanceInterferenceModel.Item>>() - for (entry in entries) { - val item = PerformanceInterferenceModel.Item(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore) - for (workload in entry.vms) { - res.computeIfAbsent(workload) { TreeSet() }.add(item) - } - } + mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) + } - items = res + /** + * Read the performance interface model from the input. + */ + fun read(): List<VmInterferenceGroup> { + return mapper.readValue(input) } - override fun construct(random: Random): Map<String, PerformanceInterferenceModel> { - return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) } + override fun close() { + input.close() } - override fun close() {} + private data class GroupMixin( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set<String>, + ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index 54151c9f..94193780 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin.trace -import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -32,14 +31,12 @@ import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.util.UUID -private val logger = KotlinLogging.logger {} - /** * A [TraceReader] for the internal VM workload trace format. * * @param path The directory of the traces. */ -public class Sc20RawParquetTraceReader(private val path: File) { +class RawParquetTraceReader(private val path: File) { /** * Read the fragments into memory. */ @@ -136,14 +133,5 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<SimWorkload>> = entries - - /** - * Create a [TraceReader] instance. - */ - public fun createReader(): TraceReader<SimWorkload> { - return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() { - override fun close() {} - } - } + fun read(): List<TraceEntry<SimWorkload>> = entries } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt index 6792c2ab..a3b45f47 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/StreamingParquetTraceReader.kt @@ -33,8 +33,6 @@ import org.apache.parquet.io.api.Binary import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.format.util.LocalInputFile -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.io.File @@ -44,7 +42,6 @@ import java.util.TreeSet import java.util.UUID import java.util.concurrent.ArrayBlockingQueue import kotlin.concurrent.thread -import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -52,14 +49,9 @@ private val logger = KotlinLogging.logger {} * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. * * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + * @param selectedVms The list of VMs to read from the trace. */ -public class Sc20StreamingParquetTraceReader( - traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel? = null, - selectedVms: List<String> = emptyList(), - random: Random -) : TraceReader<SimWorkload> { +class StreamingParquetTraceReader(traceFile: File, selectedVms: List<String> = emptyList()) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ @@ -227,14 +219,6 @@ public class Sc20StreamingParquetTraceReader( buffers.remove(id) } - val relevantPerformanceInterferenceModelItems = - if (performanceInterferenceModel != null) - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) - else - null val workload = SimTraceWorkload(fragments) val meta = mapOf( "cores" to maxCores, @@ -242,13 +226,7 @@ public class Sc20StreamingParquetTraceReader( "workload" to workload ) - TraceEntry( - uid, id, submissionTime, workload, - if (performanceInterferenceModel != null) - meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any) - else - meta - ) + TraceEntry(uid, id, submissionTime, workload, meta) } .sortedBy { it.start } .toList() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index d0031a66..7cd1f159 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -41,7 +41,6 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.sc20.Sc20VmPlacementReader import org.opendc.format.util.LocalOutputFile import java.io.BufferedReader import java.io.File @@ -53,7 +52,7 @@ import kotlin.math.min /** * Represents the command for converting traces */ -public class TraceConverterCli : CliktCommand(name = "trace-converter") { +class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The directory where the trace should be stored. */ @@ -149,24 +148,24 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The supported trace conversions. */ -public sealed class TraceConversion(name: String) : OptionGroup(name) { +sealed class TraceConversion(name: String) : OptionGroup(name) { /** * Read the fragments of the trace. */ - public abstract fun read( + abstract fun read( traceDirectory: File, metaSchema: Schema, metaWriter: ParquetWriter<GenericData.Record> ): MutableList<Fragment> } -public class SolvinityConversion : TraceConversion("Solvinity") { +class SolvinityConversion : TraceConversion("Solvinity") { private val clusters by option() .split(",") private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") .file(canBeDir = false) - .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } + .convert { VmPlacementReader(it.inputStream()).use { reader -> reader.read() } } .required() override fun read( @@ -335,7 +334,7 @@ public class SolvinityConversion : TraceConversion("Solvinity") { /** * Conversion of the Bitbrains public trace. */ -public class BitbrainsConversion : TraceConversion("Bitbrains") { +class BitbrainsConversion : TraceConversion("Bitbrains") { override fun read( traceDirectory: File, metaSchema: Schema, @@ -447,7 +446,7 @@ public class BitbrainsConversion : TraceConversion("Bitbrains") { /** * Conversion of the Azure public VM trace. */ -public class AzureConversion : TraceConversion("Azure") { +class AzureConversion : TraceConversion("Azure") { private val seed by option(help = "seed for trace sampling") .long() .default(0) @@ -604,18 +603,18 @@ public class AzureConversion : TraceConversion("Azure") { } } -public data class Fragment( - public val id: String, - public val tick: Long, - public val flops: Long, - public val duration: Long, - public val usage: Double, - public val cores: Int +data class Fragment( + val id: String, + val tick: Long, + val flops: Long, + val duration: Long, + val usage: Double, + val cores: Int ) -public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) +class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long) /** * A script to convert a trace in text format into a Parquet trace. */ -public fun main(args: Array<String>): Unit = TraceConverterCli().main(args) +fun main(args: Array<String>): Unit = TraceConverterCli().main(args) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt index 61bdea60..7a1683f0 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/VmPlacementReader.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,32 +20,33 @@ * SOFTWARE. */ -package org.opendc.format.trace.sc20 +package org.opendc.experiments.capelin.trace import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.trace.VmPlacementReader import java.io.InputStream /** - * A parser for the JSON VM placement data files used for the SC20 paper. + * A parser for the JSON VM placement data files used for the TPDS article on Capelin. * * @param input The input stream to read from. * @param mapper The Jackson object mapper to use. */ -public class Sc20VmPlacementReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : - VmPlacementReader { +public class VmPlacementReader( + private val input: InputStream, + private val mapper: ObjectMapper = jacksonObjectMapper() +) : AutoCloseable { /** - * The environment that was read from the file. + * Read the VM placements from the input. */ - private val placements = mapper.readValue<Map<String, String>>(input) - - override fun construct(): Map<String, String> { - return placements + public fun read(): Map<String, String> { + return mapper.readValue<Map<String, String>>(input) .mapKeys { "vm__workload__${it.key}.txt" } .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 } - override fun close() {} + override fun close() { + input.close() + } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 4b21b4f7..08e04ddf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,12 +34,12 @@ import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher +import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation @@ -161,9 +161,8 @@ class CapelinIntegrationTest { * Obtain the trace reader for the test. */ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { - return Sc20ParquetTraceReader( - listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), - emptyMap(), + return ParquetTraceReader( + listOf(RawParquetTraceReader(File("src/test/resources/trace"))), Workload("test", fraction), seed ) @@ -173,8 +172,8 @@ class CapelinIntegrationTest { * Obtain the environment reader for the test. */ private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") - return Sc20ClusterEnvironmentReader(stream) + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt")) + return ClusterEnvironmentReader(stream) } class TestExperimentReporter : ExperimentMonitor { diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt new file mode 100644 index 00000000..9b1513dc --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll + +/** + * Test suite for the [PerformanceInterferenceReader] class. + */ +class PerformanceInterferenceReaderTest { + @Test + fun testSmoke() { + val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) + val reader = PerformanceInterferenceReader(input) + + val result = reader.use { reader.read() } + + assertAll( + { assertEquals(2, result.size) }, + { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, + { assertEquals(0.0, result[0].targetLoad, 0.001) }, + { assertEquals(0.8830158730158756, result[0].score, 0.001) } + ) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json new file mode 100644 index 00000000..1be5852b --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/perf-interference.json @@ -0,0 +1,22 @@ +[ + { + "vms": [ + "vm_a", + "vm_c", + "vm_x", + "vm_y" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "vm_a", + "vm_b", + "vm_c", + "vm_d" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index 28928dcb..8fc4f6b8 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -38,7 +38,7 @@ import org.opendc.compute.service.scheduler.weights.RandomWeigher import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader +import org.opendc.experiments.capelin.trace.StreamingParquetTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider @@ -53,7 +53,6 @@ import org.opendc.simulator.resources.SimResourceInterpreter import java.io.File import java.time.Clock import java.util.* -import kotlin.random.asKotlinRandom /** * Experiments for the OpenDC project on Energy modeling. @@ -88,7 +87,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { val meterProvider: MeterProvider = createMeterProvider(clock) val monitor = ParquetExperimentMonitor(File(config.getString("output-path")), "power_model=$powerModel/run_id=$repeat", 4096) - val trace = Sc20StreamingParquetTraceReader(File(config.getString("trace-path"), trace), random = Random(1).asKotlinRandom()) + val trace = StreamingParquetTraceReader(File(config.getString("trace-path"), trace)) withComputeService(clock, meterProvider, allocationPolicy) { scheduler -> withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt deleted file mode 100644 index 58af8453..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo - -/** - * A topology setup. - * - * @property name The name of the setup. - * @property rooms The rooms in the topology. - */ -internal data class Setup(val name: String, val rooms: List<Room>) - -/** - * A room in a topology. - * - * @property type The type of room in the topology. - * @property objects The objects in the room. - */ -internal data class Room(val type: String, val objects: List<RoomObject>) - -/** - * An object in a [Room]. - * - * @property type The type of the room object. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) -internal sealed class RoomObject(val type: String) { - /** - * A rack in a server room. - * - * @property machines The machines in the rack. - */ - internal data class Rack(val machines: List<Machine>) : RoomObject("RACK") -} - -/** - * A machine in the setup that consists of the specified CPU's represented as - * integer identifiers and ethernet speed. - * - * @property cpus The CPUs in the machine represented as integer identifiers. - * @property memories The memories in the machine represented as integer identifiers. - */ -internal data class Machine(val cpus: List<Int>, val memories: List<Int>) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt deleted file mode 100644 index 9b77702e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel -import java.io.InputStream -import java.util.* - -/** - * A parser for the JSON experiment setup files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { - /** - * The environment that was read from the file. - */ - private val setup: Setup = mapper.readValue(input) - - /** - * Read the environment. - */ - public override fun read(): List<MachineDef> { - var counter = 0 - return setup.rooms.flatMap { room -> - room.objects.flatMap { roomObject -> - when (roomObject) { - is RoomObject.Rack -> { - roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } - } - 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - val memories = machine.memories.map { id -> - when (id) { - 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - MachineDef( - UUID(0L, counter++.toLong()), - "node-$counter", - emptyMap(), - MachineModel(cores, memories), - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, idlePower = 200.0) - ) - } - } - } - } - } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt index 7df1acd3..797a88d5 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt @@ -22,8 +22,6 @@ package org.opendc.format.trace -import java.io.Closeable - /** * An interface for reading workloads into memory. * @@ -31,4 +29,4 @@ import java.io.Closeable * * @param T The shape of the workloads supported by this reader. */ -public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable +public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 769b2b13..aaf8a240 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -24,8 +24,6 @@ package org.opendc.format.trace.bitbrains import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader @@ -38,12 +36,8 @@ import kotlin.math.min * A [TraceReader] for the public VM workload trace format. * * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. */ -public class BitbrainsTraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader<SimWorkload> { +public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ @@ -123,12 +117,6 @@ public class BitbrainsTraceReader( val uuid = UUID(0L, vmId) - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) } - .toSortedSet() - ) - val workload = SimTraceWorkload(flopsHistory.asSequence()) entries[vmId] = TraceEntry( uuid, @@ -136,7 +124,6 @@ public class BitbrainsTraceReader( startTime, workload, mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to cores, "required-memory" to requiredMemory, "workload" to workload diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt deleted file mode 100644 index 0da1f7c2..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt +++ /dev/null @@ -1,7 +0,0 @@ -package org.opendc.format.trace.sc20 - -internal data class PerformanceInterferenceEntry( - val vms: List<String>, - val minServerLoad: Double, - val performanceScore: Double -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt deleted file mode 100644 index 1eb4bac2..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.* -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -public class Sc20TraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List<String>, - random: Random -) : TraceReader<SimWorkload> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>() - - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - val vms = if (selectedVms.isEmpty()) { - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - } else { - selectedVms.map { - File(traceDirectory, it) - } - } - - vms - .forEachIndexed { idx, vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var timestamp: Long - var cores = -1 - var minTime = Long.MAX_VALUE - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEach { line -> - val values = line.split(" ") - - vmId = vmFile.name - timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - } - } - - val flopsFragments = sequence { - var last: SimTraceWorkload.Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(" ") - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) { - val oldFragment = last!! - SimTraceWorkload.Fragment( - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - - if (last != null) { - yield(last!!) - } - } - } - - val uuid = UUID(0, idx.toLong()) - - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), - Random(random.nextInt()) - ) - val workload = SimTraceWorkload(flopsFragments.asSequence()) - entries[uuid] = TraceEntry( - uuid, - vmId, - minTime, - workload, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt deleted file mode 100644 index 4c409887..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/interference/PerformanceInterferenceModel.kt +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.interference - -import java.util.* -import kotlin.random.Random - -/** - * Meta-data key for the [PerformanceInterferenceModel] of an image. - */ -public const val IMAGE_PERF_INTERFERENCE_MODEL: String = "image:performance-interference" - -/** - * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. - * - * @param items The [PerformanceInterferenceModel.Item]s that make up this model. - */ -public class PerformanceInterferenceModel( - public val items: SortedSet<Item>, - private val random: Random = Random(0) -) { - private var intersectingItems: List<Item> = emptyList() - private val colocatedWorkloads = TreeMap<String, Int>() - - /** - * Indicate that a VM has started. - */ - public fun onStart(name: String) { - colocatedWorkloads.merge(name, 1, Int::plus) - intersectingItems = items.filter { item -> doesMatch(item) } - } - - /** - * Indicate that a VM has stopped. - */ - public fun onStop(name: String) { - colocatedWorkloads.computeIfPresent(name) { _, v -> (v - 1).takeUnless { it == 0 } } - intersectingItems = items.filter { item -> doesMatch(item) } - } - - /** - * Compute the performance interference based on the current server load. - */ - public fun apply(currentServerLoad: Double): Double { - if (intersectingItems.isEmpty()) { - return 1.0 - } - val score = intersectingItems - .firstOrNull { it.minServerLoad <= currentServerLoad } - - // Apply performance penalty to (on average) only one of the VMs - return if (score != null && random.nextInt(score.workloadNames.size) == 0) { - score.performanceScore - } else { - 1.0 - } - } - - private fun doesMatch(item: Item): Boolean { - var count = 0 - for ( - name in item.workloadNames.subSet( - colocatedWorkloads.firstKey(), - colocatedWorkloads.lastKey() + "\u0000" - ) - ) { - count += colocatedWorkloads.getOrDefault(name, 0) - if (count > 1) - return true - } - return false - } - - /** - * Model describing how a specific set of workloads causes performance variability for each workload. - * - * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. - * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. - * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no - * influence, <1 means that performance degrades, and >1 means that performance improves. - */ - public data class Item( - public val workloadNames: SortedSet<String>, - public val minServerLoad: Double, - public val performanceScore: Double - ) : Comparable<Item> { - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as Item - - if (workloadNames != other.workloadNames) return false - - return true - } - - override fun hashCode(): Int = workloadNames.hashCode() - - override fun compareTo(other: Item): Int { - var cmp = performanceScore.compareTo(other.performanceScore) - if (cmp != 0) { - return cmp - } - - cmp = minServerLoad.compareTo(other.minServerLoad) - if (cmp != 0) { - return cmp - } - - return hashCode().compareTo(other.hashCode()) - } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index fb46dab4..d287312f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -23,9 +23,9 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* @@ -39,7 +39,8 @@ import org.opendc.simulator.resources.SimResourceSwitch */ public abstract class SimAbstractHypervisor( private val interpreter: SimResourceInterpreter, - private val scalingGovernor: ScalingGovernor? + private val scalingGovernor: ScalingGovernor? = null, + protected val interferenceDomain: VmInterferenceDomain? = null ) : SimHypervisor { /** * The machine on which the hypervisor runs. @@ -87,12 +88,9 @@ public abstract class SimAbstractHypervisor( return canFit(model, switch) } - override fun createMachine( - model: MachineModel, - performanceInterferenceModel: PerformanceInterferenceModel? - ): SimMachine { + override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine { require(canFit(model)) { "Machine does not fit" } - val vm = VirtualMachine(model, performanceInterferenceModel) + val vm = VirtualMachine(model, interferenceId) _vms.add(vm) return vm } @@ -116,17 +114,18 @@ public abstract class SimAbstractHypervisor( /** * A virtual machine running on the hypervisor. * - * @property model The machine model of the virtual machine. - * @property performanceInterferenceModel The performance interference model to utilize. + * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine( - model: MachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - ) : SimAbstractMachine(interpreter, parent = null, model) { + private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(interpreter, parent = null, model) { + /** + * The interference key of this virtual machine. + */ + private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) } + /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(switch.newOutput(), it) } + override val cpus = model.cpus.map { VCpu(switch.newOutput(interferenceKey), it) } override fun close() { super.close() @@ -136,6 +135,9 @@ public abstract class SimAbstractHypervisor( } _vms.remove(this) + if (interferenceKey != null) { + interferenceDomain?.leave(interferenceKey) + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 2ce51ea6..17130d34 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -22,8 +22,10 @@ package org.opendc.simulator.compute.kernel +import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceInterpreter @@ -32,20 +34,22 @@ import org.opendc.simulator.resources.SimResourceSwitchMaxMin import org.opendc.simulator.resources.SimResourceSystem /** - * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single - * [SimBareMetalMachine] concurrently using weighted fair sharing. + * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] + * concurrently using weighted fair sharing. * * @param interpreter The interpreter to manage the machine's resources. * @param parent The parent simulation system. * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. + * @param interferenceDomain The resource interference domain to which the hypervisor belongs. * @param listener The hypervisor listener to use. */ public class SimFairShareHypervisor( private val interpreter: SimResourceInterpreter, private val parent: SimResourceSystem? = null, scalingGovernor: ScalingGovernor? = null, + interferenceDomain: VmInterferenceDomain? = null, private val listener: SimHypervisor.Listener? = null -) : SimAbstractHypervisor(interpreter, scalingGovernor) { +) : SimAbstractHypervisor(interpreter, scalingGovernor, interferenceDomain) { override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean = true @@ -54,7 +58,7 @@ public class SimFairShareHypervisor( } private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem { - val switch = SimResourceSwitchMaxMin(interpreter, this) + val switch = SimResourceSwitchMaxMin(interpreter, this, interferenceDomain) override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index 542cd0d2..8d0592ec 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.kernel +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSystem @@ -34,6 +36,14 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override fun create( interpreter: SimResourceInterpreter, parent: SimResourceSystem?, + scalingGovernor: ScalingGovernor?, + interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? - ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener = listener) + ): SimHypervisor = SimFairShareHypervisor( + interpreter, + parent, + scalingGovernor = scalingGovernor, + interferenceDomain = interferenceDomain, + listener = listener + ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 40402f5c..e398ab36 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -23,13 +23,12 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload - * to a [SimBareMetalMachine]. + * to another [SimMachine]. */ public interface SimHypervisor : SimWorkload { /** @@ -46,12 +45,9 @@ public interface SimHypervisor : SimWorkload { * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. - * @param performanceInterferenceModel The performance interference model to use. + * @param interferenceId An identifier for the interference model. */ - public fun createMachine( - model: MachineModel, - performanceInterferenceModel: PerformanceInterferenceModel? = null - ): SimMachine + public fun createMachine(model: MachineModel, interferenceId: String? = null): SimMachine /** * Event listener for hypervisor events. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index cafd1ffc..b307a34d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.kernel +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSystem @@ -43,6 +45,8 @@ public interface SimHypervisorProvider { public fun create( interpreter: SimResourceInterpreter, parent: SimResourceSystem? = null, + scalingGovernor: ScalingGovernor? = null, + interferenceDomain: VmInterferenceDomain? = null, listener: SimHypervisor.Listener? = null ): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index 3ceebb9a..ac1c0250 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -31,7 +31,7 @@ import org.opendc.simulator.resources.SimResourceSwitchExclusive /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ -public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter, null) { +public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) { override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean { return switch.inputs.size - switch.outputs.size >= model.cpus.size } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index fb47d9e5..3906cb9a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.kernel +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSystem @@ -34,6 +36,8 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override fun create( interpreter: SimResourceInterpreter, parent: SimResourceSystem?, + scalingGovernor: ScalingGovernor?, + interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? ): SimHypervisor = SimSpaceSharedHypervisor(interpreter) } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index f30e64cf..1801fcd0 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,18 +20,24 @@ * SOFTWARE. */ -package org.opendc.format.trace +package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.io.Closeable -import kotlin.random.Random +import org.opendc.simulator.resources.interference.InterferenceDomain +import org.opendc.simulator.resources.interference.InterferenceKey /** - * An interface for reading descriptions of performance interference models into memory. + * The interference domain of a hypervisor. */ -public interface PerformanceInterferenceModelReader : Closeable { +public interface VmInterferenceDomain : InterferenceDomain { /** - * Construct a [PerformanceInterferenceModel]. + * Join this interference domain. + * + * @param id The identifier of the virtual machine. */ - public fun construct(random: Random): Map<String, PerformanceInterferenceModel> + public fun join(id: String): InterferenceKey + + /** + * Leave this interference domain. + */ + public fun leave(key: InterferenceKey) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt new file mode 100644 index 00000000..708ddede --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +/** + * A group of virtual machines that together can interfere when operating on the same resources, causing performance + * variability. + */ +public data class VmInterferenceGroup( + /** + * The minimum load of the host before the interference occurs. + */ + public val targetLoad: Double, + + /** + * A score in [0, 1] representing the performance variability as a result of resource interference. + */ + public val score: Double, + + /** + * The members of this interference group. + */ + public val members: Set<String> +) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt new file mode 100644 index 00000000..c2e00c8e --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +import org.opendc.simulator.resources.interference.InterferenceKey +import java.util.* + +/** + * An interference model that models the resource interference between virtual machines on a host. + * + * @param groups The groups of virtual machines that interfere with each other. + * @param random The [Random] instance to select the affected virtual machines. + */ +public class VmInterferenceModel( + private val groups: List<VmInterferenceGroup>, + private val random: Random = Random(0) +) { + /** + * Construct a new [VmInterferenceDomain]. + */ + public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain { + /** + * The stateful groups of this domain. + */ + private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) } + + /** + * The set of keys active in this domain. + */ + private val keys = mutableSetOf<InterferenceKeyImpl>() + + override fun join(id: String): InterferenceKey { + val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad }) + keys += key + return key + } + + override fun leave(key: InterferenceKey) { + if (key is InterferenceKeyImpl) { + keys -= key + key.leave() + } + } + + override fun apply(key: InterferenceKey?, load: Double): Double { + if (key == null || key !is InterferenceKeyImpl) { + return 1.0 + } + + val ctx = key.findGroup(load) + val group = ctx?.group + + // Apply performance penalty to (on average) only one of the VMs + return if (group != null && random.nextInt(group.members.size) == 0) { + group.score + } else { + 1.0 + } + } + + override fun toString(): String = "VmInterferenceDomain" + } + + /** + * An interference key. + * + * @param id The identifier of the member. + * @param groups The groups to which the key belongs. + */ + private inner class InterferenceKeyImpl(val id: String, private val groups: List<GroupContext>) : InterferenceKey { + init { + for (group in groups) { + group.join(this) + } + } + + /** + * Find the active group that applies for the interference member. + */ + fun findGroup(load: Double): GroupContext? { + // Find the first active group whose target load is lower than the current load + val index = groups.binarySearchBy(load) { it.group.targetLoad } + val target = if (index >= 0) index else -(index + 1) + + // Check whether there are active groups ahead of the index + for (i in target until groups.size) { + val group = groups[i] + if (group.group.targetLoad > load) { + break + } else if (group.isActive) { + return group + } + } + + // Check whether there are active groups before the index + for (i in (target - 1) downTo 0) { + val group = groups[i] + if (group.isActive) { + return group + } + } + + return null + } + + /** + * Leave all the groups. + */ + fun leave() { + for (group in groups) { + group.leave(this) + } + } + } + + /** + * A group context is used to track the active keys per interference group. + */ + private inner class GroupContext(val group: VmInterferenceGroup) { + /** + * The active keys that are part of this group. + */ + private val keys = mutableSetOf<InterferenceKeyImpl>() + + /** + * A flag to indicate that the group is active. + */ + val isActive + get() = keys.size > 1 + + /** + * Determine whether the specified [id] is part of this group. + */ + operator fun contains(id: String): Boolean = id in group.members + + /** + * Join this group with the specified [key]. + */ + fun join(key: InterferenceKeyImpl) { + keys += key + } + + /** + * Leave this group with the specified [key]. + */ + fun leave(key: InterferenceKeyImpl) { + keys -= key + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 892d5223..a6d955ca 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -24,12 +24,9 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertArrayEquals import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.model.* import org.opendc.simulator.compute.power.ConstantPowerModel @@ -157,8 +154,10 @@ class SimMachineTest { try { coroutineScope { launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - assertEquals(100.0, machine.psu.powerDraw) - assertEquals(100.0, source.powerDraw) + assertAll( + { assertEquals(100.0, machine.psu.powerDraw) }, + { assertEquals(100.0, source.powerDraw) } + ) } } finally { machine.close() @@ -284,6 +283,7 @@ class SimMachineTest { } } + @Test fun testDiskWriteUsage() = runBlockingSimulation { val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index 71d48a31..a61cba8d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -34,6 +34,8 @@ import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -223,4 +225,63 @@ internal class SimHypervisorTest { machine.close() } + + @Test + fun testInterference() = runBlockingSimulation { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val model = MachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + + val groups = listOf( + VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")), + VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")), + VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) + ) + val interferenceModel = VmInterferenceModel(groups) + + val platform = SimResourceInterpreter(coroutineContext, clock) + val machine = SimBareMetalMachine( + platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) + ) + val hypervisor = SimFairShareHypervisor(platform, interferenceDomain = interferenceModel.newDomain()) + + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + val workloadB = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 1) + ) + ) + + launch { + machine.run(hypervisor) + } + + coroutineScope { + launch { + val vm = hypervisor.createMachine(model, "a") + vm.run(workloadA) + vm.close() + } + val vm = hypervisor.createMachine(model, "b") + vm.run(workloadB) + vm.close() + } + + machine.close() + } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 84217278..8a24b3e7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -116,8 +116,8 @@ public abstract class SimAbstractResourceAggregator( updateCounters(ctx, work) } - override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - return _inputConsumers.sumOf { it.remainingWork } + override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return work - _inputConsumers.sumOf { it.remainingWork } } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index 6bfbfc99..f384582f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.interference.InterferenceKey + /** * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. */ @@ -33,6 +35,8 @@ public interface SimResourceDistributor : SimResourceConsumer { /** * Create a new output for the distributor. + * + * @param key The key of the interference member to which the output belongs. */ - public fun newOutput(): SimResourceCloseableProvider + public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index d8fc8cb6..398797cf 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -22,15 +22,21 @@ package org.opendc.simulator.resources -import kotlin.math.max +import org.opendc.simulator.resources.interference.InterferenceDomain +import org.opendc.simulator.resources.interference.InterferenceKey import kotlin.math.min /** * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. + * + * @param interpreter The interpreter for managing the resource contexts. + * @param parent The parent resource system of the distributor. + * @param interferenceDomain The interference domain of the distributor. */ public class SimResourceDistributorMaxMin( private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null + private val parent: SimResourceSystem? = null, + private val interferenceDomain: InterferenceDomain? = null ) : SimResourceDistributor { override val outputs: Set<SimResourceCloseableProvider> get() = _outputs @@ -56,9 +62,14 @@ public class SimResourceDistributorMaxMin( */ private var totalAllocatedSpeed = 0.0 + /** + * The total requested speed for the output resources. + */ + private var totalRequestedSpeed = 0.0 + /* SimResourceDistributor */ - override fun newOutput(): SimResourceCloseableProvider { - val provider = Output(ctx?.capacity ?: 0.0) + override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { + val provider = Output(ctx?.capacity ?: 0.0, key) _outputs.add(provider) return provider } @@ -148,6 +159,7 @@ public class SimResourceDistributorMaxMin( assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } this.totalRequestedWork = totalRequestedWork + this.totalRequestedSpeed = totalRequestedSpeed this.totalAllocatedSpeed = capacity - availableSpeed val totalAllocatedWork = min( totalRequestedWork, @@ -169,7 +181,7 @@ public class SimResourceDistributorMaxMin( /** * An internal [SimResourceProvider] implementation for switch outputs. */ - private inner class Output(capacity: Double) : + private inner class Output(capacity: Double, private val key: InterferenceKey?) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceCloseableProvider, SimResourceProviderLogic, @@ -216,7 +228,6 @@ public class SimResourceDistributorMaxMin( check(!isClosed) { "Cannot re-use closed output" } activeOutputs += this - interpreter.batch { ctx.start() // Interrupt the input to re-schedule the resources @@ -262,19 +273,22 @@ public class SimResourceDistributorMaxMin( lastCommandTimestamp = ctx.clock.millis() } - override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0 - return if (work > 0.0) { - // Compute the fraction of compute time allocated to the output - val fraction = actualSpeed / totalAllocatedSpeed + // Compute the fraction of compute time allocated to the output + val fraction = actualSpeed / totalAllocatedSpeed - // Compute the work that was actually granted to the output. - val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction - max(0.0, work - processingAvailable) + // Compute the performance penalty due to resource interference + val perfScore = if (interferenceDomain != null) { + val load = totalAllocatedSpeed / requireNotNull(this@SimResourceDistributorMaxMin.ctx).capacity + interferenceDomain.apply(key, load) } else { - 0.0 + 1.0 } + + // Compute the work that was actually granted to the output. + return (totalRequestedWork - totalRemainingWork) * fraction * perfScore } /* Comparable */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index 5231ecf5..17045557 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import kotlin.math.max - /** * The logic of a resource provider. */ @@ -63,19 +61,18 @@ public interface SimResourceProviderLogic { public fun onFinish(ctx: SimResourceControllableContext) /** - * Get the remaining work to process after a resource consumption. + * Compute the amount of work that was consumed over the specified [duration]. * - * @param work The size of the resource consumption. - * @param speed The speed of consumption. + * @param work The total size of the resource consumption. + * @param speed The speed of the resource provider. * @param duration The duration from the start of the consumption until now. - * @return The amount of work remaining. + * @return The amount of work that was consumed by the resource provider. */ - public fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + public fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { return if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, work - processed) + return (duration / 1000.0) * speed } else { - 0.0 + work } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index f6e7b22f..d2aab634 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.interference.InterferenceKey + /** * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. */ @@ -43,8 +45,10 @@ public interface SimResourceSwitch : AutoCloseable { /** * Create a new output on the switch. + * + * @param key The key of the interference member to which the output belongs. */ - public fun newOutput(): SimResourceCloseableProvider + public fun newOutput(key: InterferenceKey? = null): SimResourceCloseableProvider /** * Add the specified [input] to the switch. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 4ff741ed..fbb541e5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.interference.InterferenceKey import java.util.ArrayDeque /** @@ -61,7 +62,10 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" } - override fun newOutput(): SimResourceCloseableProvider { + /** + * Add an output to the switch. + */ + override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 50d58798..ceb5a1a4 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -22,13 +22,21 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.interference.InterferenceDomain +import org.opendc.simulator.resources.interference.InterferenceKey + /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. + * + * @param interpreter The interpreter for managing the resource contexts. + * @param parent The parent resource system of the switch. + * @param interferenceDomain The interference domain of the switch. */ public class SimResourceSwitchMaxMin( interpreter: SimResourceInterpreter, - parent: SimResourceSystem? = null + parent: SimResourceSystem? = null, + interferenceDomain: InterferenceDomain? = null ) : SimResourceSwitch { /** * The output resource providers to which resource consumers can be attached. @@ -61,7 +69,7 @@ public class SimResourceSwitchMaxMin( /** * The distributor to distribute the aggregated resources. */ - private val distributor = SimResourceDistributorMaxMin(interpreter, parent) + private val distributor = SimResourceDistributorMaxMin(interpreter, parent, interferenceDomain) init { aggregator.startConsumer(distributor) @@ -70,10 +78,10 @@ public class SimResourceSwitchMaxMin( /** * Add an output to the switch. */ - override fun newOutput(): SimResourceCloseableProvider { + override fun newOutput(key: InterferenceKey?): SimResourceCloseableProvider { check(!isClosed) { "Switch has been closed" } - return distributor.newOutput() + return distributor.newOutput(key) } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 90c7bc75..98fad068 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.resources.impl import org.opendc.simulator.resources.* import java.time.Clock +import kotlin.math.max import kotlin.math.min /** @@ -318,7 +319,7 @@ internal class SimResourceContextImpl( */ private fun computeRemainingWork(now: Long): Double { return if (_work > 0.0) - logic.getRemainingWork(this, _work, speed, now - _timestamp) + max(0.0, _work - logic.getConsumedWork(this, _work, speed, now - _timestamp)) else 0.0 } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt new file mode 100644 index 00000000..1066777f --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt @@ -0,0 +1,19 @@ +package org.opendc.simulator.resources.interference + +import org.opendc.simulator.resources.SimResourceConsumer + +/** + * An interference domain represents a system of resources where [resource consumers][SimResourceConsumer] may incur + * performance variability due to operating on the same resources and therefore causing interference. + */ +public interface InterferenceDomain { + /** + * Compute the performance score of a participant in this interference domain. + * + * @param key The participant to obtain the score of or `null` if the participant has no key. + * @param load The overall load on the interference domain. + * @return A score representing the performance score to be applied to the resource consumer, with 1 + * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public fun apply(key: InterferenceKey?, load: Double): Double +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt index 6861affe..8b12e7b4 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,16 +20,9 @@ * SOFTWARE. */ -package org.opendc.format.trace - -import java.io.Closeable +package org.opendc.simulator.resources.interference /** - * An interface for reading VM placement data into memory. + * A key that uniquely identifies a participant of an interference domain. */ -public interface VmPlacementReader : Closeable { - /** - * Construct a map of VMs to clusters. - */ - public fun construct(): Map<String, String> -} +public interface InterferenceKey diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt index 09f7de35..d0b97d90 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -47,14 +47,16 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader +import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader +import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.sdk.toOtelClock import java.io.File import kotlin.random.Random +import kotlin.random.asJavaRandom private val logger = KotlinLogging.logger {} @@ -62,7 +64,7 @@ private val logger = KotlinLogging.logger {} * Represents the CLI command for starting the OpenDC web runner. */ @OptIn(ExperimentalCoroutinesApi::class) -public class RunnerCli : CliktCommand(name = "runner") { +class RunnerCli : CliktCommand(name = "runner") { /** * The name of the database to use. */ @@ -167,8 +169,8 @@ public class RunnerCli : CliktCommand(name = "runner") { tracePath, scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) ) - val traceReader = Sc20RawParquetTraceReader(traceDir) - val performanceInterferenceReader = let { + val traceReader = RawParquetTraceReader(traceDir) + val interferenceGroups = let { val path = File(traceDir, "performance-interference-model.json") val operational = scenario.get("operational", Document::class.java) val enabled = operational.getBoolean("performanceInterferenceEnabled") @@ -177,17 +179,18 @@ public class RunnerCli : CliktCommand(name = "runner") { return@let null } - path.inputStream().use { Sc20PerformanceInterferenceReader(it) } + PerformanceInterferenceReader(path.inputStream()).use { reader -> reader.read() } } val targets = portfolio.get("targets", Document::class.java) val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = topologyParser.read(topologyId) - val results = (0 until targets.getInteger("repeatsPerScenario")).map { - logger.info { "Starting repeat $it" } + val results = (0 until targets.getInteger("repeatsPerScenario")).map { repeat -> + logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { - runRepeat(scenario, it, environment, traceReader, performanceInterferenceReader) + val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) } + runRepeat(scenario, repeat, environment, traceReader, interferenceModel) } } @@ -203,8 +206,8 @@ public class RunnerCli : CliktCommand(name = "runner") { scenario: Document, repeat: Int, environment: EnvironmentReader, - traceReader: Sc20RawParquetTraceReader, - performanceInterferenceReader: Sc20PerformanceInterferenceReader? + traceReader: RawParquetTraceReader, + interferenceModel: VmInterferenceModel? ): WebExperimentMonitor.Result { val monitor = WebExperimentMonitor() @@ -267,16 +270,14 @@ public class RunnerCli : CliktCommand(name = "runner") { else -> throw IllegalArgumentException("Unknown policy $policyName") } - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( + val trace = ParquetTraceReader( listOf(traceReader), - performanceInterferenceModel, Workload(workloadName, workloadFraction), seed ) val failureFrequency = if (operational.getBoolean("failuresEnabled", false)) 24.0 * 7 else 0.0 - withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler -> val failureDomain = if (failureFrequency > 0) { logger.debug { "ENABLING failures" } createFailureDomain( @@ -377,4 +378,4 @@ public class RunnerCli : CliktCommand(name = "runner") { /** * Main entry point of the runner. */ -public fun main(args: Array<String>): Unit = RunnerCli().main(args) +fun main(args: Array<String>): Unit = RunnerCli().main(args) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index 413112af..38c774a9 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -25,7 +25,6 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -57,7 +56,6 @@ import kotlin.math.max * Integration test suite for the [WorkflowServiceImpl]. */ @DisplayName("WorkflowServiceImpl") -@OptIn(ExperimentalCoroutinesApi::class) internal class WorkflowServiceIntegrationTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. @@ -70,7 +68,7 @@ internal class WorkflowServiceIntegrationTest { .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json"))) .use { it.read() } .map { def -> SimHost( @@ -106,7 +104,7 @@ internal class WorkflowServiceIntegrationTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) + val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf"))) var offset = Long.MIN_VALUE coroutineScope { |
