summary | refs | log | tree | commit | diff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r-- opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt | 6
-rw-r--r-- opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt | 86
2 files changed, 56 insertions, 36 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index afc0fce9..c92b212f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -65,7 +65,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val id = reader.get(RESOURCE_STATE_ID)
val time = reader.get(RESOURCE_STATE_TIMESTAMP)
val duration = reader.get(RESOURCE_STATE_DURATION)
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT)
val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val fragment = SimTraceWorkload.Fragment(
@@ -75,7 +75,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
cores
)
- fragments.getOrPut(id) { mutableListOf() }.add(fragment)
+ fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment)
}
fragments
@@ -103,7 +103,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val submissionTime = reader.get(RESOURCE_START_TIME)
val endTime = reader.get(RESOURCE_STOP_TIME)
- val maxCores = reader.getInt(RESOURCE_NCPUS)
+ val maxCores = reader.getInt(RESOURCE_CPU_COUNT)
val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt
index 50f3a669..2d570787 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.workload.vm.trace
+package org.opendc.compute.workload.trace
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
@@ -42,6 +42,7 @@ import org.opendc.trace.opendc.OdcVmTraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.File
import java.util.*
+import kotlin.math.abs
import kotlin.math.max
import kotlin.math.min
import kotlin.math.roundToLong
@@ -112,16 +113,21 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val selectedVms = metaWriter.use { convertResources(trace, it) }
+ if (selectedVms.isEmpty()) {
+ logger.warn { "No VMs selected" }
+ return
+ }
+
logger.info { "Wrote ${selectedVms.size} rows" }
logger.info { "Building resource states table" }
val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
.withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
.withCompressionCodec(CompressionCodecName.ZSTD)
- .enableDictionaryEncoding()
- .enablePageWriteChecksum()
+ .withDictionaryEncoding("id", true)
.withBloomFilterEnabled("id", true)
.withBloomFilterNDV("id", selectedVms.size.toLong())
+ .enableValidation()
.build()
val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
@@ -154,7 +160,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
startTime = min(startTime, timestamp)
stopTime = max(stopTime, timestamp)
- numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS))
+ numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT))
memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
@@ -172,10 +178,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA)
builder["id"] = id
- builder["submissionTime"] = startTime
- builder["endTime"] = stopTime
- builder["maxCores"] = numCpus
- builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong()
+ builder["start_time"] = startTime
+ builder["stop_time"] = stopTime
+ builder["cpu_count"] = numCpus
+ builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong()
logger.info { "Selecting VM $id" }
@@ -194,44 +200,58 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
var hasNextRow = reader.nextRow()
var count = 0
+ var lastId: String? = null
+ var lastTimestamp = 0L
while (hasNextRow) {
- var lastTimestamp = Long.MIN_VALUE
+ val id = reader.get(RESOURCE_STATE_ID)
- do {
- val id = reader.get(RESOURCE_STATE_ID)
+ if (id !in selectedVms) {
+ hasNextRow = reader.nextRow()
+ continue
+ }
- if (id !in selectedVms) {
- hasNextRow = reader.nextRow()
- continue
- }
+ val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- builder["id"] = id
+ val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+ var timestamp = startTimestamp
+ var duration: Long
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
- if (lastTimestamp < 0) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
+ // Check whether the previous entry is from a different VM
+ if (id != lastId) {
+ lastTimestamp = timestamp - 5 * 60 * 1000L
+ }
+
+ do {
+ timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+
+ duration = timestamp - lastTimestamp
+ hasNextRow = reader.nextRow()
+
+ if (!hasNextRow) {
+ break
}
- val duration = timestamp - lastTimestamp
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val flops = (cpuUsage * duration / 1000.0).roundToLong()
+ val shouldContinue = id == reader.get(RESOURCE_STATE_ID) &&
+ abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 &&
+ cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ } while (shouldContinue)
- builder["time"] = timestamp
- builder["duration"] = duration
- builder["cores"] = cores
- builder["cpuUsage"] = cpuUsage
- builder["flops"] = flops
+ val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- writer.write(builder.build())
+ builder["id"] = id
+ builder["timestamp"] = startTimestamp
+ builder["duration"] = duration
+ builder["cpu_count"] = cpuCount
+ builder["cpu_usage"] = cpuUsage
- lastTimestamp = timestamp
- hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+ writer.write(builder.build())
count++
+
+ lastId = id
+ lastTimestamp = timestamp
}
return count