From e097aeb16d77c260126b65c7f13330076d800d52 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Apr 2020 01:35:33 +0200 Subject: perf: Convert traces to Parquet format --- .../atlarge/opendc/compute/core/image/VmImage.kt | 2 +- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 18 +- opendc/opendc-experiments-sc20/build.gradle.kts | 2 +- .../experiments/sc20/Sc20ParquetTraceReader.kt | 229 +++++++++++++++++++++ .../opendc/experiments/sc20/TestExperiment.kt | 7 +- .../opendc/experiments/sc20/TraceConverter.kt | 190 +++++++++++++++++ .../opendc/format/trace/sc20/Sc20TraceReader.kt | 11 +- 7 files changed, 441 insertions(+), 18 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index b37f05a7..e3227540 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -23,7 +23,7 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! - for (fragments in flopsHistory.chunked(1024)) { + for (fragments in flopsHistory.chunked(128)) { for (fragment in fragments) { job.ensureActive() diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 2c25c0fa..8a32bc43 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -57,6 +57,9 @@ import kotlinx.coroutines.selects.select import java.util.Objects import java.util.TreeSet import java.util.UUID +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -325,7 +328,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan.send(Unit) + vm.chan?.resume(Unit) } } @@ -371,7 +374,7 @@ class SimpleVirtDriver( val vm: VmServerContext, val vcpu: ProcessingUnit, var burst: Long, - val limit: Double + var limit: Double ) { /** * The usage that was actually granted. @@ -395,7 +398,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - val chan: Channel = Channel(Channel.CONFLATED) + var chan: Continuation? = null private var initialized: Boolean = false internal val job: Job = launch { @@ -452,6 +455,7 @@ class SimpleVirtDriver( require(burst.size == limit.size) { "Array dimensions do not match" } this.deadline = deadline this.burst = burst + val requests = cpus.asSequence() .take(burst.size) .mapIndexed { i, cpu -> @@ -466,13 +470,13 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { - schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - chan.receive() + schedulingQueue.offer(SchedulerCommand.Schedule(this, requests)) + suspendCoroutine { chan = it } } catch (e: CancellationException) { // Deschedule the VM requests.forEach { it.isCancelled = true } - schedulingQueue.send(SchedulerCommand.Interrupt) - chan.receive() + schedulingQueue.offer(SchedulerCommand.Interrupt) + suspendCoroutine { chan = it } e.assertFailure() } } diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 4b73cedd..a78ea745 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -32,7 +32,7 @@ plugins { application { mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt" - applicationDefaultJvmArgs = listOf("-Xmx3096M") + applicationDefaultJvmArgs = listOf("-Xmx2200M", "-Xms1800M") } dependencies { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt new file mode 100644 index 00000000..30456204 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -0,0 +1,229 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.io.api.Binary +import java.io.File +import java.io.Serializable +import java.util.Deque +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.LinkedBlockingDeque +import kotlin.random.Random + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param traceFile The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20ParquetTraceReader( + traceFile: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List, + random: Random +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Fill the buffers of the VMs + */ + private fun pull(reader: ParquetReader, buffers: Map>) { + if (!hasNext) { + return + } + + repeat(buffers.size) { + val record = reader.read() + + if (record == null) { + hasNext = false + reader.close() + return + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + buffers[id]?.add(fragment) + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf>() + val buffers = mutableMapOf>() + + val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + + val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + var idx = 0 + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + + println(id) + + val buffer = LinkedBlockingDeque() + buffers[id] = buffer + val fragments = sequence { + while (true) { + if (buffer.isEmpty()) { + if (hasNext) { + pull(reader, buffers) + continue + } else { + break + } + } + yield(buffer.poll()) + } + } + val uuid = UUID(0, (idx++).toLong()) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uuid, "VM Workload $id", UnnamedUser, + VmImage( + uuid, + id, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + fragments, + maxCores, + requiredMemory + ) + ) + + entries[uuid] = TraceEntryImpl( + submissionTime, + vmWorkload + ) + } + + metaReader.close() + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 552dcb63..19055ad3 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -44,7 +44,6 @@ import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser @@ -241,7 +240,7 @@ fun main(args: Array) { var submitted = 0L val finish = Channel(Channel.RENDEZVOUS) - val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) + val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() submitted++ @@ -267,8 +266,10 @@ fun main(args: Array) { } finish.receive() - scheduler.terminate() failureDomain?.cancel() + launch { + scheduler.terminate() + } println(simulationContext.clock.instant()) println("${System.currentTimeMillis() - start} milliseconds") } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt new file mode 100644 index 00000000..7f429b89 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt @@ -0,0 +1,190 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import kotlin.math.max +import kotlin.math.min + +/** + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +fun main() { + val metaSchema = SchemaBuilder + .record("meta") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("submissionTime").type().longType().noDefault() + .name("maxCores").type().intType().noDefault() + .name("requiredMemory").type().longType().noDefault() + .endRecord() + val schema = SchemaBuilder + .record("trace") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("cores").type().intType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("flops").type().longType().noDefault() + .endRecord() + + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val vmIdCol = 19 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + val dest = File("../traces/solvinity/small-parquet") + val traceDirectory = File("../traces/solvinity/small") + val vms = + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + + val metaWriter = AvroParquetWriter.builder(Path(dest.absolutePath, "meta.parquet")) + .withSchema(metaSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val allFragments = mutableListOf() + + vms + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var timestamp = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + + vmId = vmFile.name + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + val writer = AvroParquetWriter.builder(Path(dest.absolutePath, "trace.parquet")) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + + writer.close() + metaWriter.close() +} + +data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index da678c07..2e2159ba 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -115,9 +115,9 @@ class Sc20TraceReader( BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() - .chunked(1024) + .chunked(128) .forEach { lines -> - val res = ArrayList(lines.size) + // val res = ArrayList(lines.size) for (line in lines) { // Ignore comments in the trace if (line.startsWith("#") || line.isBlank()) { @@ -144,13 +144,12 @@ class Sc20TraceReader( val fragment = FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) if (last != null) { - res.add(last!!) + yield(last!!) } fragment } } - - yieldAll(res) + // yieldAll(res) } if (last != null) { @@ -172,7 +171,7 @@ class Sc20TraceReader( uuid, vmId, mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsFragments, + flopsFragments.asSequence(), maxCores, requiredMemory ) -- cgit v1.2.3