diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-13 14:04:01 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-13 14:04:01 +0200 |
| commit | 5300e0f1df51a2b41be0e76d7c2061315ffea467 (patch) | |
| tree | b8920d81ec9f84e8fecce270a78a96375d393601 | |
| parent | 5c2270d058c312c94ee0970560009e8008042d10 (diff) | |
refactor: Share trace across simulations
3 files changed, 267 insertions, 7 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt index a4b26ddb..a9ae7c6d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt @@ -27,7 +27,8 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider -import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20FilteringParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader @@ -63,7 +64,7 @@ public class ExperimentRunner( private val reporterProvider: ExperimentReporterProvider, private val environmentPath: File, private val tracePath: File, - private val performanceInterferenceModel: PerformanceInterferenceModel + private val performanceInterferenceModel: PerformanceInterferenceModel? ) : Closeable { /** * The database helper to write the execution plan. @@ -120,17 +121,23 @@ public class ExperimentRunner( } /** + * The raw parquet trace readers that are shared across simulations. + */ + private val rawTraceReaders = mutableMapOf<String, Sc20RawParquetTraceReader>() + + /** * Create a trace reader for the specified trace. */ private fun createTraceReader( name: String, - performanceInterferenceModel: PerformanceInterferenceModel, + performanceInterferenceModel: PerformanceInterferenceModel?, seed: Int ): TraceReader<VmWorkload> { - return Sc20StreamingParquetTraceReader( - File(tracePath, name), + val raw = rawTraceReaders.getOrPut(name) { Sc20RawParquetTraceReader(File(tracePath, name)) } + return Sc20FilteringParquetTraceReader( + raw, performanceInterferenceModel, - emptyList(), + emptySet(), Random(seed) ) } @@ -167,7 +174,7 @@ public class ExperimentRunner( val plan = createPlan() val total = plan.size val finished = AtomicInteger() - val dispatcher = Executors.newWorkStealingPool().asCoroutineDispatcher() + val dispatcher = Executors.newWorkStealingPool(2).asCoroutineDispatcher() runBlocking { val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!! diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt new file mode 100644 index 00000000..1e9950de --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20FilteringParquetTraceReader.kt @@ -0,0 +1,94 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.trace + +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import kotlin.random.Random + +/** + * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. + * + * @param traceFile The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20FilteringParquetTraceReader( + raw: Sc20RawParquetTraceReader, + performanceInterferenceModel: PerformanceInterferenceModel?, + selectedVms: Set<String>, + random: Random +) : TraceReader<VmWorkload> { + /** + * The iterator over the actual trace. + */ + private val iterator: Iterator<TraceEntry<VmWorkload>> = + raw.read() + .run { + // Apply VM selection filter + if (selectedVms.isEmpty()) + this + else + filter { it.workload.image.name in selectedVms } + } + .run { + // Apply performance interference model + if (performanceInterferenceModel == null) + this + else + map { entry -> + val image = entry.workload.image + val id = image.name + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { id in it.workloadNames }.toSet(), + Random(random.nextInt()) + ) + val newImage = + VmImage( + image.uid, + image.name, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + image.flopsHistory, + image.maxCores, + image.requiredMemory + ) + val newWorkload = entry.workload.copy(image = newImage) + Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) + } + } + .iterator() + + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<VmWorkload> = iterator.next() + + override fun close() {} +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt new file mode 100644 index 00000000..632746a2 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -0,0 +1,159 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20.trace + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import mu.KotlinLogging +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import java.io.File +import java.util.UUID + +private val logger = KotlinLogging.logger {} + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param path The directory of the traces. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20RawParquetTraceReader(private val path: File) { + /** + * Read the fragments into memory. + */ + private fun parseFragments(path: File): Map<String, List<FlopsHistoryFragment>> { + val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet")) + .disableCompatibility() + .build() + + val fragments = mutableMapOf<String, MutableList<FlopsHistoryFragment>>() + + return try { + while (true) { + val record = reader.read() ?: break + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + fragments.getOrPut(id) { mutableListOf() }.add(fragment) + } + + fragments + } finally { + reader.close() + } + } + + /** + * Read the metadata into a workload. + */ + private fun parseMeta(path: File, fragments: Map<String, List<FlopsHistoryFragment>>): List<TraceEntryImpl> { + val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) + .disableCompatibility() + .build() + + var counter = 0 + val entries = mutableListOf<TraceEntryImpl>() + + return try { + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) + + val vmWorkload = VmWorkload( + uid, "VM Workload $id", + UnnamedUser, + VmImage( + uid, + id, + emptyMap(), + fragments.getValue(id).asSequence(), + maxCores, + requiredMemory + ) + ) + + entries.add(TraceEntryImpl(submissionTime, vmWorkload)) + } + + entries + } finally { + metaReader.close() + } + } + + /** + * The entries in the trace. + */ + private val entries: Sequence<TraceEntryImpl> + + init { + val fragments = parseFragments(path) + entries = parseMeta(path, fragments).asSequence() + } + + /** + * Read the entries in the trace. + */ + public fun read(): Sequence<TraceEntry<VmWorkload>> = entries + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "<unnamed>" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + internal data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry<VmWorkload> +} |
