Diffstat (limited to 'opendc-trace/opendc-trace-tools')
 opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt | 403
 opendc-trace/opendc-trace-tools/src/main/resources/log4j2.xml                            |  38
 2 files changed, 333 insertions(+), 108 deletions(-)
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index 75472dff..a9bc9480 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -27,6 +27,8 @@ import com.github.ajalt.clikt.core.CliktCommand
 import com.github.ajalt.clikt.parameters.arguments.argument
 import com.github.ajalt.clikt.parameters.groups.OptionGroup
 import com.github.ajalt.clikt.parameters.groups.cooccurring
+import com.github.ajalt.clikt.parameters.groups.defaultByName
+import com.github.ajalt.clikt.parameters.groups.groupChoice
 import com.github.ajalt.clikt.parameters.options.*
 import com.github.ajalt.clikt.parameters.types.*
 import mu.KotlinLogging
@@ -35,6 +37,7 @@ import java.io.File
 import java.time.Duration
 import java.time.Instant
 import java.util.*
+import kotlin.collections.HashMap
 import kotlin.math.abs
 import kotlin.math.max
 import kotlin.math.min
@@ -83,6 +86,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
      */
     private val samplingOptions by SamplingOptions().cooccurring()
 
+    /**
+     * The converter strategy to use.
+     */
+    private val converter by option("-c", "--converter", help = "converter strategy to use").groupChoice(
+        "default" to DefaultTraceConverter(),
+        "azure" to AzureTraceConverter(),
+    ).defaultByName("default")
+
     override fun run() {
         val metaParquet = File(output, "meta.parquet")
         val traceParquet = File(output, "trace.parquet")
@@ -101,7 +112,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
 
         val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter()
 
-        val selectedVms = metaWriter.use { convertResources(inputTrace, it) }
+        val selectedVms = metaWriter.use { converter.convertResources(inputTrace, it, samplingOptions) }
 
         if (selectedVms.isEmpty()) {
             logger.warn { "No VMs selected" }
@@ -113,149 +124,325 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
 
         val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter()
 
-        val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) }
+        val statesCount = writer.use { converter.convertResourceStates(inputTrace, it, selectedVms) }
 
         logger.info { "Wrote $statesCount rows" }
     }
 
     /**
-     * Convert the resources table for the trace.
+     * Options for sampling the workload trace.
      */
-    private fun convertResources(trace: Trace, writer: TableWriter): Set<String> {
-        val random = samplingOptions?.let { Random(it.seed) }
-        val samplingFraction = samplingOptions?.fraction ?: 1.0
-        val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-
-        var hasNextRow = reader.nextRow()
-        val selectedVms = mutableSetOf<String>()
-
-        while (hasNextRow) {
-            var id: String
-            var cpuCount = 0
-            var cpuCapacity = 0.0
-            var memCapacity = 0.0
-            var memUsage = 0.0
-            var startTime = Long.MAX_VALUE
-            var stopTime = Long.MIN_VALUE
-
-            do {
-                id = reader.get(RESOURCE_ID)
-
-                val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
-
-                startTime = min(startTime, timestamp)
-                stopTime = max(stopTime, timestamp)
-                cpuCount = max(cpuCount, reader.getInt(RESOURCE_CPU_COUNT))
-                cpuCapacity = max(cpuCapacity, reader.getDouble(RESOURCE_CPU_CAPACITY))
-                memCapacity = max(memCapacity, reader.getDouble(RESOURCE_MEM_CAPACITY))
-
-                if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
-                    memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE))
-                }
+    private class SamplingOptions : OptionGroup() {
+        /**
+         * The fraction of VMs to sample
+         */
+        val fraction by option("--sampling-fraction", help = "fraction of the workload to sample")
+            .double()
+            .restrictTo(0.0001, 1.0)
+            .required()
 
-                hasNextRow = reader.nextRow()
-            } while (hasNextRow && id == reader.get(RESOURCE_ID))
+        /**
+         * The seed for sampling the trace.
+         */
+        val seed by option("--sampling-seed", help = "seed for sampling the workload")
+            .long()
+            .default(0)
+    }
 
-            // Sample only a fraction of the VMs
-            if (random != null && random.nextDouble() > samplingFraction) {
-                continue
-            }
+    /**
+     * A trace conversion strategy.
+     */
+    private sealed class TraceConverter(name: String) : OptionGroup(name) {
+        /**
+         * Convert the resources table for the trace.
+         *
+         * @param trace The trace to convert.
+         * @param writer The table writer for the target format.
+         * @param samplingOptions The sampling options to use.
+         * @return The map of resources that have been selected.
+         */
+        abstract fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource>
 
-            logger.info { "Selecting VM $id" }
-            selectedVms.add(id)
-
-            writer.startRow()
-            writer.set(RESOURCE_ID, id)
-            writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
-            writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
-            writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
-            writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
-            writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
-            writer.endRow()
-        }
+        /**
+         * Convert the resource states table for the trace.
+         *
+         * @param trace The trace to convert.
+         * @param writer The table writer for the target format.
+         * @param selected The set of virtual machines that have been selected.
+         * @return The number of rows written.
+         */
+        abstract fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int
 
-        return selectedVms
+        /**
+         * A resource in the resource table.
+         */
+        data class Resource(
+            val id: String,
+            val startTime: Instant,
+            val stopTime: Instant,
+            val cpuCount: Int,
+            val cpuCapacity: Double,
+            val memCapacity: Double
+        )
     }
 
     /**
-     * Convert the resource states table for the trace.
+     * Default implementation of [TraceConverter].
      */
-    private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int {
-        val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+    private class DefaultTraceConverter : TraceConverter("default") {
+        /**
+         * The logger instance for the converter.
+         */
+        private val logger = KotlinLogging.logger {}
+
+        override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
+            val random = samplingOptions?.let { Random(it.seed) }
+            val samplingFraction = samplingOptions?.fraction ?: 1.0
+            val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+
+            var hasNextRow = reader.nextRow()
+            val selectedVms = mutableMapOf<String, Resource>()
+
+            val idCol = reader.resolve(RESOURCE_ID)
+            val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
+            val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
+            val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY)
+            val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY)
+            val memUsageCol = reader.resolve(RESOURCE_STATE_MEM_USAGE)
+
+            while (hasNextRow) {
+                var id: String
+                var cpuCount = 0
+                var cpuCapacity = 0.0
+                var memCapacity = 0.0
+                var memUsage = 0.0
+                var startTime = Long.MAX_VALUE
+                var stopTime = Long.MIN_VALUE
+
+                do {
+                    id = reader.get(idCol) as String
+
+                    val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+                    startTime = min(startTime, timestamp)
+                    stopTime = max(stopTime, timestamp)
+
+                    cpuCount = max(cpuCount, reader.getInt(cpuCountCol))
+                    cpuCapacity = max(cpuCapacity, reader.getDouble(cpuCapacityCol))
+                    memCapacity = max(memCapacity, reader.getDouble(memCapacityCol))
+                    if (memUsageCol > 0) {
+                        memUsage = max(memUsage, reader.getDouble(memUsageCol))
+                    }
+
+                    hasNextRow = reader.nextRow()
+                } while (hasNextRow && id == reader.get(RESOURCE_ID))
+
+                // Sample only a fraction of the VMs
+                if (random != null && random.nextDouble() > samplingFraction) {
+                    continue
+                }
 
-        var hasNextRow = reader.nextRow()
-        var count = 0
-        var lastId: String? = null
-        var lastTimestamp = 0L
+                logger.info { "Selecting VM $id" }
 
-        while (hasNextRow) {
-            val id = reader.get(RESOURCE_ID)
+                val startInstant = Instant.ofEpochMilli(startTime)
+                val stopInstant = Instant.ofEpochMilli(stopTime)
 
-            if (id !in selectedVms) {
-                hasNextRow = reader.nextRow()
-                continue
-            }
+                selectedVms.computeIfAbsent(id) {
+                    Resource(it, startInstant, stopInstant, cpuCount, cpuCapacity, max(memCapacity, memUsage))
+                }
 
-            val cpuCount = reader.getInt(RESOURCE_CPU_COUNT)
-            val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
+                writer.startRow()
+                writer.set(RESOURCE_ID, id)
+                writer.set(RESOURCE_START_TIME, startInstant)
+                writer.set(RESOURCE_STOP_TIME, stopInstant)
+                writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
+                writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
+                writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
+                writer.endRow()
+            }
 
-            val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
-            var timestamp = startTimestamp
-            var duration: Long
+            return selectedVms
+        }
 
-            // Check whether the previous entry is from a different VM
-            if (id != lastId) {
-                lastTimestamp = timestamp - 5 * 60 * 1000L
-            }
+        override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
+            val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+
+            val idCol = reader.resolve(RESOURCE_ID)
+            val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
+            val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
+            val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE)
+
+            var hasNextRow = reader.nextRow()
+            var count = 0
+            var lastId: String? = null
+            var lastTimestamp = 0L
+
+            while (hasNextRow) {
+                val id = reader.get(idCol) as String
+                val resource = selected[id]
+                if (resource == null) {
+                    hasNextRow = reader.nextRow()
+                    continue
+                }
 
-            do {
-                timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
+                val cpuCount = reader.getInt(cpuCountCol)
+                val cpuUsage = reader.getDouble(cpuUsageCol)
 
-                duration = timestamp - lastTimestamp
-                hasNextRow = reader.nextRow()
+                val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+                var timestamp = startTimestamp
+                var duration: Long
 
-                if (!hasNextRow) {
-                    break
+                // Check whether the previous entry is from a different VM
+                if (id != lastId) {
+                    lastTimestamp = timestamp - 5 * 60 * 1000L
                 }
 
-                val shouldContinue = id == reader.get(RESOURCE_ID) &&
-                    abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 &&
-                    cpuCount == reader.getInt(RESOURCE_CPU_COUNT)
-            } while (shouldContinue)
+                do {
+                    timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
 
-            writer.startRow()
-            writer.set(RESOURCE_ID, id)
-            writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
-            writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
-            writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
-            writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
-            writer.endRow()
+                    duration = timestamp - lastTimestamp
+                    hasNextRow = reader.nextRow()
 
-            count++
+                    if (!hasNextRow) {
+                        break
+                    }
 
-            lastId = id
-            lastTimestamp = timestamp
-        }
+                    val shouldContinue = id == reader.get(idCol) &&
+                        abs(cpuUsage - reader.getDouble(cpuUsageCol)) < 0.01 &&
+                        cpuCount == reader.getInt(cpuCountCol)
+                } while (shouldContinue)
+
+                writer.startRow()
+                writer.set(RESOURCE_ID, id)
+                writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
+                writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+                writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
+                writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
+                writer.endRow()
+
+                count++
+
+                lastId = id
+                lastTimestamp = timestamp
+            }
 
-        return count
+            return count
+        }
     }
 
     /**
-     * Options for sampling the workload trace.
+     * Implementation of [TraceConverter] for the Azure trace format.
      */
-    private class SamplingOptions : OptionGroup() {
+    private class AzureTraceConverter : TraceConverter("default") {
         /**
-         * The fraction of VMs to sample
+         * The logger instance for the converter.
          */
-        val fraction by option("--sampling-fraction", help = "fraction of the workload to sample")
-            .double()
-            .restrictTo(0.0001, 1.0)
-            .required()
+        private val logger = KotlinLogging.logger {}
 
         /**
-         * The seed for sampling the trace.
+         * CPU capacity of the machines used by Azure.
          */
-        val seed by option("--sampling-seed", help = "seed for sampling the workload")
-            .long()
-            .default(0)
+        private val CPU_CAPACITY = 2500.0
+
+        override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
+            val random = samplingOptions?.let { Random(it.seed) }
+            val samplingFraction = samplingOptions?.fraction ?: 1.0
+            val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
+
+            val idCol = reader.resolve(RESOURCE_ID)
+            val startTimeCol = reader.resolve(RESOURCE_START_TIME)
+            val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME)
+            val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
+            val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY)
+
+            val selectedVms = mutableMapOf<String, Resource>()
+
+            while (reader.nextRow()) {
+                // Sample only a fraction of the VMs
+                if (random != null && random.nextDouble() > samplingFraction) {
+                    continue
+                }
+
+                val id = reader.get(idCol) as String
+                val startTime = (reader.get(startTimeCol) as Instant).toEpochMilli()
+                val stopTime = (reader.get(stopTimeCol) as Instant).toEpochMilli()
+                val cpuCount = reader.getInt(cpuCountCol)
+                val memCapacity = reader.getDouble(memCapacityCol)
+
+                logger.info { "Selecting VM $id" }
+
+                val startInstant = Instant.ofEpochMilli(startTime)
+                val stopInstant = Instant.ofEpochMilli(stopTime)
+                val cpuCapacity = cpuCount * CPU_CAPACITY
+
+                selectedVms.computeIfAbsent(id) {
+                    Resource(it, startInstant, stopInstant, cpuCount, cpuCapacity, memCapacity)
+                }
+
+                writer.startRow()
+                writer.set(RESOURCE_ID, id)
+                writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
+                writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
+                writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
+                writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCount * CPU_CAPACITY)
+                writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity)
+                writer.endRow()
+            }
+
+            return selectedVms
+        }
+
+        override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
+            val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+            val states = HashMap<String, State>()
+
+            val idCol = reader.resolve(RESOURCE_ID)
+            val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
+            val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE_PCT)
+
+            var count = 0
+
+            while (reader.nextRow()) {
+                val id = reader.get(idCol) as String
+                val resource = selected[id] ?: continue
+
+                val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz
+                val state = states.computeIfAbsent(id) { State(resource, cpuUsage) }
+                val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+                val duration = (timestamp - state.startTime)
+
+                state.duration = duration
+
+                if (abs(cpuUsage - state.cpuUsage) > 0.01) {
+                    state.write(writer)
+
+                    state.startTime = timestamp
+                    state.duration = 0
+                    state.cpuUsage = cpuUsage
+                }
+
+                count++
+            }
+
+            for ((_, state) in states) {
+                state.duration += state.startTime - state.resource.stopTime.toEpochMilli()
+                state.write(writer)
+            }
+
+            return count
+        }
+
+        private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double) {
+            @JvmField var startTime: Long = resource.startTime.toEpochMilli()
+            @JvmField var duration: Long = 30000L
+
+            fun write(writer: TableWriter) {
+                writer.startRow()
+                writer.set(RESOURCE_ID, resource.id)
+                writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTime))
+                writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+                writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
+                writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount)
+                writer.endRow()
+            }
+        }
     }
 }
diff --git a/opendc-trace/opendc-trace-tools/src/main/resources/log4j2.xml b/opendc-trace/opendc-trace-tools/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..32d81416
--- /dev/null
+++ b/opendc-trace/opendc-trace-tools/src/main/resources/log4j2.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2021 AtLarge Research
+  ~
+  ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+  ~ of this software and associated documentation files (the "Software"), to deal
+  ~ in the Software without restriction, including without limitation the rights
+  ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+  ~ copies of the Software, and to permit persons to whom the Software is
+  ~ furnished to do so, subject to the following conditions:
+  ~
+  ~ The above copyright notice and this permission notice shall be included in all
+  ~ copies or substantial portions of the Software.
+  ~
+  ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+  ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+  ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+  ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+  ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+  ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+  ~ SOFTWARE.
+  -->
+
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Logger name="org.opendc" level="info" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Root level="error">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
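For reference, the new --converter option above relies on Clikt's groupChoice and defaultByName combinators: each choice name maps to an OptionGroup instance, and the named default is used when the flag is omitted. The minimal, self-contained sketch below illustrates that pattern only; it assumes Clikt 3.x on the classpath, and the class and option names are illustrative, not taken from OpenDC.

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.groups.OptionGroup
import com.github.ajalt.clikt.parameters.groups.defaultByName
import com.github.ajalt.clikt.parameters.groups.groupChoice
import com.github.ajalt.clikt.parameters.options.option

// Each strategy is an OptionGroup; groupChoice maps a command-line name to one of them.
sealed class Strategy(name: String) : OptionGroup(name) {
    abstract fun describe(): String
}

class DefaultStrategy : Strategy("default") {
    override fun describe() = "default strategy"
}

class AzureStrategy : Strategy("azure") {
    override fun describe() = "azure strategy"
}

class Demo : CliktCommand(name = "demo") {
    // "-c azure" selects AzureStrategy; when the option is absent, "default" is used.
    private val strategy by option("-c", "--converter", help = "strategy to use").groupChoice(
        "default" to DefaultStrategy(),
        "azure" to AzureStrategy(),
    ).defaultByName("default")

    override fun run() = echo(strategy.describe())
}

fun main(args: Array<String>) = Demo().main(args)

Because each choice is itself an OptionGroup, converter-specific options can later be declared on the corresponding subclass without touching the main command.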
