author     Fabian Mastenbroek <mail.fabianm@gmail.com>  2021-09-12 12:08:55 +0200
committer  GitHub <noreply@github.com>                  2021-09-12 12:08:55 +0200
commit     2cd3bd18e548a72d64afe0e7f59487f4747d722f (patch)
tree       dc9e2fba5ca4d19a90934a8b68dbb8110ee34bb7 /opendc-experiments/opendc-experiments-capelin/src
parent     cae193284570d6ee9dbacdde57b3e4e367aa9d9f (diff)
parent     992b65396f55c0e12b36823d191dea8e03dd45ba (diff)
merge: Add support for new trace formats

This pull request updates the trace API with the addition of several new trace formats:

- Add support for Materna traces from the GWA
- Keep reader state in its own class
- Parse the last column in the Solvinity trace format
- Add support for Azure VM traces
- Add support for WfCommons (WorkflowHub) traces
- Add an API for accessing the available table columns
- Add a synthetic resource table for the Bitbrains format
- Support dynamic resolving of trace formats

**Breaking API Changes**

- Replace `isSupported` by a list of `TableColumn`s
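To illustrate the breaking change: instead of probing a table for column support one column at a time, implementations now expose the available columns declaratively. A minimal before/after sketch of a `Table` implementation, assuming the column constants from `org.opendc.trace`:

    // Before: callers could only query support per column.
    override fun isSupported(column: TableColumn<*>): Boolean = when (column) {
        RESOURCE_STATE_ID, RESOURCE_STATE_TIMESTAMP -> true
        else -> false
    }

    // After: the supported columns are a plain list, so callers can enumerate them.
    override val columns: List<TableColumn<*>> = listOf(
        RESOURCE_STATE_ID,
        RESOURCE_STATE_TIMESTAMP,
    )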
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt | 4
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt | 488
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt | 127
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt | 149
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt | 54
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt | 169
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt | 46
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt | 56
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt | 17
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt | 17
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt | 10
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt | 55
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt | 36
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt | 144
14 files changed, 916 insertions, 456 deletions
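Based on the options defined in TraceConverter.kt below, a conversion run can be driven through the CLI entry point. A hypothetical invocation (the input path is made up; the sampling flags are optional but must be given together):

    // Equivalent to running:
    //   trace-converter -f azure -O converted --sampling-fraction 0.01 --sampling-seed 42 /data/azure-trace
    fun main() {
        TraceConverterCli().main(
            arrayOf(
                "-f", "azure",                  // input format: solvinity, bitbrains or azure
                "-O", "converted",              // directory that receives meta.parquet and trace.parquet
                "--sampling-fraction", "0.01",  // sample 1% of the VMs
                "--sampling-seed", "42",        // seed for the sampling RNG
                "/data/azure-trace"             // hypothetical input trace directory
            )
        )
    }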
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
index fa4e9ed8..ca937328 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt
@@ -90,9 +90,9 @@ class RawParquetTraceReader(private val path: File) {
}
val submissionTime = reader.get(RESOURCE_START_TIME)
- val endTime = reader.get(RESOURCE_END_TIME)
+ val endTime = reader.get(RESOURCE_STOP_TIME)
val maxCores = reader.getInt(RESOURCE_NCPUS)
- val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY)
+ val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val vmFragments = fragments.getValue(id).asSequence()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index a021de8d..1f3878eb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -25,80 +25,74 @@ package org.opendc.experiments.capelin.trace
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
import com.github.ajalt.clikt.parameters.groups.OptionGroup
-import com.github.ajalt.clikt.parameters.groups.groupChoice
+import com.github.ajalt.clikt.parameters.groups.cooccurring
import com.github.ajalt.clikt.parameters.options.*
-import com.github.ajalt.clikt.parameters.types.file
-import com.github.ajalt.clikt.parameters.types.long
-import me.tongfei.progressbar.ProgressBar
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
+import com.github.ajalt.clikt.parameters.types.*
+import mu.KotlinLogging
import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat
+import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA
+import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA
import org.opendc.experiments.capelin.trace.sv.SvTraceFormat
import org.opendc.trace.*
import org.opendc.trace.bitbrains.BitbrainsTraceFormat
-import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.BufferedReader
import java.io.File
-import java.io.FileReader
import java.util.*
import kotlin.math.max
import kotlin.math.min
+import kotlin.math.roundToLong
+
+/**
+ * A script to convert a trace in text format into a Parquet trace.
+ */
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
/**
* Represents the command for converting traces
*/
class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
+ * The logger instance for the converter.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The directory where the trace should be stored.
*/
- private val outputPath by option("-O", "--output", help = "path to store the trace")
+ private val output by option("-O", "--output", help = "path to store the trace")
.file(canBeFile = false, mustExist = false)
.defaultLazy { File("output") }
/**
* The directory where the input trace is located.
*/
- private val inputPath by argument("input", help = "path to the input trace")
+ private val input by argument("input", help = "path to the input trace")
.file(canBeFile = false)
/**
- * The input type of the trace.
+ * The input format of the trace.
*/
- private val type by option("-t", "--type", help = "input type of trace").groupChoice(
- "solvinity" to SolvinityConversion(),
- "bitbrains" to BitbrainsConversion(),
- "azure" to AzureConversion()
- )
+ private val format by option("-f", "--format", help = "input format of trace")
+ .choice(
+ "solvinity" to SvTraceFormat(),
+ "bitbrains" to BitbrainsTraceFormat(),
+ "azure" to AzureTraceFormat()
+ )
+ .required()
+
+ /**
+ * The sampling options.
+ */
+ private val samplingOptions by SamplingOptions().cooccurring()
override fun run() {
- val metaSchema = SchemaBuilder
- .record("meta")
- .namespace("org.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("submissionTime").type().longType().noDefault()
- .name("endTime").type().longType().noDefault()
- .name("maxCores").type().intType().noDefault()
- .name("requiredMemory").type().longType().noDefault()
- .endRecord()
- val schema = SchemaBuilder
- .record("trace")
- .namespace("org.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("time").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("cores").type().intType().noDefault()
- .name("cpuUsage").type().doubleType().noDefault()
- .name("flops").type().longType().noDefault()
- .endRecord()
-
- val metaParquet = File(outputPath, "meta.parquet")
- val traceParquet = File(outputPath, "trace.parquet")
+ val metaParquet = File(output, "meta.parquet")
+ val traceParquet = File(output, "trace.parquet")
if (metaParquet.exists()) {
metaParquet.delete()
@@ -107,324 +101,160 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") {
traceParquet.delete()
}
+ val trace = format.open(input.toURI().toURL())
+
+ logger.info { "Building resources table" }
+
val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(metaSchema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .withSchema(BP_RESOURCES_SCHEMA)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .enablePageWriteChecksum()
.build()
+ val selectedVms = metaWriter.use { convertResources(trace, it) }
+
+ logger.info { "Wrote ${selectedVms.size} rows" }
+ logger.info { "Building resource states table" }
+
val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .withSchema(BP_RESOURCE_STATES_SCHEMA)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .enableDictionaryEncoding()
+ .enablePageWriteChecksum()
+ .withBloomFilterEnabled("id", true)
+ .withBloomFilterNDV("id", selectedVms.size.toLong())
.build()
- try {
- val type = type ?: throw IllegalArgumentException("Invalid trace conversion")
- val allFragments = type.read(inputPath, metaSchema, metaWriter)
- allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
-
- for (fragment in allFragments) {
- val record = GenericData.Record(schema)
- record.put("id", fragment.id)
- record.put("time", fragment.tick)
- record.put("duration", fragment.duration)
- record.put("cores", fragment.cores)
- record.put("cpuUsage", fragment.usage)
- record.put("flops", fragment.flops)
-
- writer.write(record)
- }
- } finally {
- writer.close()
- metaWriter.close()
- }
+ val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
+ logger.info { "Wrote $statesCount rows" }
}
-}
-/**
- * The supported trace conversions.
- */
-sealed class TraceConversion(name: String) : OptionGroup(name) {
/**
- * Read the fragments of the trace.
+ * Convert the resources table for the trace.
*/
- abstract fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment>
-}
-
-/**
- * A [TraceConversion] that uses the Trace API to perform the conversion.
- */
-abstract class AbstractConversion(name: String) : TraceConversion(name) {
- abstract val format: TraceFormat
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val fragments = mutableListOf<Fragment>()
- val trace = format.open(traceDirectory.toURI().toURL())
+ private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
+ val random = samplingOptions?.let { Random(it.seed) }
+ val samplingFraction = samplingOptions?.fraction ?: 1.0
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
- var lastId: String? = null
- var maxCores = Int.MIN_VALUE
- var requiredMemory = Long.MIN_VALUE
- var minTime = Long.MAX_VALUE
- var maxTime = Long.MIN_VALUE
- var lastTimestamp = Long.MIN_VALUE
-
- while (reader.nextRow()) {
- val id = reader.get(RESOURCE_STATE_ID)
-
- if (lastId != null && lastId != id) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", lastId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
- }
- lastId = id
+ var hasNextRow = reader.nextRow()
+ val selectedVms = mutableSetOf<String>()
- val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
- val timestampMs = timestamp.toEpochMilli()
- val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
- val cores = reader.getInt(RESOURCE_STATE_NCPUS)
- val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)
+ while (hasNextRow) {
+ var id: String
+ var numCpus = Int.MIN_VALUE
+ var memCapacity = Double.MIN_VALUE
+ var memUsage = Double.MIN_VALUE
+ var startTime = Long.MAX_VALUE
+ var stopTime = Long.MIN_VALUE
- maxCores = max(maxCores, cores)
- requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong())
+ do {
+ id = reader.get(RESOURCE_STATE_ID)
- if (lastTimestamp < 0) {
- lastTimestamp = timestampMs - 5 * 60 * 1000L
- minTime = min(minTime, lastTimestamp)
- }
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+ startTime = min(startTime, timestamp)
+ stopTime = max(stopTime, timestamp)
+
+ numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS))
- if (fragments.isEmpty()) {
- val duration = 5 * 60 * 1000L
- val flops: Long = (cpuUsage * duration / 1000).toLong()
- fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores))
- } else {
- val last = fragments.last()
- val duration = timestampMs - lastTimestamp
- val flops: Long = (cpuUsage * duration / 1000).toLong()
-
- // Perform run-length encoding
- if (last.id == id && (duration == 0L || last.usage == cpuUsage)) {
- fragments[fragments.size - 1] = last.copy(duration = last.duration + duration)
- } else {
- fragments.add(
- Fragment(
- id,
- lastTimestamp,
- flops,
- duration,
- cpuUsage,
- cores
- )
- )
+ memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
+ if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
+ memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE))
}
+
+ hasNextRow = reader.nextRow()
+ } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+
+ // Sample only a fraction of the VMs
+ if (random != null && random.nextDouble() > samplingFraction) {
+ continue
}
- val last = fragments.last()
- maxTime = max(maxTime, last.tick + last.duration)
- lastTimestamp = timestampMs
+ val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA)
+
+ builder["id"] = id
+ builder["submissionTime"] = startTime
+ builder["endTime"] = stopTime
+ builder["maxCores"] = numCpus
+ builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong()
+
+ logger.info { "Selecting VM $id" }
+
+ writer.write(builder.build())
+ selectedVms.add(id)
}
- return fragments
+
+ return selectedVms
}
-}
-class SolvinityConversion : AbstractConversion("Solvinity") {
- override val format: TraceFormat = SvTraceFormat()
-}
+ /**
+ * Convert the resource states table for the trace.
+ */
+ private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-/**
- * Conversion of the Bitbrains public trace.
- */
-class BitbrainsConversion : AbstractConversion("Bitbrains") {
- override val format: TraceFormat = BitbrainsTraceFormat()
-}
+ var hasNextRow = reader.nextRow()
+ var count = 0
-/**
- * Conversion of the Azure public VM trace.
- */
-class AzureConversion : TraceConversion("Azure") {
- private val seed by option(help = "seed for trace sampling")
- .long()
- .default(0)
-
- override fun read(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>
- ): MutableList<Fragment> {
- val random = Random(seed)
- val fraction = 0.01
-
- // Read VM table
- val vmIdTableCol = 0
- val coreTableCol = 9
- val provisionedMemoryTableCol = 10
-
- var vmId: String
- var cores: Int
- var requiredMemory: Long
-
- val vmIds = mutableSetOf<String>()
- val vmIdToMetadata = mutableMapOf<String, VmInfo>()
-
- BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
- reader.lineSequence()
- .chunked(1024)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
- // Sample only a fraction of the VMs
- if (random.nextDouble() > fraction) {
- continue
- }
-
- val values = line.split(",")
-
- // Exclude VMs with a large number of cores (not specified exactly)
- if (values[coreTableCol].contains(">")) {
- continue
- }
-
- vmId = values[vmIdTableCol].trim()
- cores = values[coreTableCol].trim().toInt()
- requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
-
- vmIds.add(vmId)
- vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
- }
+ while (hasNextRow) {
+ var lastTimestamp = Long.MIN_VALUE
+
+ do {
+ val id = reader.get(RESOURCE_STATE_ID)
+
+ if (id !in selectedVms) {
+ hasNextRow = reader.nextRow()
+ continue
}
- }
- // Read VM metric reading files
- val timestampCol = 0
- val vmIdCol = 1
- val cpuUsageCol = 4
- val traceInterval = 5 * 60 * 1000L
-
- val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
- val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
- val allFragments = mutableListOf<Fragment>()
-
- for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
- val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
- var timestamp: Long
- var cpuUsage: Double
-
- BufferedReader(FileReader(readingsFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(",")
- vmId = values[vmIdCol].trim()
-
- // Ignore readings for VMs not in the sample
- if (!vmIds.contains(vmId)) {
- continue
- }
-
- timestamp = values[timestampCol].trim().toLong() * 1000L
- vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
- cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
- vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
- val lastFragment = vmIdToLastFragment[vmId]
-
- vmIdToLastFragment[vmId] =
- if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
- Fragment(
- vmId,
- lastFragment.tick,
- lastFragment.flops + flops,
- lastFragment.duration + traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- if (lastFragment != null) {
- if (vmIdToFragments[vmId] == null) {
- vmIdToFragments[vmId] = mutableListOf()
- }
- vmIdToFragments[vmId]!!.add(lastFragment)
- allFragments.add(lastFragment)
- }
- fragment
- }
- }
- }
- }
- }
+ val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA)
+ builder["id"] = id
- for (entry in vmIdToLastFragment) {
- if (entry.value != null) {
- if (vmIdToFragments[entry.key] == null) {
- vmIdToFragments[entry.key] = mutableListOf()
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+ if (lastTimestamp < 0) {
+ lastTimestamp = timestamp - 5 * 60 * 1000L
}
- vmIdToFragments[entry.key]!!.add(entry.value!!)
- }
- }
- println("Read ${vmIdToLastFragment.size} VMs")
-
- for (entry in vmIdToMetadata) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.key)
- metaRecord.put("submissionTime", entry.value.minTime)
- metaRecord.put("endTime", entry.value.maxTime)
- println("${entry.value.minTime} - ${entry.value.maxTime}")
- metaRecord.put("maxCores", entry.value.cores)
- metaRecord.put("requiredMemory", entry.value.requiredMemory)
- metaWriter.write(metaRecord)
- }
+ val duration = timestamp - lastTimestamp
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
+ val flops = (cpuUsage * duration / 1000.0).roundToLong()
- return allFragments
- }
-}
+ builder["time"] = timestamp
+ builder["duration"] = duration
+ builder["cores"] = cores
+ builder["cpuUsage"] = cpuUsage
+ builder["flops"] = flops
-data class Fragment(
- val id: String,
- val tick: Long,
- val flops: Long,
- val duration: Long,
- val usage: Double,
- val cores: Int
-)
+ writer.write(builder.build())
-class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
+ lastTimestamp = timestamp
+ hasNextRow = reader.nextRow()
+ } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
-/**
- * A script to convert a trace in text format into a Parquet trace.
- */
-fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+ count++
+ }
+
+ return count
+ }
+
+ /**
+ * Options for sampling the workload trace.
+ */
+ private class SamplingOptions : OptionGroup() {
+ /**
+ * The fraction of VMs to sample
+ */
+ val fraction by option("--sampling-fraction", help = "fraction of the workload to sample")
+ .double()
+ .restrictTo(0.0001, 1.0)
+ .required()
+
+ /**
+ * The seed for sampling the trace.
+ */
+ val seed by option("--sampling-seed", help = "seed for sampling the workload")
+ .long()
+ .default(0)
+ }
+}
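To make the flops computation in convertResourceStates concrete: cpuUsage is an average rate in MHz and duration is in milliseconds, so cpuUsage * duration / 1000.0 gives the number of million cycles in the interval, which the trace records as flops (implicitly assuming one floating-point operation per cycle). A quick sanity check with made-up numbers:

    import kotlin.math.roundToLong

    fun main() {
        val cpuUsage = 2_000.0         // average CPU usage in MHz
        val duration = 5 * 60 * 1000L  // one 5-minute sample interval in ms
        val flops = (cpuUsage * duration / 1000.0).roundToLong()
        println(flops)                 // 600000, i.e. 6e11 cycles expressed in millions
    }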
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
new file mode 100644
index 00000000..f98f4b2c
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resource state [Table] for the Azure v1 VM traces.
+ */
+internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
+ /**
+ * The partitions that belong to the table.
+ */
+ private val partitions = Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+
+ override val name: String = TABLE_RESOURCE_STATES
+
+ override val isSynthetic: Boolean = false
+
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_STATE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_CPU_USAGE_PCT
+ )
+
+ override fun newReader(): TableReader {
+ val it = partitions.iterator()
+
+ return object : TableReader {
+ var delegate: TableReader? = nextDelegate()
+
+ override fun nextRow(): Boolean {
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextDelegate()
+ }
+
+ this.delegate = delegate
+ return delegate != null
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(column)
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(column)
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(column)
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(column)
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(column)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ private fun nextDelegate(): TableReader? {
+ return if (it.hasNext()) {
+ val (_, path) = it.next()
+ AzureResourceStateTableReader(factory.createParser(path.toFile()))
+ } else {
+ null
+ }
+ }
+
+ override fun toString(): String = "AzureCompositeTableReader"
+ }
+ }
+
+ override fun newReader(partition: String): TableReader {
+ val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" }
+ return AzureResourceStateTableReader(factory.createParser(path.toFile()))
+ }
+
+ override fun toString(): String = "AzureResourceStateTable"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
new file mode 100644
index 00000000..f80c0e82
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Azure v1 VM resource state table.
+ */
+internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue)
+ "vm id" -> id = parser.text
+ "avg cpu" -> cpuUsagePct = parser.doubleValue
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_STATE_ID -> true
+ RESOURCE_STATE_TIMESTAMP -> true
+ RESOURCE_STATE_CPU_USAGE_PCT -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_STATE_ID -> id
+ RESOURCE_STATE_TIMESTAMP -> timestamp
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ return when (column) {
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+ private var timestamp: Instant? = null
+ private var cpuUsagePct = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ id = null
+ timestamp = null
+ cpuUsagePct = Double.NaN
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm id", CsvSchema.ColumnType.STRING)
+ .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .build()
+ }
+}
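For reference, each readings row pairs a timestamp and a VM identifier with min/max/avg CPU readings; only the columns matched in nextRow() end up in the reader state. An illustrative row with made-up values, laid out per the schema above (the comment line is tolerated thanks to setAllowComments):

    # timestamp, vm id, CPU min cpu, CPU max cpu, CPU avg cpu
    300,vm-0001,1.5,12.9,5.2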
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
new file mode 100644
index 00000000..c9d4f7eb
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * The resource [Table] for the Azure v1 VM traces.
+ */
+internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table {
+ override val name: String = TABLE_RESOURCES
+
+ override val isSynthetic: Boolean = false
+
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_ID,
+ RESOURCE_START_TIME,
+ RESOURCE_STOP_TIME,
+ RESOURCE_NCPUS,
+ RESOURCE_MEM_CAPACITY
+ )
+
+ override fun newReader(): TableReader {
+ return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile()))
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("No partition $partition")
+ }
+
+ override fun toString(): String = "AzureResourceTable"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
new file mode 100644
index 00000000..b712b854
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.*
+import java.time.Instant
+
+/**
+ * A [TableReader] for the Azure v1 VM resources table.
+ */
+internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
+ init {
+ parser.schema = schema
+ }
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ if (!nextStart()) {
+ return false
+ }
+
+ while (true) {
+ val token = parser.nextValue()
+
+ if (token == null || token == JsonToken.END_OBJECT) {
+ break
+ }
+
+ when (parser.currentName) {
+ "vm id" -> id = parser.text
+ "vm created" -> startTime = Instant.ofEpochSecond(parser.longValue)
+ "vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue)
+ "vm virtual core count" -> cpuCores = parser.intValue
+ "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB
+ }
+ }
+
+ return true
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ RESOURCE_START_TIME -> true
+ RESOURCE_STOP_TIME -> true
+ RESOURCE_NCPUS -> true
+ RESOURCE_MEM_CAPACITY -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_ID -> id
+ RESOURCE_START_TIME -> startTime
+ RESOURCE_STOP_TIME -> stopTime
+ RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ RESOURCE_NCPUS -> cpuCores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ return when (column) {
+ RESOURCE_MEM_CAPACITY -> memCapacity
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Advance the parser until the next object start.
+ */
+ private fun nextStart(): Boolean {
+ var token = parser.nextValue()
+
+ while (token != null && token != JsonToken.START_OBJECT) {
+ token = parser.nextValue()
+ }
+
+ return token != null
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+ private var startTime: Instant? = null
+ private var stopTime: Instant? = null
+ private var cpuCores = -1
+ private var memCapacity = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ id = null
+ startTime = null
+ stopTime = null
+ cpuCores = -1
+ memCapacity = Double.NaN
+ }
+
+ companion object {
+ /**
+ * The [CsvSchema] that is used to parse the trace.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("vm id", CsvSchema.ColumnType.NUMBER)
+ .addColumn("subscription id", CsvSchema.ColumnType.STRING)
+ .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
+ .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
+ .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
+ .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .build()
+ }
+}
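Likewise for the vmtable: the CsvSchema above fixes the column order, and the reader converts "vm memory" from GB to KB on ingest. An illustrative row with made-up values (for this row the reader would report RESOURCE_MEM_CAPACITY as 1.6e7 KB):

    # vm id, subscription id, deployment id, timestamp vm created, timestamp vm deleted, max cpu, avg cpu, p95 cpu, vm category, vm virtual core count, vm memory
    vm-0001,sub-0001,dep-0001,0,2592000,92.1,12.4,54.0,1,4,16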
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
new file mode 100644
index 00000000..24c60bab
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the Azure v1 VM traces.
+ */
+class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+ override fun containsTable(name: String): Boolean = name in tables
+
+ override fun getTable(name: String): Table? {
+ return when (name) {
+ TABLE_RESOURCES -> AzureResourceTable(factory, path)
+ TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path)
+ else -> null
+ }
+ }
+
+ override fun toString(): String = "AzureTrace[$path]"
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
new file mode 100644
index 00000000..744e43a0
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A format implementation for the Azure v1 format.
+ */
+class AzureTraceFormat : TraceFormat {
+ /**
+ * The name of this trace format.
+ */
+ override val name: String = "azure-v1"
+
+ /**
+ * The [CsvFactory] used to create the parser.
+ */
+ private val factory = CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
+
+ /**
+ * Open the trace file.
+ */
+ override fun open(url: URL): AzureTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return AzureTrace(factory, path)
+ }
+}
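A short sketch of how the format plugs into the Trace API, mirroring what TraceConverterCli does with the resource states table (the path is hypothetical):

    import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat
    import org.opendc.trace.*
    import java.io.File

    fun main() {
        val trace = AzureTraceFormat().open(File("/data/azure-trace").toURI().toURL())
        val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()

        while (reader.nextRow()) {
            val id = reader.get(RESOURCE_STATE_ID)
            val usage = reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
            println("$id: $usage")
        }
        reader.close()
    }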
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
index 35bfd5ef..f051bf88 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
@@ -34,16 +34,13 @@ internal class BPResourceStateTable(private val path: Path) : Table {
override val name: String = TABLE_RESOURCE_STATES
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_DURATION -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_USAGE -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_STATE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_DURATION,
+ RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_USAGE,
+ )
override fun newReader(): TableReader {
val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
index 74d1e574..5b0f013f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
@@ -34,16 +34,13 @@ internal class BPResourceTable(private val path: Path) : Table {
override val name: String = TABLE_RESOURCES
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_END_TIME -> true
- RESOURCE_NCPUS -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_ID,
+ RESOURCE_START_TIME,
+ RESOURCE_STOP_TIME,
+ RESOURCE_NCPUS,
+ RESOURCE_MEM_CAPACITY
+ )
override fun newReader(): TableReader {
val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
index 0a105783..4416aae8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt
@@ -45,7 +45,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
return when (column) {
RESOURCE_ID -> true
RESOURCE_START_TIME -> true
- RESOURCE_END_TIME -> true
+ RESOURCE_STOP_TIME -> true
RESOURCE_NCPUS -> true
RESOURCE_MEM_CAPACITY -> true
else -> false
@@ -59,9 +59,9 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
val res: Any = when (column) {
RESOURCE_ID -> record["id"].toString()
RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long)
- RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
- RESOURCE_NCPUS -> record["maxCores"]
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long)
+ RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS)
+ RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -90,7 +90,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader<Gene
val record = checkNotNull(record) { "Reader in invalid state" }
return when (column) {
- RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble()
+ RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB
else -> throw IllegalArgumentException("Invalid column")
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
new file mode 100644
index 00000000..7dd8161d
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.trace.bp
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+
+/**
+ * Schema for the resources table in the trace.
+ */
+val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder
+ .record("meta")
+ .namespace("org.opendc.trace.capelin")
+ .fields()
+ .requiredString("id")
+ .requiredLong("submissionTime")
+ .requiredLong("endTime")
+ .requiredInt("maxCores")
+ .requiredLong("requiredMemory")
+ .endRecord()
+
+/**
+ * Schema for the resource states table in the trace.
+ */
+val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
+ .record("meta")
+ .namespace("org.opendc.trace.capelin")
+ .fields()
+ .requiredString("id")
+ .requiredLong("time")
+ .requiredLong("duration")
+ .requiredInt("cores")
+ .requiredDouble("cpuUsage")
+ .requiredLong("flops")
+ .endRecord()
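These schemas pair with Avro's GenericRecordBuilder, as the converter does when writing rows; a minimal record built against BP_RESOURCES_SCHEMA with made-up values:

    import org.apache.avro.generic.GenericRecordBuilder

    fun buildExampleRecord() = GenericRecordBuilder(BP_RESOURCES_SCHEMA).apply {
        this["id"] = "vm-0001"                // hypothetical VM identifier
        this["submissionTime"] = 0L           // first observation, epoch millis
        this["endTime"] = 2_592_000_000L      // last observation, 30 days later
        this["maxCores"] = 4
        this["requiredMemory"] = 16_000_000L  // peak memory in KB
    }.build()                                 // build() validates that all required fields are set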
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
index 24abb109..67140fe9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
@@ -31,7 +31,7 @@ import kotlin.io.path.extension
import kotlin.io.path.nameWithoutExtension
/**
- * The resource state [Table] in the Bitbrains format.
+ * The resource state [Table] in the extended Bitbrains format.
*/
internal class SvResourceStateTable(path: Path) : Table {
/**
@@ -40,28 +40,26 @@ internal class SvResourceStateTable(path: Path) : Table {
private val partitions = Files.walk(path, 1)
.filter { !Files.isDirectory(it) && it.extension == "txt" }
.collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
override val name: String = TABLE_RESOURCE_STATES
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_CLUSTER_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_CPU_DEMAND -> true
- RESOURCE_STATE_CPU_READY_PCT -> true
- RESOURCE_STATE_MEM_CAPACITY -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_STATE_ID,
+ RESOURCE_STATE_CLUSTER_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT,
+ RESOURCE_STATE_CPU_DEMAND,
+ RESOURCE_STATE_CPU_READY_PCT,
+ RESOURCE_STATE_MEM_CAPACITY,
+ RESOURCE_STATE_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE,
+ )
override fun newReader(): TableReader {
val it = partitions.iterator()
@@ -126,7 +124,7 @@ internal class SvResourceStateTable(path: Path) : Table {
}
}
- override fun toString(): String = "BitbrainsCompositeTableReader"
+ override fun toString(): String = "SvCompositeTableReader"
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
index 1a556f8d..6ea403fe 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt
@@ -30,13 +30,8 @@ import java.time.Instant
* A [TableReader] for the Bitbrains resource state table.
*/
internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader {
- /**
- * The current parser state.
- */
- private val state = RowState()
-
override fun nextRow(): Boolean {
- state.reset()
+ reset()
var line: String
var num = 0
@@ -70,23 +65,23 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
end = line.indexOf(' ', start)
if (end < 0) {
- break
+ end = length
}
val field = line.subSequence(start, end) as String
when (col++) {
- COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10))
- COL_CPU_USAGE -> state.cpuUsage = field.toDouble()
- COL_CPU_DEMAND -> state.cpuDemand = field.toDouble()
- COL_DISK_READ -> state.diskRead = field.toDouble()
- COL_DISK_WRITE -> state.diskWrite = field.toDouble()
- COL_CLUSTER_ID -> state.cluster = field.trim()
- COL_NCPUS -> state.cpuCores = field.toInt(10)
- COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble()
- COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1
- COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble()
- COL_ID -> state.id = field.trim()
- COL_MEM_CAPACITY -> state.memCapacity = field.toDouble()
+ COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10))
+ COL_CPU_USAGE -> cpuUsage = field.toDouble()
+ COL_CPU_DEMAND -> cpuDemand = field.toDouble()
+ COL_DISK_READ -> diskRead = field.toDouble()
+ COL_DISK_WRITE -> diskWrite = field.toDouble()
+ COL_CLUSTER_ID -> cluster = field.trim()
+ COL_NCPUS -> cpuCores = field.toInt(10)
+ COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble()
+ COL_POWERED_ON -> poweredOn = field.toInt(10) == 1
+ COL_CPU_CAPACITY -> cpuCapacity = field.toDouble()
+ COL_ID -> id = field.trim()
+ COL_MEM_CAPACITY -> memCapacity = field.toDouble()
}
}
@@ -113,16 +108,16 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
override fun <T> get(column: TableColumn<T>): T {
val res: Any? = when (column) {
- RESOURCE_STATE_ID -> state.id
- RESOURCE_STATE_CLUSTER_ID -> state.cluster
- RESOURCE_STATE_TIMESTAMP -> state.timestamp
- RESOURCE_STATE_NCPUS -> state.cpuCores
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ RESOURCE_STATE_ID -> id
+ RESOURCE_STATE_CLUSTER_ID -> cluster
+ RESOURCE_STATE_TIMESTAMP -> timestamp
+ RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS)
+ RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY)
+ RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
+ RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
+ RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY)
+ RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ)
+ RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE)
else -> throw IllegalArgumentException("Invalid column")
}
@@ -132,14 +127,14 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
override fun getBoolean(column: TableColumn<Boolean>): Boolean {
return when (column) {
- RESOURCE_STATE_POWERED_ON -> state.poweredOn
+ RESOURCE_STATE_POWERED_ON -> poweredOn
else -> throw IllegalArgumentException("Invalid column")
}
}
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_NCPUS -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -150,12 +145,13 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
override fun getDouble(column: TableColumn<Double>): Double {
return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
+ RESOURCE_STATE_CPU_DEMAND -> cpuDemand
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -165,51 +161,37 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) :
}
/**
- * The current row state.
+ * State fields of the reader.
*/
- private class RowState {
- @JvmField
- var id: String? = null
- @JvmField
- var cluster: String? = null
- @JvmField
- var timestamp: Instant? = null
- @JvmField
- var cpuCores = -1
- @JvmField
- var cpuCapacity = Double.NaN
- @JvmField
- var cpuUsage = Double.NaN
- @JvmField
- var cpuDemand = Double.NaN
- @JvmField
- var cpuReadyPct = Double.NaN
- @JvmField
- var memCapacity = Double.NaN
- @JvmField
- var diskRead = Double.NaN
- @JvmField
- var diskWrite = Double.NaN
- @JvmField
- var poweredOn: Boolean = false
-
- /**
- * Reset the state.
- */
- fun reset() {
- id = null
- timestamp = null
- cluster = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuDemand = Double.NaN
- cpuReadyPct = Double.NaN
- memCapacity = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- poweredOn = false
- }
+ private var id: String? = null
+ private var cluster: String? = null
+ private var timestamp: Instant? = null
+ private var cpuCores = -1
+ private var cpuCapacity = Double.NaN
+ private var cpuUsage = Double.NaN
+ private var cpuDemand = Double.NaN
+ private var cpuReadyPct = Double.NaN
+ private var memCapacity = Double.NaN
+ private var diskRead = Double.NaN
+ private var diskWrite = Double.NaN
+ private var poweredOn: Boolean = false
+
+ /**
+ * Reset the state of the reader.
+ */
+ private fun reset() {
+ id = null
+ timestamp = null
+ cluster = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuDemand = Double.NaN
+ cpuReadyPct = Double.NaN
+ memCapacity = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ poweredOn = false
}
/**