summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt28
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquetbin2099 -> 2723 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquetbin1125930 -> 2163354 bytes
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt31
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt2
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt114
-rw-r--r--traces/bitbrains-small/meta.parquetbin2099 -> 2723 bytes
-rw-r--r--traces/bitbrains-small/trace.parquetbin1125930 -> 2163354 bytes
9 files changed, 117 insertions, 62 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index f23becda..36a76f68 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -71,8 +71,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)
- val timeMs = time.toEpochMilli()
- val deadlineMs = timeMs + duration.toMillis()
+ val deadlineMs = time.toEpochMilli()
+ val timeMs = (time - duration).toEpochMilli()
val builder = fragments.computeIfAbsent(id) { Builder() }
builder.add(timeMs, deadlineMs, cpuUsage, cores)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index f3a6ed1a..1a8948f3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -115,11 +115,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.840862926294953E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -159,11 +159,11 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(7.010642279990053E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
+ { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
@@ -209,10 +209,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(465088, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(468522, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
@@ -252,11 +252,11 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
+ { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } }
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
index da6e5330..9cded35f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
index fe0a254c..9d953956 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet
Binary files differ
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
index 4f567b55..4cf60605 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
@@ -104,9 +104,28 @@ public class SimTrace(
/**
* Construct a new [FlowSource] for the specified [cpu].
+ *
+ * @param cpu The [ProcessingUnit] for which to create the source.
+ * @param offset The time offset to use for the trace.
+ * @param fillMode The [FillMode] for filling missing data.
*/
- public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource {
- return CpuConsumer(cpu, offset, usageCol, timestampCol, deadlineCol, coresCol, size)
+ public fun newSource(cpu: ProcessingUnit, offset: Long, fillMode: FillMode = FillMode.None): FlowSource {
+ return CpuConsumer(cpu, offset, fillMode, usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * An enumeration describing the modes for filling missing data.
+ */
+ public enum class FillMode {
+ /**
+ * When a gap in the trace data occurs, the CPU usage will be set to zero.
+ */
+ None,
+
+ /**
+ * When a gap in the trace data occurs, the previous CPU usage will be used.
+ */
+ Previous
}
/**
@@ -183,6 +202,7 @@ public class SimTrace(
private class CpuConsumer(
cpu: ProcessingUnit,
private val offset: Long,
+ private val fillMode: FillMode,
private val usageCol: DoubleArray,
private val timestampCol: LongArray,
private val deadlineCol: LongArray,
@@ -217,9 +237,12 @@ public class SimTrace(
_idx = idx
val timestamp = timestampCol[idx]
- // Fragment is in the future
+ // There is a gap in the trace, since the next fragment starts in the future.
if (timestamp > nowOffset) {
- conn.push(0.0)
+ when (fillMode) {
+ FillMode.None -> conn.push(0.0) // Reset rate to zero
+ FillMode.Previous -> {} // Keep previous rate
+ }
return timestamp - nowOffset
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
index 44762da5..244352ae 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
@@ -27,7 +27,7 @@ import java.time.Duration
import java.time.Instant
/**
- * Timestamp for the state.
+ * The timestamp at which the state was recorded.
*/
@JvmField
public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp")
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index a9bc9480..69fc79bb 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -194,6 +194,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
*/
private val logger = KotlinLogging.logger {}
+ /**
+ * The interval at which the samples where taken.
+ */
+ private val SAMPLE_INTERVAL = Duration.ofMinutes(5)
+
+ /**
+ * The difference in CPU usage for the algorithm to cascade samples.
+ */
+ private val SAMPLE_CASCADE_DIFF = 0.1
+
override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
@@ -242,7 +252,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
logger.info { "Selecting VM $id" }
- val startInstant = Instant.ofEpochMilli(startTime)
+ val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval
val stopInstant = Instant.ofEpochMilli(stopTime)
selectedVms.computeIfAbsent(id) {
@@ -264,6 +274,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+ val sampleInterval = SAMPLE_INTERVAL.toMillis()
val idCol = reader.resolve(RESOURCE_ID)
val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
@@ -272,8 +283,6 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
var hasNextRow = reader.nextRow()
var count = 0
- var lastId: String? = null
- var lastTimestamp = 0L
while (hasNextRow) {
val id = reader.get(idCol) as String
@@ -287,41 +296,43 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val cpuUsage = reader.getDouble(cpuUsageCol)
val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
- var timestamp = startTimestamp
- var duration: Long
+ var timestamp: Long = startTimestamp
+ var duration: Long = sampleInterval
- // Check whether the previous entry is from a different VM
- if (id != lastId) {
- lastTimestamp = timestamp - 5 * 60 * 1000L
- }
+ // Attempt to cascade further samples into one if they share the same CPU usage
+ while (reader.nextRow().also { hasNextRow = it }) {
+ val shouldCascade = id == reader.get(idCol) &&
+ abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF &&
+ cpuCount == reader.getInt(cpuCountCol)
- do {
- timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+ // Check whether the next sample can be cascaded with the current sample:
+ // (1) The VM identifier of both samples matches
+ // (2) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`
+ // (3) The CPU count of both samples is identical
+ if (!shouldCascade) {
+ break
+ }
- duration = timestamp - lastTimestamp
- hasNextRow = reader.nextRow()
+ val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
- if (!hasNextRow) {
+ // Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL`
+ if ((nextTimestamp - timestamp) > sampleInterval) {
break
}
- val shouldContinue = id == reader.get(idCol) &&
- abs(cpuUsage - reader.getDouble(cpuUsageCol)) < 0.01 &&
- cpuCount == reader.getInt(cpuCountCol)
- } while (shouldContinue)
+ duration += nextTimestamp - timestamp
+ timestamp = nextTimestamp
+ }
writer.startRow()
writer.set(RESOURCE_ID, id)
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp))
writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
writer.endRow()
count++
-
- lastId = id
- lastTimestamp = timestamp
}
return count
@@ -342,6 +353,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
*/
private val CPU_CAPACITY = 2500.0
+ /**
+ * The interval at which the samples where taken.
+ */
+ private val SAMPLE_INTERVAL = Duration.ofMinutes(5)
+
+ /**
+ * The difference in CPU usage for the algorithm to cascade samples.
+ */
+ private val SAMPLE_CASCADE_DIFF = 0.1
+
override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
@@ -379,10 +400,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
writer.startRow()
writer.set(RESOURCE_ID, id)
- writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
- writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
+ writer.set(RESOURCE_START_TIME, startInstant)
+ writer.set(RESOURCE_STOP_TIME, stopInstant)
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
- writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCount * CPU_CAPACITY)
+ writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity)
writer.endRow()
}
@@ -393,6 +414,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val states = HashMap<String, State>()
+ val sampleInterval = SAMPLE_INTERVAL.toMillis()
val idCol = reader.resolve(RESOURCE_ID)
val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
@@ -405,39 +427,49 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val resource = selected[id] ?: continue
val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz
- val state = states.computeIfAbsent(id) { State(resource, cpuUsage) }
+ val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) }
val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
- val duration = (timestamp - state.startTime)
-
- state.duration = duration
-
- if (abs(cpuUsage - state.cpuUsage) > 0.01) {
- state.write(writer)
-
- state.startTime = timestamp
- state.duration = 0
- state.cpuUsage = cpuUsage
+ val delta = (timestamp - state.time)
+
+ // Check whether the next sample can be cascaded with the current sample:
+ // (1) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`)
+ // (2) The interval between both samples is not higher than `SAMPLE_INTERVAL`
+ if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) {
+ state.time = timestamp
+ state.duration += delta
+ continue
}
+ state.write(writer)
+ // Reset the state fields
+ state.time = timestamp
+ state.duration = sampleInterval
+ // Count write
count++
}
for ((_, state) in states) {
- state.duration += state.startTime - state.resource.stopTime.toEpochMilli()
state.write(writer)
+ count++
}
return count
}
- private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double) {
- @JvmField var startTime: Long = resource.startTime.toEpochMilli()
- @JvmField var duration: Long = 30000L
+ private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) {
+ @JvmField var time: Long = resource.startTime.toEpochMilli()
+ private var lastWrite: Long = Long.MIN_VALUE
fun write(writer: TableWriter) {
+ // Check whether this timestamp was already written
+ if (lastWrite == time) {
+ return
+ }
+ lastWrite = time
+
writer.startRow()
writer.set(RESOURCE_ID, resource.id)
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTime))
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time))
writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount)
diff --git a/traces/bitbrains-small/meta.parquet b/traces/bitbrains-small/meta.parquet
index da6e5330..9cded35f 100644
--- a/traces/bitbrains-small/meta.parquet
+++ b/traces/bitbrains-small/meta.parquet
Binary files differ
diff --git a/traces/bitbrains-small/trace.parquet b/traces/bitbrains-small/trace.parquet
index fe0a254c..9d953956 100644
--- a/traces/bitbrains-small/trace.parquet
+++ b/traces/bitbrains-small/trace.parquet
Binary files differ