From 269860ba2616c32ca8a81ac66b6fbf95c2f1c77d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 15 May 2020 13:29:25 +0200 Subject: perf: Reduce memory consumption of perf interference model --- .../com/atlarge/opendc/experiments/sc20/Main.kt | 2 +- .../experiments/sc20/experiment/Experiment.kt | 4 +-- .../opendc/experiments/sc20/experiment/Run.kt | 3 +- .../sc20/trace/Sc20ParquetTraceReader.kt | 20 ++++++------ .../sc20/trace/Sc20RawParquetTraceReader.kt | 5 --- .../sc20/trace/Sc20StreamingParquetTraceReader.kt | 2 +- .../experiments/sc20/trace/WorkloadSampler.kt | 12 ++++---- .../resources/env/performance-interference.json | 7 ----- .../src/main/resources/env/setup-small.json | 21 ------------- .../src/main/resources/env/setup-test.json | 36 ---------------------- .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 20 ++++++------ 11 files changed, 30 insertions(+), 102 deletions(-) delete mode 100644 opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json delete mode 100644 opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json delete mode 100644 opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index e17a145c..ca06bcbb 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -128,7 +128,7 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { logger.info { "Constructing performance interference model" } val performanceInterferenceModel = - performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() } + performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it) } logger.info { "Creating experiment descriptor" } val descriptor = object : Experiment(environmentPath, tracePath, output, performanceInterferenceModel, vmPlacements, bufferSize) { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt index 5feb5917..f3ac2554 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Experiment.kt @@ -24,13 +24,13 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.experiments.sc20.runner.ContainerExperimentDescriptor import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetRunEventWriter +import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader import java.io.File /** @@ -47,7 +47,7 @@ public abstract class Experiment( val environments: File, val traces: File, val output: File, - val performanceInterferenceModel: PerformanceInterferenceModel?, + val performanceInterferenceModel: PerformanceInterferenceModelReader?, val vmPlacements: Map, val bufferSize: Int ) : ContainerExperimentDescriptor() { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 6d53fd17..985e98c8 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -91,7 +91,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) Sc20RawParquetTraceReader(File(experiment.traces, name)) } } - val trace = Sc20ParquetTraceReader(raw, experiment.performanceInterferenceModel, this) + val performanceInterferenceModel = experiment.performanceInterferenceModel?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader(raw, performanceInterferenceModel, parent.workload, seed) val monitor = ParquetExperimentMonitor(this) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index 96b6b426..ad50bf18 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -28,10 +28,10 @@ import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader -import kotlin.random.Random +import java.util.TreeSet /** * A [TraceReader] for the internal VM workload trace format. @@ -43,29 +43,27 @@ import kotlin.random.Random @OptIn(ExperimentalStdlibApi::class) class Sc20ParquetTraceReader( raw: Sc20RawParquetTraceReader, - performanceInterferenceModel: PerformanceInterferenceModel?, - run: Run + performanceInterferenceModel: Map, + workload: Workload, + seed: Int ) : TraceReader { /** * The iterator over the actual trace. */ private val iterator: Iterator> = raw.read() - .run { sampleWorkload(this, run) } + .run { sampleWorkload(this, workload, seed) } .run { // Apply performance interference model - if (performanceInterferenceModel == null) + if (performanceInterferenceModel.isEmpty()) this else { - val random = Random(run.seed) map { entry -> val image = entry.workload.image val id = image.name val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.workloadToItem[id] ?: emptySet(), - Random(random.nextInt()) - ) + performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) + val newImage = VmImage( image.uid, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 485c2922..3b480d33 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -164,9 +164,4 @@ class Sc20RawParquetTraceReader(private val path: File) { override var submissionTime: Long, override val workload: VmWorkload ) : TraceEntry - - /** - * A load cache entry. - */ - data class LoadCacheEntry(val vm: String, val totalLoad: Double, val start: Long, val end: Long) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index aa06ce65..f6d6e6fd 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -231,7 +231,7 @@ class Sc20StreamingParquetTraceReader( } val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), Random(random.nextInt()) ) val vmWorkload = VmWorkload( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index e03c59bc..a8580686 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -25,7 +25,7 @@ package com.atlarge.opendc.experiments.sc20.trace import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.experiments.sc20.experiment.Run +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.format.trace.TraceEntry import mu.KotlinLogging import kotlin.random.Random @@ -35,20 +35,20 @@ private val logger = KotlinLogging.logger {} /** * Sample the workload for the specified [run]. */ -fun sampleWorkload(trace: List>, run: Run): List> { - return sampleRegularWorkload(trace, run) +fun sampleWorkload(trace: List>, workload: Workload, seed: Int): List> { + return sampleRegularWorkload(trace, workload, seed) } /** * Sample a regular (non-HPC) workload. */ -fun sampleRegularWorkload(trace: List>, run: Run): List> { - val fraction = run.parent.workload.fraction +fun sampleRegularWorkload(trace: List>, workload: Workload, seed: Int): List> { + val fraction = workload.fraction if (fraction >= 1) { return trace } - val shuffled = trace.shuffled(Random(run.seed)) + val shuffled = trace.shuffled(Random(seed)) val res = mutableListOf>() val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } var currentLoad = 0.0 diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json b/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json deleted file mode 100644 index 87a4f8af..00000000 --- a/opendc/opendc-experiments-sc20/src/main/resources/env/performance-interference.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - "vms": [545, 223], - "minServerLoad": 0.84, - "performanceScore": 0.6 - } -] diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json b/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json deleted file mode 100644 index 80b24dba..00000000 --- a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-small.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "name": "Experimental Setup 2", - "rooms": [ - { - "type": "SERVER", - "objects": [ - { - "type": "RACK", - "machines": [ - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]} - ] - } - ] - } - ] -} diff --git a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json b/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json deleted file mode 100644 index 02a1d25b..00000000 --- a/opendc/opendc-experiments-sc20/src/main/resources/env/setup-test.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "Experimental Setup 2", - "rooms": [ - { - "type": "SERVER", - "objects": [ - { - "type": "RACK", - "machines": [ - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]}, - {"cpus": [2], "memories": [1, 1, 1, 1]}, {"cpus": [2], "memories": [1, 1, 1, 1]} - ] - }, - { - "type": "RACK", - "machines": [ - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]}, - {"cpus": [1], "memories": [1, 1, 1, 1]}, {"cpus": [1], "memories": [1, 1, 1, 1]} - ] - } - ] - } - ] -} diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index ae3d6db1..abd5c961 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -34,13 +34,14 @@ import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAll import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner -import com.atlarge.opendc.experiments.sc20.experiment.createTraceReader +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.experiment.processTrace +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch @@ -141,8 +142,8 @@ class Sc20IntegrationTest { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") assertEquals(207379117949, monitor.totalRequestedBurst) - assertEquals(207378478631, monitor.totalGrantedBurst) - assertEquals(639360, monitor.totalOvercommissionedBurst) + assertEquals(207102919834, monitor.totalGrantedBurst) + assertEquals(276198896, monitor.totalOvercommissionedBurst) assertEquals(0, monitor.totalInterferedBurst) } @@ -157,13 +158,10 @@ class Sc20IntegrationTest { * Obtain the trace reader for the test. */ private fun createTestTraceReader(): TraceReader { - val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - return createTraceReader( - File("src/test/resources/trace"), - performanceInterferenceModel, - emptyList(), + return Sc20ParquetTraceReader( + Sc20RawParquetTraceReader(File("src/test/resources/trace")), + emptyMap(), + Workload("test", 1.0), 0 ) } -- cgit v1.2.3