From b5fab8f707d4aeb0d045b53f571c3dc826c69570 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 00:19:06 +0200 Subject: bug: Report shutdown state of all machines --- .../src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 7e6398bb..e6c36e5d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -22,6 +22,7 @@ class Sc20Monitor( suspend fun onVmStateChanged(server: Server) {} + suspend fun serverStateChanged(driver: VirtDriver, server: Server) { val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { @@ -57,8 +58,6 @@ class Sc20Monitor( hostServer: Server, duration: Long = 5 * 60 * 1000L ) { - lastServerStates.remove(hostServer) - // Assume for now that the host is not virtualized and measure the current power draw val driver = hostServer.services[BareMetalDriver.Key] val usage = driver.usage.first() -- cgit v1.2.3 From ae23970faa77c89408a4e98cb9259fb53e222bd3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 03:02:16 +0200 Subject: perf: Convert output format to parquet --- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 58 +++++++++++++++++----- .../opendc/experiments/sc20/TestExperiment.kt | 2 +- 2 files changed, 47 insertions(+), 13 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index e6c36e5d..02a982dc 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -6,23 +6,44 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.flow.first -import java.io.BufferedWriter +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable -import java.io.FileWriter class Sc20Monitor( destination: String ) : Closeable { - private val outputFile = BufferedWriter(FileWriter(destination)) private val lastServerStates = mutableMapOf>() - - init { - outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,cpuUsage,cpuDemand,numberOfDeployedImages,server,hostState,hostUsage,powerDraw\n") - } + private val schema = SchemaBuilder + .record("slice") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("requestedBurst").type().longType().noDefault() + .name("grantedBurst").type().longType().noDefault() + 
.name("overcommisionedBurst").type().longType().noDefault() + .name("interferedBurst").type().longType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("cpuDemand").type().doubleType().noDefault() + .name("numberOfDeployedImages").type().intType().noDefault() + .name("server").type().stringType().noDefault() + .name("hostState").type().stringType().noDefault() + .name("hostUsage").type().doubleType().noDefault() + .name("powerDraw").type().doubleType().noDefault() + .endRecord() + private val writer = AvroParquetWriter.builder(Path(destination)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() suspend fun onVmStateChanged(server: Server) {} - suspend fun serverStateChanged(driver: VirtDriver, server: Server) { val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { @@ -63,12 +84,25 @@ class Sc20Monitor( val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() - outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$cpuUsage,$cpuDemand,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw") - outputFile.newLine() + val record = GenericData.Record(schema) + record.put("time", time) + record.put("duration", duration) + record.put("requestedBurst", requestedBurst) + record.put("grantedBurst", grantedBurst) + record.put("overcommisionedBurst", overcommissionedBurst) + record.put("interferedBurst", interferedBurst) + record.put("cpuUsage", cpuUsage) + record.put("cpuDemand", cpuDemand) + record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("server", hostServer.uid) + record.put("hostState", hostServer.state) + record.put("hostUsage", usage) + record.put("powerDraw", powerDraw) + + writer.write(record) } override fun close() { - outputFile.flush() - outputFile.close() + writer.close() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index c75bde30..f3b5061c 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -70,7 +70,7 @@ class ExperimentParameters(parser: ArgParser) { val environmentFile by parser.storing("path to the environment file") val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } val outputFile by parser.storing("path to where the output should be stored") - .default { "sc20-experiment-results.csv" } + .default { "data/results-${System.currentTimeMillis()}.parquet" } val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } .default { emptyList() } val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { -- cgit v1.2.3 From a77829c7a8cf300a99a695423d85f905e0209286 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 12:16:42 +0200 Subject: perf: Optimize performance interference --- .../com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 16 ++++++++++++++++ .../atlarge/opendc/experiments/sc20/TestExperiment.kt | 18 ++++++++---------- 2 files changed, 24 insertions(+), 10 
deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 02a982dc..464f4fc6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -34,6 +34,10 @@ class Sc20Monitor( .name("hostState").type().stringType().noDefault() .name("hostUsage").type().doubleType().noDefault() .name("powerDraw").type().doubleType().noDefault() + .name("totalSubmittedVms").type().longType().noDefault() + .name("totalQueuedVms").type().longType().noDefault() + .name("totalRunningVms").type().longType().noDefault() + .name("totalFinishedVms").type().longType().noDefault() .endRecord() private val writer = AvroParquetWriter.builder(Path(destination)) .withSchema(schema) @@ -58,6 +62,10 @@ class Sc20Monitor( 0.0, 0, server, + 0, + 0, + 0, + 0, duration ) } @@ -77,6 +85,10 @@ class Sc20Monitor( cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, duration: Long = 5 * 60 * 1000L ) { // Assume for now that the host is not virtualized and measure the current power draw @@ -98,6 +110,10 @@ class Sc20Monitor( record.put("hostState", hostServer.state) record.put("hostUsage", usage) record.put("powerDraw", powerDraw) + record.put("totalSubmittedVms", submittedVms) + record.put("totalQueuedVms", queuedVms) + record.put("totalRunningVms", runningVms) + record.put("totalFinishedVms", finishedVms) writer.write(record) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index f3b5061c..08815720 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -204,7 +204,11 @@ fun main(args: Array) { event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms ) } } @@ -229,15 +233,14 @@ fun main(args: Array) { null } + var submitted = 0L val finish = Channel(Channel.RENDEZVOUS) - var submitted = 0 - var finished = 0 val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) while (reader.hasNext()) { val (time, workload) = reader.next() - delay(max(0, time - simulationContext.clock.millis())) submitted++ + delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) val server = scheduler.deploy( @@ -250,12 +253,7 @@ fun main(args: Array) { if (it is ServerEvent.StateChanged) monitor.onVmStateChanged(it.server) - // Detect whether the VM has finished running - if (it.server.state == ServerState.SHUTOFF) { - finished++ - } - - if (finished == submitted && !reader.hasNext()) { + if (scheduler.submittedVms == submitted && scheduler.runningVms <= 1 && !reader.hasNext()) { finish.send(Unit) } } -- cgit v1.2.3 From eab4c190142f54291ed235e4e18f3a35385a541c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 16:12:25 +0200 Subject: 
perf: Optimize trace loading for memory usage --- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 21 ++++++++++++++------- .../opendc/experiments/sc20/TestExperiment.kt | 9 ++------- 2 files changed, 16 insertions(+), 14 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 464f4fc6..7c198e56 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -25,7 +25,7 @@ class Sc20Monitor( .name("duration").type().longType().noDefault() .name("requestedBurst").type().longType().noDefault() .name("grantedBurst").type().longType().noDefault() - .name("overcommisionedBurst").type().longType().noDefault() + .name("overcommissionedBurst").type().longType().noDefault() .name("interferedBurst").type().longType().noDefault() .name("cpuUsage").type().doubleType().noDefault() .name("cpuDemand").type().doubleType().noDefault() @@ -48,7 +48,14 @@ class Sc20Monitor( suspend fun onVmStateChanged(server: Server) {} - suspend fun serverStateChanged(driver: VirtDriver, server: Server) { + suspend fun serverStateChanged( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { val duration = simulationContext.clock.millis() - lastServerState.second @@ -62,10 +69,10 @@ class Sc20Monitor( 0.0, 0, server, - 0, - 0, - 0, - 0, + submittedVms, + queuedVms, + runningVms, + finishedVms, duration ) } @@ -101,7 +108,7 @@ class Sc20Monitor( record.put("duration", duration) record.put("requestedBurst", requestedBurst) record.put("grantedBurst", grantedBurst) - record.put("overcommisionedBurst", overcommissionedBurst) + record.put("overcommissionedBurst", overcommissionedBurst) record.put("interferedBurst", interferedBurst) record.put("cpuUsage", cpuUsage) record.put("cpuDemand", cpuDemand) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 08815720..3aef80e6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -29,7 +29,6 @@ import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent @@ -174,20 +173,16 @@ fun main(args: Array) { delay(10) val hypervisors = scheduler.drivers() - var availableHypervisors = hypervisors.size // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. 
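// Editor's sketch, not part of this patch: the TODO above hints at a facade that
// hides VirtDriver from monitoring code. Assuming SimpleVirtDriver exposes its
// hosting `server` (as the call below does), a minimal version might look like
// this; the name and shape are hypothetical:
class Hypervisor(private val driver: SimpleVirtDriver) {
    /** The machine hosting this hypervisor. */
    val server: Server get() = driver.server
    /** The host's state-change stream, so monitors never touch the driver itself. */
    val events get() = driver.server.events
}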
- monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server) + monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) hypervisor.server.events .onEach { event -> when (event) { is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server) - - if (event.server.state == ServerState.ERROR) - availableHypervisors -= 1 + monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } } } -- cgit v1.2.3 From 9a7bfac2475b1169c4aa9dee820dd30f412a39c1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 17:46:31 +0200 Subject: feat: Add support for seeding experiments --- .../kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 3aef80e6..79e749b5 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -76,6 +76,8 @@ class ExperimentParameters(parser: ArgParser) { parseVMs(FileReader(File(this)).readText()) } .default { emptyList() } + val seed by parser.storing("the random seed") { toInt() } + .default(0) val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") @@ -134,7 +136,7 @@ fun main(args: Array) { "active-servers-inv" to NumberOfActiveServersAllocationPolicy(true), "provisioned-cores" to ProvisionedCoresAllocationPolicy(), "provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true), - "random" to RandomAllocationPolicy() + "random" to RandomAllocationPolicy(Random(seed)) ) if (allocationPolicy !in allocationPolicies) { @@ -215,7 +217,7 @@ fun main(args: Array) { val domain = root.newDomain(name = "failures") domain.launch { chan.receive() - val random = Random(0) + val random = Random(seed) val injectors = mutableMapOf() for (node in bareMetalProvisioner.nodes()) { val cluster = node.metadata[NODE_CLUSTER] as String @@ -231,7 +233,7 @@ fun main(args: Array) { var submitted = 0L val finish = Channel(Channel.RENDEZVOUS) - val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) + val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() submitted++ -- cgit v1.2.3 From 0a8ec3216642b0991d53baff5498916fc859009a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 17:54:12 +0200 Subject: feat: Print settings to console --- .../kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 
79e749b5..552dcb63 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -119,6 +119,14 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector { @OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array) { ArgParser(args).parseInto(::ExperimentParameters).run { + println("trace-directory: $traceDirectory") + println("environment-file: $environmentFile") + println("performance-interference-file: $performanceInterferenceFile") + println("selected-vms-file: $selectedVmsFile") + println("seed: $seed") + println("failures: $failures") + println("allocation-policy: $allocationPolicy") + val start = System.currentTimeMillis() val monitor = Sc20Monitor(outputFile) -- cgit v1.2.3 From e097aeb16d77c260126b65c7f13330076d800d52 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Apr 2020 01:35:33 +0200 Subject: perf: Convert traces to Parquet format --- .../experiments/sc20/Sc20ParquetTraceReader.kt | 229 +++++++++++++++++++++ .../opendc/experiments/sc20/TestExperiment.kt | 7 +- .../opendc/experiments/sc20/TraceConverter.kt | 190 +++++++++++++++++ 3 files changed, 423 insertions(+), 3 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt new file mode 100644 index 00000000..30456204 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -0,0 +1,229 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.io.api.Binary +import java.io.File +import java.io.Serializable +import java.util.Deque +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.LinkedBlockingDeque +import kotlin.random.Random + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param traceFile The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20ParquetTraceReader( + traceFile: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List, + random: Random +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Fill the buffers of the VMs + */ + private fun pull(reader: ParquetReader, buffers: Map>) { + if (!hasNext) { + return + } + + repeat(buffers.size) { + val record = reader.read() + + if (record == null) { + hasNext = false + reader.close() + return + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + buffers[id]?.add(fragment) + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. 
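+ * Metadata for the (selected) VMs is read eagerly from meta.parquet; the per-VM
+ * workload fragments are pulled lazily from trace.parquet, in batches, as the
+ * generated fragment sequences are consumed.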
+ */ + init { + val entries = mutableMapOf>() + val buffers = mutableMapOf>() + + val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + + val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + var idx = 0 + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + + println(id) + + val buffer = LinkedBlockingDeque() + buffers[id] = buffer + val fragments = sequence { + while (true) { + if (buffer.isEmpty()) { + if (hasNext) { + pull(reader, buffers) + continue + } else { + break + } + } + yield(buffer.poll()) + } + } + val uuid = UUID(0, (idx++).toLong()) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uuid, "VM Workload $id", UnnamedUser, + VmImage( + uuid, + id, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + fragments, + maxCores, + requiredMemory + ) + ) + + entries[uuid] = TraceEntryImpl( + submissionTime, + vmWorkload + ) + } + + metaReader.close() + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. 
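+ * Binds a [VmWorkload] to its submission time; the reader sorts entries on this
+ * field so that arrivals are replayed in chronological order.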
+ */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 552dcb63..19055ad3 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -44,7 +44,6 @@ import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser @@ -241,7 +240,7 @@ fun main(args: Array) { var submitted = 0L val finish = Channel(Channel.RENDEZVOUS) - val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) + val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() submitted++ @@ -267,8 +266,10 @@ fun main(args: Array) { } finish.receive() - scheduler.terminate() failureDomain?.cancel() + launch { + scheduler.terminate() + } println(simulationContext.clock.instant()) println("${System.currentTimeMillis() - start} milliseconds") } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt new file mode 100644 index 00000000..7f429b89 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt @@ -0,0 +1,190 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.atlarge.opendc.experiments.sc20 + +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import kotlin.math.max +import kotlin.math.min + +/** + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +fun main() { + val metaSchema = SchemaBuilder + .record("meta") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("submissionTime").type().longType().noDefault() + .name("maxCores").type().intType().noDefault() + .name("requiredMemory").type().longType().noDefault() + .endRecord() + val schema = SchemaBuilder + .record("trace") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("cores").type().intType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("flops").type().longType().noDefault() + .endRecord() + + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val vmIdCol = 19 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + val dest = File("../traces/solvinity/small-parquet") + val traceDirectory = File("../traces/solvinity/small") + val vms = + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + + val metaWriter = AvroParquetWriter.builder(Path(dest.absolutePath, "meta.parquet")) + .withSchema(metaSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val allFragments = mutableListOf() + + vms + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var timestamp = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + + vmId = vmFile.name + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) 
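+ // The loop keeps one fragment buffered so that consecutive zero-flops
+ // fragments can be merged; the yield above flushes whatever is still
+ // pending when the end of the trace file is reached.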
+ } + } + + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + val writer = AvroParquetWriter.builder(Path(dest.absolutePath, "trace.parquet")) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + + writer.close() + metaWriter.close() +} + +data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int) -- cgit v1.2.3 From 6cd93b57945b289b2e14556f7ceaa193326eff78 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Apr 2020 15:30:34 +0200 Subject: bug: Fix issues related to early termination --- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 23 +++- .../experiments/sc20/Sc20ParquetTraceReader.kt | 121 ++++++++++++++------- .../opendc/experiments/sc20/TestExperiment.kt | 27 +++-- .../opendc/experiments/sc20/TraceConverter.kt | 7 +- 4 files changed, 122 insertions(+), 56 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 7c198e56..bac0de21 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -12,6 +12,8 @@ import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread class Sc20Monitor( destination: String @@ -45,6 +47,19 @@ class Sc20Monitor( .withPageSize(4 * 1024 * 1024) // For compression .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) .build() + private val queue = ArrayBlockingQueue(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } + } suspend fun onVmStateChanged(server: Server) {} @@ -122,10 +137,14 @@ class Sc20Monitor( record.put("totalRunningVms", runningVms) record.put("totalFinishedVms", finishedVms) - writer.write(record) + queue.put(record) } override fun close() { - writer.close() + // Busy loop to wait for writer thread to finish + while (queue.isNotEmpty()) { + Thread.sleep(500) + } + writerThread.interrupt() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt 
b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt index 30456204..24ff9eed 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -39,7 +39,6 @@ import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.hadoop.ParquetReader import org.apache.parquet.io.api.Binary import java.io.File import java.io.Serializable @@ -47,7 +46,9 @@ import java.util.Deque import java.util.SortedSet import java.util.TreeSet import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.LinkedBlockingDeque +import kotlin.concurrent.thread import kotlin.random.Random /** @@ -69,37 +70,82 @@ class Sc20ParquetTraceReader( private val iterator: Iterator> /** - * Fill the buffers of the VMs + * The intermediate buffer to store the read records in. */ - private fun pull(reader: ParquetReader, buffers: Map>) { + private val queue = ArrayBlockingQueue>(128) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) + + /** + * The thread to read the records in. + */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map>) { if (!hasNext) { return } - repeat(buffers.size) { - val record = reader.read() + repeat(16) { + val (id, fragment) = queue.take() - if (record == null) { + if (id == poison.first) { hasNext = false - reader.close() return } - val id = record["id"].toString() - val tick = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long - - val fragment = FlopsHistoryFragment( - tick, - flops, - duration, - cpuUsage, - cores - ) - buffers[id]?.add(fragment) } } @@ -116,30 +162,20 @@ class Sc20ParquetTraceReader( val entries = mutableMapOf>() val buffers = mutableMapOf>() - val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) - val metaReader = 
AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } .build() - val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - var idx = 0 while (true) { val record = metaReader.read() ?: break val id = record["id"].toString() val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long val maxCores = record["maxCores"] as Int val requiredMemory = record["requiredMemory"] as Long - + val uuid = UUID(0, (idx++).toLong()) println(id) val buffer = LinkedBlockingDeque() @@ -148,16 +184,23 @@ class Sc20ParquetTraceReader( while (true) { if (buffer.isEmpty()) { if (hasNext) { - pull(reader, buffers) + pull(buffers) continue } else { break } } - yield(buffer.poll()) + + val fragment = buffer.poll() + yield(fragment) + + if (fragment.tick >= endTime) { + break + } } + + buffers.remove(id) } - val uuid = UUID(0, (idx++).toLong()) val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), @@ -191,7 +234,9 @@ class Sc20ParquetTraceReader( override fun next(): TraceEntry = iterator.next() - override fun close() {} + override fun close() { + readerThread.interrupt() + } private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 19055ad3..8478b592 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -237,13 +237,10 @@ fun main(args: Array) { null } - var submitted = 0L - val finish = Channel(Channel.RENDEZVOUS) - + val finished = Channel(Channel.RENDEZVOUS) val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() - submitted++ delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) @@ -254,24 +251,26 @@ fun main(args: Array) { // Monitor server events server.events .onEach { - if (it is ServerEvent.StateChanged) + if (it is ServerEvent.StateChanged) { monitor.onVmStateChanged(it.server) - - if (scheduler.submittedVms == submitted && scheduler.runningVms <= 1 && !reader.hasNext()) { - finish.send(Unit) } + + finished.send(Unit) } .collect() } } - finish.receive() - failureDomain?.cancel() - launch { - scheduler.terminate() + while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) { + finished.receive() } - println(simulationContext.clock.instant()) - println("${System.currentTimeMillis() - start} milliseconds") + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + reader.close() + println("[${simulationContext.clock.millis()}] DONE 
${System.currentTimeMillis() - start} milliseconds") } runBlocking { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt index 7f429b89..d005a157 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt @@ -46,6 +46,7 @@ fun main() { .fields() .name("id").type().stringType().noDefault() .name("submissionTime").type().longType().noDefault() + .name("endTime").type().longType().noDefault() .name("maxCores").type().intType().noDefault() .name("requiredMemory").type().longType().noDefault() .endRecord() @@ -92,7 +93,6 @@ fun main() { var vmId = "" var maxCores = -1 var requiredMemory = -1L - var timestamp = -1L var cores = -1 var minTime = Long.MAX_VALUE @@ -112,7 +112,7 @@ fun main() { val values = line.split(" ") vmId = vmFile.name - timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) @@ -150,13 +150,16 @@ fun main() { } } + var maxTime = Long.MIN_VALUE flopsFragments.forEach { fragment -> allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) } val metaRecord = GenericData.Record(metaSchema) metaRecord.put("id", vmId) metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) metaRecord.put("maxCores", maxCores) metaRecord.put("requiredMemory", requiredMemory) metaWriter.write(metaRecord) -- cgit v1.2.3 From 3e056406616860c77168d827f1ca9d8d3c79c08e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Apr 2020 12:13:09 +0200 Subject: perf: Minor tweaks in trace fetching --- .../experiments/sc20/Sc20ParquetTraceReader.kt | 138 ++++++++++++--------- .../opendc/experiments/sc20/TestExperiment.kt | 4 +- 2 files changed, 80 insertions(+), 62 deletions(-) (limited to 'opendc/opendc-experiments-sc20/src') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt index 24ff9eed..0a7718e9 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -42,12 +42,10 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary import java.io.File import java.io.Serializable -import java.util.Deque import java.util.SortedSet import java.util.TreeSet import java.util.UUID import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.LinkedBlockingDeque import kotlin.concurrent.thread import kotlin.random.Random @@ -72,7 +70,7 @@ class Sc20ParquetTraceReader( /** * The intermediate buffer to store the read records in. 
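 * The bounded capacity back-pressures the background reader thread, so it can
 * never run far ahead of the simulation that drains the queue.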
*/ - private val queue = ArrayBlockingQueue>(128) + private val queue = ArrayBlockingQueue>(1024) /** * An optional filter for filtering the selected VMs @@ -133,20 +131,20 @@ class Sc20ParquetTraceReader( /** * Fill the buffers with the VMs */ - private fun pull(buffers: Map>) { + private fun pull(buffers: Map>>) { if (!hasNext) { return } - repeat(16) { - val (id, fragment) = queue.take() + val fragments = mutableListOf>() + queue.drainTo(fragments) + for ((id, fragment) in fragments) { if (id == poison.first) { hasNext = false return } - - buffers[id]?.add(fragment) + buffers[id]?.forEach { it.add(fragment) } } } @@ -159,75 +157,93 @@ class Sc20ParquetTraceReader( * Initialize the reader. */ init { - val entries = mutableMapOf>() - val buffers = mutableMapOf>() + val takenIds = mutableSetOf() + val entries = mutableMapOf() + val buffers = mutableMapOf>>() val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } .build() - var idx = 0 while (true) { val record = metaReader.read() ?: break val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uuid = UUID(0, (idx++).toLong()) - println(id) - - val buffer = LinkedBlockingDeque() - buffers[id] = buffer - val fragments = sequence { - while (true) { - if (buffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break + entries[id] = record + } + + metaReader.close() + + val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + println(id) + + val internalBuffer = mutableListOf() + val externalBuffer = mutableListOf() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence { + repeat@while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } } - } - val fragment = buffer.poll() - yield(fragment) + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() - if (fragment.tick >= endTime) { - break - } - } + for (fragment in internalBuffer) { + yield(fragment) - buffers.remove(id) - } - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), - Random(random.nextInt()) - ) - val vmWorkload = VmWorkload( - uuid, "VM Workload $id", UnnamedUser, - VmImage( - uuid, - id, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - fragments, - maxCores, - requiredMemory - ) - ) + if (fragment.tick >= endTime) { + break@repeat + } + } - entries[uuid] = TraceEntryImpl( - submissionTime, - vmWorkload - ) - } + internalBuffer.clear() + } - metaReader.close() + buffers.remove(id) + } + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { 
it.workloadNames.contains(id) }.toSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uid, "VM Workload $id", UnnamedUser, + VmImage( + uid, + id, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + fragments, + maxCores, + requiredMemory + ) + ) - // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + TraceEntryImpl(submissionTime, vmWorkload) + } + .sortedBy { it.submissionTime } + .toList() + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 8478b592..028cfb9a 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -237,10 +237,12 @@ fun main(args: Array) { null } + var submitted = 0L val finished = Channel(Channel.RENDEZVOUS) val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() + submitted++ delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) @@ -261,7 +263,7 @@ fun main(args: Array) { } } - while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) { + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { finished.receive() } -- cgit v1.2.3
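
Editor's note: taken together, this series swaps the CSV monitor for an Avro/Parquet pipeline drained by a dedicated writer thread. The condensed sketch below shows that pattern in isolation, assuming only the libraries the patches themselves use (Avro, Parquet, Hadoop); the three-field schema, the /tmp output path and the sample records are invented for illustration and are not part of the OpenDC code.

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

fun main() {
    // Avro schema describing one monitoring slice (trimmed to three fields).
    val schema = SchemaBuilder
        .record("slice")
        .namespace("example")
        .fields()
        .name("time").type().longType().noDefault()
        .name("cpuUsage").type().doubleType().noDefault()
        .name("powerDraw").type().doubleType().noDefault()
        .endRecord()

    // Writer configured as in the patches: Snappy compression, 4 MiB pages
    // (compression granularity), 16 MiB row groups (write buffering).
    val writer = AvroParquetWriter.builder<GenericData.Record>(Path("/tmp/slices.parquet"))
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withPageSize(4 * 1024 * 1024)
        .withRowGroupSize(16 * 1024 * 1024)
        .build()

    // Hand records to a dedicated thread so the producer never blocks on I/O,
    // mirroring the queue added in "bug: Fix issues related to early termination".
    val queue = ArrayBlockingQueue<GenericData.Record>(2048)
    val writerThread = thread(name = "parquet-writer") {
        try {
            while (true) writer.write(queue.take())
        } catch (e: InterruptedException) {
            // Expected on shutdown; do not rethrow.
        } finally {
            writer.close()
        }
    }

    // Produce a few fake slices, one per simulated 5-minute interval.
    repeat(10) { i ->
        val record = GenericData.Record(schema)
        record.put("time", i * 300_000L)
        record.put("cpuUsage", 0.5)
        record.put("powerDraw", 200.0)
        queue.put(record)
    }

    // Drain before interrupting, then let the writer thread close the file.
    while (queue.isNotEmpty()) Thread.sleep(100)
    writerThread.interrupt()
    writerThread.join()
}

As in Sc20Monitor.close(), shutdown first drains the queue and only then interrupts the thread; interrupting earlier would close the writer while records are still queued and silently drop them.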