From efd62bbc3bbe236503095e5cb27af42423500d85 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 13 May 2020 23:36:39 +0200 Subject: feat: Add workload sampling --- .../opendc/experiments/sc20/ExperimentRunner.kt | 12 ++- .../opendc/experiments/sc20/ExperimentRunnerCli.kt | 32 ++++++-- .../atlarge/opendc/experiments/sc20/Portfolios.kt | 4 +- .../opendc/experiments/sc20/WorkloadSampler.kt | 62 ++++++++++++++ .../sc20/reporter/PostgresMetricsWriter.kt | 6 +- .../sc20/trace/Sc20FilteringParquetTraceReader.kt | 94 ---------------------- .../sc20/trace/Sc20ParquetTraceReader.kt | 92 +++++++++++++++++++++ .../sc20/trace/Sc20RawParquetTraceReader.kt | 26 ++++-- .../src/main/resources/log4j2.xml | 3 + 9 files changed, 212 insertions(+), 119 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index 6e6da2c8..5e16b5e6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider -import com.atlarge.opendc.experiments.sc20.trace.Sc20FilteringParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper import com.atlarge.opendc.format.environment.EnvironmentReader @@ -44,7 +44,6 @@ import java.io.File import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicInteger import javax.sql.DataSource -import kotlin.random.Random import kotlin.system.measureTimeMillis /** @@ -132,14 +131,13 @@ public class ExperimentRunner( private fun createTraceReader( name: String, performanceInterferenceModel: PerformanceInterferenceModel?, - seed: Int + run: Run ): TraceReader { val raw = rawTraceReaders.getOrPut(name) { Sc20RawParquetTraceReader(File(tracePath, name)) } - return Sc20FilteringParquetTraceReader( + return Sc20ParquetTraceReader( raw, performanceInterferenceModel, - emptySet(), - Random(seed) + run ) } @@ -155,7 +153,7 @@ public class ExperimentRunner( */ private fun run(run: Run) { val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id) - val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run.seed) + val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run) val environmentReader = createEnvironmentReader(run.scenario.topology.name) try { run.scenario(run, reporter, environmentReader, traceReader) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt index 9dfed03b..d5990c01 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt @@ -43,6 +43,7 @@ import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int import com.zaxxer.hikari.HikariDataSource import mu.KotlinLogging import java.io.File @@ -83,7 +84,6 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { private val performanceInterferenceStream by option("--performance-interference-model", help = "path to the performance interference file") .file() .convert { it.inputStream() as InputStream } - .defaultLazy { ExperimentCli::class.java.getResourceAsStream("/env/performance-interference.json") } /** * The path to the original VM placements file. @@ -109,21 +109,39 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { private val portfolios by option("--portfolio") .choice( "hor-ver" to HorVerPortfolio, - "more-velocitory" to MoreVelocityPortfolio, + "more-velocity" to MoreVelocityPortfolio, "more-hpc" to MoreHpcPortfolio, "operational-phenomena" to OperationalPhenomenaPortfolio, ignoreCase = true ) .multiple() + /** + * The maximum number of threads to use. + */ + private val parallelism by option("--parallelism") + .int() + .default(Runtime.getRuntime().availableProcessors()) + + /** + * The batch size for writing results. + */ + private val batchSize by option("--batch-size") + .int() + .default(4096) + override fun run() { val ds = HikariDataSource() ds.jdbcUrl = jdbcUrl ds.addDataSourceProperty("reWriteBatchedInserts", "true") - val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel) + + reporter.batchSize = batchSize + + val performanceInterferenceModel = + performanceInterferenceStream?.let { Sc20PerformanceInterferenceReader(it).construct() } + + val runner = ExperimentRunner(portfolios, ds, reporter, environmentPath, tracePath, performanceInterferenceModel, parallelism) try { runner.run() @@ -138,6 +156,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { * An option for specifying the type of reporter to use. */ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentReporterProvider { + var batchSize = 4096 + class Parquet : Reporter("Options for reporting using Parquet") { private val path by option("--parquet-directory", help = "path to where the output should be stored") .file() @@ -153,7 +173,7 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo lateinit var hostWriter: PostgresHostMetricsWriter override fun init(ds: DataSource) { - hostWriter = PostgresHostMetricsWriter(ds, 4096) + hostWriter = PostgresHostMetricsWriter(ds, batchSize) } override fun createReporter(scenario: Long, run: Int): ExperimentReporter = diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt index 58acd168..04bddce3 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt @@ -30,7 +30,7 @@ abstract class AbstractSc20Portfolio(name: String) : Portfolio(name) { abstract val operationalPhenomena: List> abstract val allocationPolicies: List - open val repetitions = 2 + open val repetitions = 4 override val scenarios: Sequence = sequence { for (topology in topologies) { @@ -72,7 +72,7 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") { // Workload("solvinity", 0.1), // Workload("solvinity", 0.25), // Workload("small-parquet", 0.5), - Workload("small-parquet", 1.0) + Workload("small-parquet", 0.5) ) override val operationalPhenomena = listOf( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt new file mode 100644 index 00000000..6089271e --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/WorkloadSampler.kt @@ -0,0 +1,62 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.format.trace.TraceEntry +import kotlin.random.Random + +/** + * Sample the workload for the specified [run]. + */ +fun sampleWorkload(trace: List>, run: Run): List> { + return sampleRegularWorkload(trace, run) +} + +/** + * Sample a regular (non-HPC) workload. + */ +fun sampleRegularWorkload(trace: List>, run: Run): List> { + if (run.scenario.workload.fraction >= 1) { + return trace + } + + val shuffled = trace.shuffled(Random(run.seed)) + val res = mutableListOf>() + val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + var currentLoad = 0.0 + + for (entry in shuffled) { + val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + if ((currentLoad + entryLoad) / totalLoad > run.scenario.workload.fraction) { + break + } + + currentLoad += entryLoad + res += entry + } + + return res +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt index a47258b4..bfa89e3a 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt @@ -43,7 +43,7 @@ public abstract class PostgresMetricsWriter( /** * The queue of commands to process. */ - private val queue: BlockingQueue = ArrayBlockingQueue(batchSize) + private val queue: BlockingQueue = ArrayBlockingQueue(12 * batchSize) /** * The thread for the actual writer. @@ -93,9 +93,8 @@ public abstract class PostgresMetricsWriter( if (queue.isEmpty()) { actions.add(queue.take()) - } else { - queue.drainTo(actions) } + queue.drainTo(actions) for (action in actions) { when (action) { @@ -110,6 +109,7 @@ public abstract class PostgresMetricsWriter( stmt.executeBatch() conn.commit() } + } } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt deleted file mode 100644 index 1e9950de..00000000 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt +++ /dev/null @@ -1,94 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20.trace - -import com.atlarge.opendc.compute.core.image.VmImage -import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.format.trace.TraceEntry -import com.atlarge.opendc.format.trace.TraceReader -import kotlin.random.Random - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -@OptIn(ExperimentalStdlibApi::class) -class Sc20FilteringParquetTraceReader( - raw: Sc20RawParquetTraceReader, - performanceInterferenceModel: PerformanceInterferenceModel?, - selectedVms: Set, - random: Random -) : TraceReader { - /** - * The iterator over the actual trace. - */ - private val iterator: Iterator> = - raw.read() - .run { - // Apply VM selection filter - if (selectedVms.isEmpty()) - this - else - filter { it.workload.image.name in selectedVms } - } - .run { - // Apply performance interference model - if (performanceInterferenceModel == null) - this - else - map { entry -> - val image = entry.workload.image - val id = image.name - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { id in it.workloadNames }.toSet(), - Random(random.nextInt()) - ) - val newImage = - VmImage( - image.uid, - image.name, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - image.flopsHistory, - image.maxCores, - image.requiredMemory - ) - val newWorkload = entry.workload.copy(image = newImage) - Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) - } - } - .iterator() - - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt new file mode 100644 index 00000000..7cc713bc --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -0,0 +1,92 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.trace + +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.Run +import com.atlarge.opendc.experiments.sc20.sampleWorkload +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import kotlin.random.Random + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param reader The internal trace reader to use. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + * @param run The run to which this reader belongs. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20ParquetTraceReader( + raw: Sc20RawParquetTraceReader, + performanceInterferenceModel: PerformanceInterferenceModel?, + run: Run +) : TraceReader { + /** + * The iterator over the actual trace. + */ + private val iterator: Iterator> = + raw.read() + .run { sampleWorkload(this, run) } + .run { + // Apply performance interference model + if (performanceInterferenceModel == null) + this + else { + val random = Random(run.seed) + map { entry -> + val image = entry.workload.image + val id = image.name + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { id in it.workloadNames }.toSet(), + Random(random.nextInt()) + ) + val newImage = + VmImage( + image.uid, + image.name, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + image.flopsHistory, + image.maxCores, + image.requiredMemory + ) + val newWorkload = entry.workload.copy(image = newImage) + Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) + } + } + } + .iterator() + + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 632746a2..f19c9275 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -94,29 +94,36 @@ class Sc20RawParquetTraceReader(private val path: File) { var counter = 0 val entries = mutableListOf() + val loadCache = mutableListOf() return try { while (true) { val record = metaReader.read() ?: break val id = record["id"].toString() val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long val maxCores = record["maxCores"] as Int val requiredMemory = record["requiredMemory"] as Long val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + val vmFragments = fragments.getValue(id).asSequence() + val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val vmWorkload = VmWorkload( - uid, "VM Workload $id", + uid, id, UnnamedUser, VmImage( uid, id, - emptyMap(), - fragments.getValue(id).asSequence(), + mapOf( + "submit-time" to submissionTime, + "end-time" to endTime, + "total-load" to totalLoad + ), + vmFragments, maxCores, requiredMemory ) ) - entries.add(TraceEntryImpl(submissionTime, vmWorkload)) } @@ -129,17 +136,17 @@ class Sc20RawParquetTraceReader(private val path: File) { /** * The entries in the trace. */ - private val entries: Sequence + private val entries: List init { val fragments = parseFragments(path) - entries = parseMeta(path, fragments).asSequence() + entries = parseMeta(path, fragments) } /** * Read the entries in the trace. */ - public fun read(): Sequence> = entries + public fun read(): List> = entries /** * An unnamed user. @@ -156,4 +163,9 @@ class Sc20RawParquetTraceReader(private val path: File) { override var submissionTime: Long, override val workload: VmWorkload ) : TraceEntry + + /** + * A load cache entry. + */ + data class LoadCacheEntry(val vm: String, val totalLoad: Double, val start: Long, val end: Long) } diff --git a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml index f9a5a79e..f47a6da8 100644 --- a/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ b/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -36,6 +36,9 @@ + + + -- cgit v1.2.3