summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-tools
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-28 15:18:38 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-11-02 13:56:50 +0100
commit79a2f31c800c23539c413771a3f893c423275372 (patch)
tree014c96695dd0889db041c6f73f5332d22004e3fc /opendc-trace/opendc-trace-tools
parenta8e2d460a3b6803845687585ae0b34e67a9445a3 (diff)
refactor(trace): Support gaps in trace data
This change updates the trace converter and the SimTrace implementation to support gaps between samples in the trace data. Users can now specify how missing samples should be handled; the available options are defined in `SimTrace.FillMode`. Currently, we support either carrying the previous value forward or setting the usage to zero.
Diffstat (limited to 'opendc-trace/opendc-trace-tools')
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt114
1 files changed, 73 insertions, 41 deletions
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index a9bc9480..69fc79bb 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -194,6 +194,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
*/
private val logger = KotlinLogging.logger {}
+ /**
+ * The interval at which the samples were taken.
+ */
+ private val SAMPLE_INTERVAL = Duration.ofMinutes(5)
+
+ /**
+ * The difference in CPU usage for the algorithm to cascade samples.
+ */
+ private val SAMPLE_CASCADE_DIFF = 0.1
+
override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
@@ -242,7 +252,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
logger.info { "Selecting VM $id" }
- val startInstant = Instant.ofEpochMilli(startTime)
+ val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval
val stopInstant = Instant.ofEpochMilli(stopTime)
selectedVms.computeIfAbsent(id) {
@@ -264,6 +274,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+ val sampleInterval = SAMPLE_INTERVAL.toMillis()
val idCol = reader.resolve(RESOURCE_ID)
val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
@@ -272,8 +283,6 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
var hasNextRow = reader.nextRow()
var count = 0
- var lastId: String? = null
- var lastTimestamp = 0L
while (hasNextRow) {
val id = reader.get(idCol) as String
@@ -287,41 +296,43 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val cpuUsage = reader.getDouble(cpuUsageCol)
val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
- var timestamp = startTimestamp
- var duration: Long
+ var timestamp: Long = startTimestamp
+ var duration: Long = sampleInterval
- // Check whether the previous entry is from a different VM
- if (id != lastId) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
- }
+ // Attempt to cascade further samples into one if they share the same CPU usage
+ while (reader.nextRow().also { hasNextRow = it }) {
+ val shouldCascade = id == reader.get(idCol) &&
+ abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF &&
+ cpuCount == reader.getInt(cpuCountCol)
- do {
- timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+ // Check whether the next sample can be cascaded with the current sample:
+ // (1) The VM identifier of both samples matches
+ // (2) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`)
+ // (3) The CPU count of both samples is identical
+ if (!shouldCascade) {
+ break
+ }
- duration = timestamp - lastTimestamp
- hasNextRow = reader.nextRow()
+ val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
- if (!hasNextRow) {
+ // Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL`
+ if ((nextTimestamp - timestamp) > sampleInterval) {
break
}
- val shouldContinue = id == reader.get(idCol) &&
- abs(cpuUsage - reader.getDouble(cpuUsageCol)) < 0.01 &&
- cpuCount == reader.getInt(cpuCountCol)
- } while (shouldContinue)
+ duration += nextTimestamp - timestamp
+ timestamp = nextTimestamp
+ }
writer.startRow()
writer.set(RESOURCE_ID, id)
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp))
writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
writer.endRow()
count++
-
- lastId = id
- lastTimestamp = timestamp
}
return count
@@ -342,6 +353,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
*/
private val CPU_CAPACITY = 2500.0
+ /**
+ * The interval at which the samples were taken.
+ */
+ private val SAMPLE_INTERVAL = Duration.ofMinutes(5)
+
+ /**
+ * The difference in CPU usage for the algorithm to cascade samples.
+ */
+ private val SAMPLE_CASCADE_DIFF = 0.1
+
override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
@@ -379,10 +400,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
writer.startRow()
writer.set(RESOURCE_ID, id)
- writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
- writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
+ writer.set(RESOURCE_START_TIME, startInstant)
+ writer.set(RESOURCE_STOP_TIME, stopInstant)
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
- writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCount * CPU_CAPACITY)
+ writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity)
writer.endRow()
}
@@ -393,6 +414,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val states = HashMap<String, State>()
+ val sampleInterval = SAMPLE_INTERVAL.toMillis()
val idCol = reader.resolve(RESOURCE_ID)
val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
@@ -405,39 +427,49 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val resource = selected[id] ?: continue
val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz
- val state = states.computeIfAbsent(id) { State(resource, cpuUsage) }
+ val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) }
val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
- val duration = (timestamp - state.startTime)
-
- state.duration = duration
-
- if (abs(cpuUsage - state.cpuUsage) > 0.01) {
- state.write(writer)
-
- state.startTime = timestamp
- state.duration = 0
- state.cpuUsage = cpuUsage
+ val delta = (timestamp - state.time)
+
+ // Check whether the next sample can be cascaded with the current sample:
+ // (1) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`)
+ // (2) The interval between both samples is not higher than `SAMPLE_INTERVAL`
+ if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) {
+ state.time = timestamp
+ state.duration += delta
+ continue
}
+ state.write(writer)
+ // Reset the state fields
+ state.time = timestamp
+ state.duration = sampleInterval
+ // Count write
count++
}
for ((_, state) in states) {
- state.duration += state.startTime - state.resource.stopTime.toEpochMilli()
state.write(writer)
+ count++
}
return count
}
- private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double) {
- @JvmField var startTime: Long = resource.startTime.toEpochMilli()
- @JvmField var duration: Long = 30000L
+ private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) {
+ @JvmField var time: Long = resource.startTime.toEpochMilli()
+ private var lastWrite: Long = Long.MIN_VALUE
fun write(writer: TableWriter) {
+ // Check whether this timestamp was already written
+ if (lastWrite == time) {
+ return
+ }
+ lastWrite = time
+
writer.startRow()
writer.set(RESOURCE_ID, resource.id)
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTime))
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time))
writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount)