diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-28 15:18:38 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-11-02 13:56:50 +0100 |
| commit | 79a2f31c800c23539c413771a3f893c423275372 (patch) | |
| tree | 014c96695dd0889db041c6f73f5332d22004e3fc /opendc-trace/opendc-trace-tools | |
| parent | a8e2d460a3b6803845687585ae0b34e67a9445a3 (diff) | |
refactor(trace): Support gaps in trace data
This change updates the implementation of the trace converter and
SimTrace implementation to support cases where there is a gap between
samples in the trace data.
This change allows users to specify what to do in case samples are
missing in the trace. The available options are specified in
`SimTrace.FillMode`. Currently, we support either carrying the previous
value forward or set the usage to zero.
Diffstat (limited to 'opendc-trace/opendc-trace-tools')
| -rw-r--r-- | opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt | 114 |
1 files changed, 73 insertions, 41 deletions
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index a9bc9480..69fc79bb 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -194,6 +194,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { */ private val logger = KotlinLogging.logger {} + /** + * The interval at which the samples where taken. + */ + private val SAMPLE_INTERVAL = Duration.ofMinutes(5) + + /** + * The difference in CPU usage for the algorithm to cascade samples. + */ + private val SAMPLE_CASCADE_DIFF = 0.1 + override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 @@ -242,7 +252,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { logger.info { "Selecting VM $id" } - val startInstant = Instant.ofEpochMilli(startTime) + val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval val stopInstant = Instant.ofEpochMilli(stopTime) selectedVms.computeIfAbsent(id) { @@ -264,6 +274,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + val sampleInterval = SAMPLE_INTERVAL.toMillis() val idCol = reader.resolve(RESOURCE_ID) val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) @@ -272,8 +283,6 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var hasNextRow = reader.nextRow() var count = 0 - var lastId: String? = null - var lastTimestamp = 0L while (hasNextRow) { val id = reader.get(idCol) as String @@ -287,41 +296,43 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val cpuUsage = reader.getDouble(cpuUsageCol) val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - var timestamp = startTimestamp - var duration: Long + var timestamp: Long = startTimestamp + var duration: Long = sampleInterval - // Check whether the previous entry is from a different VM - if (id != lastId) { - lastTimestamp = timestamp - 5 * 60 * 1000L - } + // Attempt to cascade further samples into one if they share the same CPU usage + while (reader.nextRow().also { hasNextRow = it }) { + val shouldCascade = id == reader.get(idCol) && + abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF && + cpuCount == reader.getInt(cpuCountCol) - do { - timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + // Check whether the next sample can be cascaded with the current sample: + // (1) The VM identifier of both samples matches + // (2) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF` + // (3) The CPU count of both samples is identical + if (!shouldCascade) { + break + } - duration = timestamp - lastTimestamp - hasNextRow = reader.nextRow() + val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - if (!hasNextRow) { + // Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL` + if ((nextTimestamp - timestamp) > sampleInterval) { break } - val shouldContinue = id == reader.get(idCol) && - abs(cpuUsage - reader.getDouble(cpuUsageCol)) < 0.01 && - cpuCount == reader.getInt(cpuCountCol) - } while (shouldContinue) + duration += nextTimestamp - timestamp + timestamp = nextTimestamp + } writer.startRow() writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp)) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) writer.endRow() count++ - - lastId = id - lastTimestamp = timestamp } return count @@ -342,6 +353,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { */ private val CPU_CAPACITY = 2500.0 + /** + * The interval at which the samples where taken. + */ + private val SAMPLE_INTERVAL = Duration.ofMinutes(5) + + /** + * The difference in CPU usage for the algorithm to cascade samples. + */ + private val SAMPLE_CASCADE_DIFF = 0.1 + override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 @@ -379,10 +400,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { writer.startRow() writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime)) - writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime)) + writer.set(RESOURCE_START_TIME, startInstant) + writer.set(RESOURCE_STOP_TIME, stopInstant) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) - writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCount * CPU_CAPACITY) + writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity) writer.endRow() } @@ -393,6 +414,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val states = HashMap<String, State>() + val sampleInterval = SAMPLE_INTERVAL.toMillis() val idCol = reader.resolve(RESOURCE_ID) val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) @@ -405,39 +427,49 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val resource = selected[id] ?: continue val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz - val state = states.computeIfAbsent(id) { State(resource, cpuUsage) } + val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) } val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() - val duration = (timestamp - state.startTime) - - state.duration = duration - - if (abs(cpuUsage - state.cpuUsage) > 0.01) { - state.write(writer) - - state.startTime = timestamp - state.duration = 0 - state.cpuUsage = cpuUsage + val delta = (timestamp - state.time) + + // Check whether the next sample can be cascaded with the current sample: + // (1) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`) + // (2) The interval between both samples is not higher than `SAMPLE_INTERVAL` + if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) { + state.time = timestamp + state.duration += delta + continue } + state.write(writer) + // Reset the state fields + state.time = timestamp + state.duration = sampleInterval + // Count write count++ } for ((_, state) in states) { - state.duration += state.startTime - state.resource.stopTime.toEpochMilli() state.write(writer) + count++ } return count } - private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double) { - @JvmField var startTime: Long = resource.startTime.toEpochMilli() - @JvmField var duration: Long = 30000L + private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) { + @JvmField var time: Long = resource.startTime.toEpochMilli() + private var lastWrite: Long = Long.MIN_VALUE fun write(writer: TableWriter) { + // Check whether this timestamp was already written + if (lastWrite == time) { + return + } + lastWrite = time + writer.startRow() writer.set(RESOURCE_ID, resource.id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTime)) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount) |
