summary refs log tree commit diff
path: root/opendc
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2020-04-15 03:02:16 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2020-04-15 03:02:16 +0200
commit ae23970faa77c89408a4e98cb9259fb53e222bd3 (patch)
tree 7ef0599deb04b4523b4fda881b395d9c0948a936 /opendc
parent b5fab8f707d4aeb0d045b53f571c3dc826c69570 (diff)
perf: Convert output format to parquet
Diffstat (limited to 'opendc')
-rw-r--r-- opendc/opendc-experiments-sc20/build.gradle.kts 5
-rw-r--r-- opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt 58
-rw-r--r-- opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt 2
3 files changed, 51 insertions, 14 deletions
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 28b8ae12..d23456a8 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -40,7 +40,10 @@ dependencies {
implementation(kotlin("stdlib"))
implementation("com.xenomachina:kotlin-argparser:2.0.7")
api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")
-
+ implementation("org.apache.parquet:parquet-avro:1.11.0")
+ implementation("org.apache.hadoop:hadoop-client:3.2.1") {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index e6c36e5d..02a982dc 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -6,23 +6,44 @@ import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import kotlinx.coroutines.flow.first
-import java.io.BufferedWriter
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.io.Closeable
-import java.io.FileWriter
class Sc20Monitor(
destination: String
) : Closeable {
- private val outputFile = BufferedWriter(FileWriter(destination))
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
-
- init {
- outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,cpuUsage,cpuDemand,numberOfDeployedImages,server,hostState,hostUsage,powerDraw\n")
- }
+ private val schema = SchemaBuilder
+ .record("slice")
+ .namespace("com.atlarge.opendc.experiments.sc20")
+ .fields()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("requestedBurst").type().longType().noDefault()
+ .name("grantedBurst").type().longType().noDefault()
+ .name("overcommisionedBurst").type().longType().noDefault()
+ .name("interferedBurst").type().longType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("cpuDemand").type().doubleType().noDefault()
+ .name("numberOfDeployedImages").type().intType().noDefault()
+ .name("server").type().stringType().noDefault()
+ .name("hostState").type().stringType().noDefault()
+ .name("hostUsage").type().doubleType().noDefault()
+ .name("powerDraw").type().doubleType().noDefault()
+ .endRecord()
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
suspend fun onVmStateChanged(server: Server) {}
-
suspend fun serverStateChanged(driver: VirtDriver, server: Server) {
val lastServerState = lastServerStates[server]
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
@@ -63,12 +84,25 @@ class Sc20Monitor(
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
- outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$cpuUsage,$cpuDemand,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw")
- outputFile.newLine()
+ val record = GenericData.Record(schema)
+ record.put("time", time)
+ record.put("duration", duration)
+ record.put("requestedBurst", requestedBurst)
+ record.put("grantedBurst", grantedBurst)
+ record.put("overcommisionedBurst", overcommissionedBurst)
+ record.put("interferedBurst", interferedBurst)
+ record.put("cpuUsage", cpuUsage)
+ record.put("cpuDemand", cpuDemand)
+ record.put("numberOfDeployedImages", numberOfDeployedImages)
+ record.put("server", hostServer.uid)
+ record.put("hostState", hostServer.state)
+ record.put("hostUsage", usage)
+ record.put("powerDraw", powerDraw)
+
+ writer.write(record)
}
override fun close() {
- outputFile.flush()
- outputFile.close()
+ writer.close()
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index c75bde30..f3b5061c 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -70,7 +70,7 @@ class ExperimentParameters(parser: ArgParser) {
val environmentFile by parser.storing("path to the environment file")
val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null }
val outputFile by parser.storing("path to where the output should be stored")
- .default { "sc20-experiment-results.csv" }
+ .default { "data/results-${System.currentTimeMillis()}.parquet" }
val selectedVms by parser.storing("the VMs to run") { parseVMs(this) }
.default { emptyList() }
val selectedVmsFile by parser.storing("path to a file containing the VMs to run") {