From 79a2f31c800c23539c413771a3f893c423275372 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 28 Oct 2021 15:18:38 +0200 Subject: refactor(trace): Support gaps in trace data This change updates the trace converter and the SimTrace implementation to support cases where there is a gap between samples in the trace data. This change allows users to specify what to do in case samples are missing in the trace. The available options are specified in `SimTrace.FillMode`. Currently, we support either carrying the previous value forward or setting the usage to zero. --- .../compute/workload/ComputeWorkloadLoader.kt | 4 +- .../experiments/capelin/CapelinIntegrationTest.kt | 28 ++--- .../resources/trace/bitbrains-small/meta.parquet | Bin 2099 -> 2723 bytes .../resources/trace/bitbrains-small/trace.parquet | Bin 1125930 -> 2163354 bytes .../opendc/simulator/compute/workload/SimTrace.kt | 31 +++++- .../org/opendc/trace/ResourceStateColumns.kt | 2 +- .../org/opendc/trace/tools/TraceConverter.kt | 114 +++++++++++++-------- traces/bitbrains-small/meta.parquet | Bin 2099 -> 2723 bytes traces/bitbrains-small/trace.parquet | Bin 1125930 -> 2163354 bytes 9 files changed, 117 insertions(+), 62 deletions(-) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index f23becda..36a76f68 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -71,8 +71,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val timeMs = time.toEpochMilli() - val deadlineMs = timeMs + duration.toMillis() + val deadlineMs = 
time.toEpochMilli() + val timeMs = (time - duration).toEpochMilli() val builder = fragments.computeIfAbsent(id) { Builder() } builder.add(timeMs, deadlineMs, cpuUsage, cores) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index f3a6ed1a..1a8948f3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -115,11 +115,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840862926294953E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) 
} @@ -159,11 +159,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.010642279990053E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } + { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -209,10 +209,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(465088, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(468522, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -252,11 +252,11 @@ class CapelinIntegrationTest { // Note that 
these values have been verified beforehand assertAll( - { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } } + { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } } ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet index da6e5330..9cded35f 100644 Binary files a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet and b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet differ diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet index fe0a254c..9d953956 100644 Binary files a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet and b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet differ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt index 4f567b55..4cf60605 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt +++ 
b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt @@ -104,9 +104,28 @@ public class SimTrace( /** * Construct a new [FlowSource] for the specified [cpu]. + * + * @param cpu The [ProcessingUnit] for which to create the source. + * @param offset The time offset to use for the trace. + * @param fillMode The [FillMode] for filling missing data. */ - public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource { - return CpuConsumer(cpu, offset, usageCol, timestampCol, deadlineCol, coresCol, size) + public fun newSource(cpu: ProcessingUnit, offset: Long, fillMode: FillMode = FillMode.None): FlowSource { + return CpuConsumer(cpu, offset, fillMode, usageCol, timestampCol, deadlineCol, coresCol, size) + } + + /** + * An enumeration describing the modes for filling missing data. + */ + public enum class FillMode { + /** + * When a gap in the trace data occurs, the CPU usage will be set to zero. + */ + None, + + /** + * When a gap in the trace data occurs, the previous CPU usage will be used. + */ + Previous } /** @@ -183,6 +202,7 @@ public class SimTrace( private class CpuConsumer( cpu: ProcessingUnit, private val offset: Long, + private val fillMode: FillMode, private val usageCol: DoubleArray, private val timestampCol: LongArray, private val deadlineCol: LongArray, @@ -217,9 +237,12 @@ public class SimTrace( _idx = idx val timestamp = timestampCol[idx] - // Fragment is in the future + // There is a gap in the trace, since the next fragment starts in the future. 
if (timestamp > nowOffset) { - conn.push(0.0) + when (fillMode) { + FillMode.None -> conn.push(0.0) // Reset rate to zero + FillMode.Previous -> {} // Keep previous rate + } return timestamp - nowOffset } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt index 44762da5..244352ae 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -27,7 +27,7 @@ import java.time.Duration import java.time.Instant /** - * Timestamp for the state. + * The timestamp at which the state was recorded. */ @JvmField public val RESOURCE_STATE_TIMESTAMP: TableColumn = column("resource_state:timestamp") diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index a9bc9480..69fc79bb 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -194,6 +194,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { */ private val logger = KotlinLogging.logger {} + /** + * The interval at which the samples were taken. + */ + private val SAMPLE_INTERVAL = Duration.ofMinutes(5) + + /** + * The difference in CPU usage for the algorithm to cascade samples. 
+ */ + private val SAMPLE_CASCADE_DIFF = 0.1 + override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 @@ -242,7 +252,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { logger.info { "Selecting VM $id" } - val startInstant = Instant.ofEpochMilli(startTime) + val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval val stopInstant = Instant.ofEpochMilli(stopTime) selectedVms.computeIfAbsent(id) { @@ -264,6 +274,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + val sampleInterval = SAMPLE_INTERVAL.toMillis() val idCol = reader.resolve(RESOURCE_ID) val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) @@ -272,8 +283,6 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var hasNextRow = reader.nextRow() var count = 0 - var lastId: String? 
= null - var lastTimestamp = 0L while (hasNextRow) { val id = reader.get(idCol) as String @@ -287,41 +296,43 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val cpuUsage = reader.getDouble(cpuUsageCol) val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - var timestamp = startTimestamp - var duration: Long + var timestamp: Long = startTimestamp + var duration: Long = sampleInterval - // Check whether the previous entry is from a different VM - if (id != lastId) { - lastTimestamp = timestamp - 5 * 60 * 1000L - } + // Attempt to cascade further samples into one if they share the same CPU usage + while (reader.nextRow().also { hasNextRow = it }) { + val shouldCascade = id == reader.get(idCol) && + abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF && + cpuCount == reader.getInt(cpuCountCol) - do { - timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + // Check whether the next sample can be cascaded with the current sample: + // (1) The VM identifier of both samples matches + // (2) The CPU usage is almost identical (difference lower than `SAMPLE_CASCADE_DIFF`) + // (3) The CPU count of both samples is identical + if (!shouldCascade) { + break + } - duration = timestamp - lastTimestamp - hasNextRow = reader.nextRow() + val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - if (!hasNextRow) { + // Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL` + if ((nextTimestamp - timestamp) > sampleInterval) { break } - val shouldContinue = id == reader.get(idCol) && - abs(cpuUsage - reader.getDouble(cpuUsageCol)) < 0.01 && - cpuCount == reader.getInt(cpuCountCol) - } while (shouldContinue) + duration += nextTimestamp - timestamp + timestamp = nextTimestamp + } writer.startRow() writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp)) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) 
writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) writer.endRow() count++ - - lastId = id - lastTimestamp = timestamp } return count @@ -342,6 +353,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { */ private val CPU_CAPACITY = 2500.0 + /** + * The interval at which the samples were taken. + */ + private val SAMPLE_INTERVAL = Duration.ofMinutes(5) + + /** + * The difference in CPU usage for the algorithm to cascade samples. + */ + private val SAMPLE_CASCADE_DIFF = 0.1 + override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 @@ -379,10 +400,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { writer.startRow() writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime)) - writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime)) + writer.set(RESOURCE_START_TIME, startInstant) + writer.set(RESOURCE_STOP_TIME, stopInstant) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) - writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCount * CPU_CAPACITY) + writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity) writer.endRow() } @@ -393,6 +414,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val states = HashMap() + val sampleInterval = SAMPLE_INTERVAL.toMillis() val idCol = reader.resolve(RESOURCE_ID) val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) @@ -405,39 +427,49 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val resource = 
selected[id] ?: continue val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz - val state = states.computeIfAbsent(id) { State(resource, cpuUsage) } + val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) } val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - val duration = (timestamp - state.startTime) - - state.duration = duration - - if (abs(cpuUsage - state.cpuUsage) > 0.01) { - state.write(writer) - - state.startTime = timestamp - state.duration = 0 - state.cpuUsage = cpuUsage + val delta = (timestamp - state.time) + + // Check whether the next sample can be cascaded with the current sample: + // (1) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`) + // (2) The interval between both samples is not higher than `SAMPLE_INTERVAL` + if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) { + state.time = timestamp + state.duration += delta + continue } + state.write(writer) + // Reset the state fields + state.time = timestamp + state.duration = sampleInterval + // Count write count++ } for ((_, state) in states) { - state.duration += state.startTime - state.resource.stopTime.toEpochMilli() state.write(writer) + count++ } return count } - private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double) { - @JvmField var startTime: Long = resource.startTime.toEpochMilli() - @JvmField var duration: Long = 30000L + private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) { + @JvmField var time: Long = resource.startTime.toEpochMilli() + private var lastWrite: Long = Long.MIN_VALUE fun write(writer: TableWriter) { + // Check whether this timestamp was already written + if (lastWrite == time) { + return + } + lastWrite = time + writer.startRow() writer.set(RESOURCE_ID, resource.id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTime)) + 
writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount) diff --git a/traces/bitbrains-small/meta.parquet b/traces/bitbrains-small/meta.parquet index da6e5330..9cded35f 100644 Binary files a/traces/bitbrains-small/meta.parquet and b/traces/bitbrains-small/meta.parquet differ diff --git a/traces/bitbrains-small/trace.parquet b/traces/bitbrains-small/trace.parquet index fe0a254c..9d953956 100644 Binary files a/traces/bitbrains-small/trace.parquet and b/traces/bitbrains-small/trace.parquet differ -- cgit v1.2.3