| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-15 03:02:16 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-15 03:02:16 +0200 |
| commit | ae23970faa77c89408a4e98cb9259fb53e222bd3 | |
| tree | 7ef0599deb04b4523b4fda881b395d9c0948a936 | |
| parent | b5fab8f707d4aeb0d045b53f571c3dc826c69570 | |
perf: Convert output format to parquet
4 files changed, 55 insertions, 14 deletions
@@ -1,5 +1,9 @@
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
+
+# data
+data/
+
 ### JetBrains
 # Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
 # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 28b8ae12..d23456a8 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -40,7 +40,10 @@ dependencies {
     implementation(kotlin("stdlib"))
     implementation("com.xenomachina:kotlin-argparser:2.0.7")
     api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")
-
+    implementation("org.apache.parquet:parquet-avro:1.11.0")
+    implementation("org.apache.hadoop:hadoop-client:3.2.1") {
+        exclude(group = "org.slf4j", module = "slf4j-log4j12")
+    }

     runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
     runtimeOnly(project(":odcsim:odcsim-engine-omega"))
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index e6c36e5d..02a982dc 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -6,23 +6,44 @@ import com.atlarge.opendc.compute.core.ServerState
 import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
 import com.atlarge.opendc.compute.virt.driver.VirtDriver
 import kotlinx.coroutines.flow.first
-import java.io.BufferedWriter
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import java.io.Closeable
-import java.io.FileWriter

 class Sc20Monitor(
     destination: String
 ) : Closeable {
-    private val outputFile = BufferedWriter(FileWriter(destination))
     private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
-
-    init {
-        outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,cpuUsage,cpuDemand,numberOfDeployedImages,server,hostState,hostUsage,powerDraw\n")
-    }
+    private val schema = SchemaBuilder
+        .record("slice")
+        .namespace("com.atlarge.opendc.experiments.sc20")
+        .fields()
+        .name("time").type().longType().noDefault()
+        .name("duration").type().longType().noDefault()
+        .name("requestedBurst").type().longType().noDefault()
+        .name("grantedBurst").type().longType().noDefault()
+        .name("overcommisionedBurst").type().longType().noDefault()
+        .name("interferedBurst").type().longType().noDefault()
+        .name("cpuUsage").type().doubleType().noDefault()
+        .name("cpuDemand").type().doubleType().noDefault()
+        .name("numberOfDeployedImages").type().intType().noDefault()
+        .name("server").type().stringType().noDefault()
+        .name("hostState").type().stringType().noDefault()
+        .name("hostUsage").type().doubleType().noDefault()
+        .name("powerDraw").type().doubleType().noDefault()
+        .endRecord()
+    private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination))
+        .withSchema(schema)
+        .withCompressionCodec(CompressionCodecName.SNAPPY)
+        .withPageSize(4 * 1024 * 1024) // For compression
+        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+        .build()

     suspend fun onVmStateChanged(server: Server) {}
-
     suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
         val lastServerState = lastServerStates[server]
         if (server.state == ServerState.SHUTOFF && lastServerState != null) {
@@ -63,12 +84,25 @@ class Sc20Monitor(
         val usage = driver.usage.first()
         val powerDraw = driver.powerDraw.first()

-        outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$cpuUsage,$cpuDemand,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw")
-        outputFile.newLine()
+        val record = GenericData.Record(schema)
+        record.put("time", time)
+        record.put("duration", duration)
+        record.put("requestedBurst", requestedBurst)
+        record.put("grantedBurst", grantedBurst)
+        record.put("overcommisionedBurst", overcommissionedBurst)
+        record.put("interferedBurst", interferedBurst)
+        record.put("cpuUsage", cpuUsage)
+        record.put("cpuDemand", cpuDemand)
+        record.put("numberOfDeployedImages", numberOfDeployedImages)
+        record.put("server", hostServer.uid)
+        record.put("hostState", hostServer.state)
+        record.put("hostUsage", usage)
+        record.put("powerDraw", powerDraw)
+
+        writer.write(record)
     }

     override fun close() {
-        outputFile.flush()
-        outputFile.close()
+        writer.close()
     }
 }
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index c75bde30..f3b5061c 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -70,7 +70,7 @@ class ExperimentParameters(parser: ArgParser) {
     val environmentFile by parser.storing("path to the environment file")
     val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null }
     val outputFile by parser.storing("path to where the output should be stored")
-        .default { "sc20-experiment-results.csv" }
+        .default { "data/results-${System.currentTimeMillis()}.parquet" }
     val selectedVms by parser.storing("the VMs to run") { parseVMs(this) }
        .default { emptyList() }
     val selectedVmsFile by parser.storing("path to a file containing the VMs to run") {
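
As a side note (not part of the commit): the Parquet files produced by `Sc20Monitor` can be read back with the matching `AvroParquetReader` from the same `parquet-avro` artifact. The sketch below is a minimal, hypothetical example, not code from this repository: the input path is a placeholder for whatever `data/results-<timestamp>.parquet` file a run produces, and it assumes the same `parquet-avro` and `hadoop-client` dependencies on the classpath.

```kotlin
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader

fun main(args: Array<String>) {
    // Placeholder path: pass the file written by a run of the experiment.
    val path = Path(args.getOrElse(0) { "data/results-example.parquet" })

    // Mirror of the AvroParquetWriter set up in Sc20Monitor: each row comes
    // back as a GenericRecord following the "slice" schema defined there.
    AvroParquetReader.builder<GenericRecord>(path).build().use { reader ->
        generateSequence { reader.read() }.forEach { record ->
            println("time=${record.get("time")} cpuUsage=${record.get("cpuUsage")} powerDraw=${record.get("powerDraw")}")
        }
    }
}
```

Because the output is plain Parquet, the same files can also be loaded directly by tools such as Spark or pandas/pyarrow for analysis, which is part of the appeal of moving away from the hand-rolled CSV writer.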
