summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-workload
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-16 12:34:53 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-19 14:02:52 +0200
commit474044649a67cfcc857615b6a0f8387a2954abbd (patch)
tree5bb6bf9b0ca0978a47ac50ced85c245588465daa /opendc-compute/opendc-compute-workload
parent9b25eef67911d0aec6a36c82a34cd0e39b13b073 (diff)
feat(trace): Update OpenDC VM trace format
This change optimizes the OpenDC VM trace format by removing unnecessary columns as well as optimizing the writer settings. The new implementation still supports reading the old trace format in case users run OpenDC with older workload traces.
Diffstat (limited to 'opendc-compute/opendc-compute-workload')
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt86
2 files changed, 56 insertions, 36 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index afc0fce9..c92b212f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -65,7 +65,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val id = reader.get(RESOURCE_STATE_ID)
val time = reader.get(RESOURCE_STATE_TIMESTAMP)
val duration = reader.get(RESOURCE_STATE_DURATION)
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT)
val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val fragment = SimTraceWorkload.Fragment(
@@ -75,7 +75,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
cores
)
- fragments.getOrPut(id) { mutableListOf() }.add(fragment)
+ fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment)
}
fragments
@@ -103,7 +103,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val submissionTime = reader.get(RESOURCE_START_TIME)
val endTime = reader.get(RESOURCE_STOP_TIME)
- val maxCores = reader.getInt(RESOURCE_NCPUS)
+ val maxCores = reader.getInt(RESOURCE_CPU_COUNT)
val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt
index 50f3a669..2d570787 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.workload.vm.trace
+package org.opendc.compute.workload.trace
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
@@ -42,6 +42,7 @@ import org.opendc.trace.opendc.OdcVmTraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.File
import java.util.*
+import kotlin.math.abs
import kotlin.math.max
import kotlin.math.min
import kotlin.math.roundToLong
@@ -112,16 +113,21 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val selectedVms = metaWriter.use { convertResources(trace, it) }
+ if (selectedVms.isEmpty()) {
+ logger.warn { "No VMs selected" }
+ return
+ }
+
logger.info { "Wrote ${selectedVms.size} rows" }
logger.info { "Building resource states table" }
val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
.withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
.withCompressionCodec(CompressionCodecName.ZSTD)
- .enableDictionaryEncoding()
- .enablePageWriteChecksum()
+ .withDictionaryEncoding("id", true)
.withBloomFilterEnabled("id", true)
.withBloomFilterNDV("id", selectedVms.size.toLong())
+ .enableValidation()
.build()
val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
@@ -154,7 +160,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
startTime = min(startTime, timestamp)
stopTime = max(stopTime, timestamp)
- numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS))
+ numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT))
memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
@@ -172,10 +178,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA)
builder["id"] = id
- builder["submissionTime"] = startTime
- builder["endTime"] = stopTime
- builder["maxCores"] = numCpus
- builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong()
+ builder["start_time"] = startTime
+ builder["stop_time"] = stopTime
+ builder["cpu_count"] = numCpus
+ builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong()
logger.info { "Selecting VM $id" }
@@ -194,44 +200,58 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
var hasNextRow = reader.nextRow()
var count = 0
+ var lastId: String? = null
+ var lastTimestamp = 0L
while (hasNextRow) {
- var lastTimestamp = Long.MIN_VALUE
+ val id = reader.get(RESOURCE_STATE_ID)
- do {
- val id = reader.get(RESOURCE_STATE_ID)
+ if (id !in selectedVms) {
+ hasNextRow = reader.nextRow()
+ continue
+ }
- if (id !in selectedVms) {
- hasNextRow = reader.nextRow()
- continue
- }
+ val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- builder["id"] = id
+ val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+ var timestamp = startTimestamp
+ var duration: Long
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
- if (lastTimestamp < 0) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
+ // Check whether the previous entry is from a different VM
+ if (id != lastId) {
+ lastTimestamp = timestamp - 5 * 60 * 1000L
+ }
+
+ do {
+ timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+
+ duration = timestamp - lastTimestamp
+ hasNextRow = reader.nextRow()
+
+ if (!hasNextRow) {
+ break
}
- val duration = timestamp - lastTimestamp
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val flops = (cpuUsage * duration / 1000.0).roundToLong()
+ val shouldContinue = id == reader.get(RESOURCE_STATE_ID) &&
+ abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 &&
+ cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ } while (shouldContinue)
- builder["time"] = timestamp
- builder["duration"] = duration
- builder["cores"] = cores
- builder["cpuUsage"] = cpuUsage
- builder["flops"] = flops
+ val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- writer.write(builder.build())
+ builder["id"] = id
+ builder["timestamp"] = startTimestamp
+ builder["duration"] = duration
+ builder["cpu_count"] = cpuCount
+ builder["cpu_usage"] = cpuUsage
- lastTimestamp = timestamp
- hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+ writer.write(builder.build())
count++
+
+ lastId = id
+ lastTimestamp = timestamp
}
return count