summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-24 13:15:26 +0200
committerGitHub <noreply@github.com>2021-08-24 13:15:26 +0200
commitac48fa12f36180de31154a7c828b4dc281dac94b (patch)
tree3a1117b62e627094ea2859a8b1cb910ef7046851 /opendc-experiments/opendc-experiments-capelin/src
parent51515bb255b3b32ca3020419a0c84130a4d8d370 (diff)
parent5266ecd476a18f601cb4eb6166f4c8338c440210 (diff)
merge: Add tests for interference and failures in Capelin
This pull request updates the Capelin experiments to test for interference and failure scenarios. This allows us to track regressions in these subsystems more easily. * Clean up Bitbrains trace reader to enable re-use * Keep trace order after sampling * Update Bitbrains trace tests * Add support for reporting interfered work * Add support for SimHost failure * Add tests for interference and failures
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt127
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt117
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json21
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin2148 -> 2081 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin1672463 -> 1647189 bytes
8 files changed, 164 insertions, 110 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index fa9fa2fc..d7df4454 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -255,6 +255,8 @@ suspend fun processTrace(
offset = entry.start - clock.millis()
}
+ // Make sure the trace entries are ordered by submission time
+ assert(entry.start - offset >= 0) { "Invalid trace order" }
delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
@@ -275,7 +277,7 @@ suspend fun processTrace(
override fun onStateChanged(server: Server, newState: ServerState) {
monitor.reportVmStateChange(clock.millis(), server, newState)
- if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
+ if (newState == ServerState.TERMINATED) {
cont.resume(Unit)
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index f520a28c..7fb2f83c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -74,7 +74,7 @@ public class ExperimentMetricExporter(
m.overcommissionedBurst = v.toLong()
}
- mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
+ mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v ->
m.interferedBurst = v.toLong()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 5ad75565..0f49ecd2 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -53,8 +53,7 @@ public class ParquetTraceReader(
this.zip(listOf(workload))
}
}
- .map { sampleWorkload(it.first, workload, it.second, seed) }
- .flatten()
+ .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) }
.iterator()
override fun hasNext(): Boolean = iterator.hasNext()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index 7cd1f159..d7daa35b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -26,12 +26,7 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
import com.github.ajalt.clikt.parameters.groups.OptionGroup
import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import me.tongfei.progressbar.ProgressBar
@@ -41,11 +36,13 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
import org.opendc.format.util.LocalOutputFile
+import org.opendc.simulator.compute.workload.SimTraceWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
-import java.util.Random
+import java.util.*
import kotlin.math.max
import kotlin.math.min
@@ -340,106 +337,36 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
- val timestampCol = 0
- val cpuUsageCol = 3
- val coreCol = 1
- val provisionedMemoryCol = 5
- val traceInterval = 5 * 60 * 1000L
-
- val allFragments = mutableListOf<Fragment>()
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .drop(1)
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(";\t")
-
- vmId = vmFile.name
-
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- cores = values[coreCol].trim().toInt()
- val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
- requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
+ val fragments = mutableListOf<Fragment>()
+ BitbrainsTraceReader(traceDirectory).use { reader ->
+ reader.forEach { entry ->
+ val trace = (entry.workload as SimTraceWorkload).trace
var maxTime = Long.MIN_VALUE
- flopsFragments.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
+ trace.forEach { fragment ->
+ val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ fragments.add(
+ Fragment(
+ entry.name,
+ fragment.timestamp,
+ flops,
+ fragment.duration,
+ fragment.usage,
+ fragment.cores
+ )
+ )
+ maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
+ metaRecord.put("id", entry.name)
+ metaRecord.put("submissionTime", entry.start)
metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
+ metaRecord.put("maxCores", entry.meta["cores"])
+ metaRecord.put("requiredMemory", entry.meta["required-memory"])
metaWriter.write(metaRecord)
}
-
- return allFragments
+ }
+ return fragments
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 393fb88d..a3300b71 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -39,12 +39,15 @@ import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
+import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
import java.io.File
+import java.util.*
/**
* An integration test suite for the SC20 experiments.
@@ -63,6 +66,9 @@ class CapelinIntegrationTest {
monitor = TestExperimentReporter()
}
+ /**
+ * Test a large simulation setup.
+ */
@Test
fun testLarge() = runBlockingSimulation {
val failures = false
@@ -114,13 +120,16 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(155252275351, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(155086837645, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(725049, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(219751355711, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(206351165081, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(1148906334, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
+ /**
+ * Test a small simulation setup.
+ */
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
@@ -151,9 +160,105 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(0, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ )
+ }
+
+ /**
+ * Test a small simulation setup with interference.
+ */
+ @Test
+ fun testInterference() = runBlockingSimulation {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ val traceReader = createTestTraceReader(0.25, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+
+ val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
+ val performanceInterferenceModel =
+ PerformanceInterferenceReader(perfInterferenceInput).use { VmInterferenceModel(it.read(), Random(seed.toLong())) }
+
+ val meterProvider = createMeterProvider(clock)
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
+ }
+
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(13885404, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ )
+ }
+
+ /**
+ * Test a small simulation setup with failures.
+ */
+ @Test
+ fun testFailures() = runBlockingSimulation {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ val traceReader = createTestTraceReader(0.25, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+
+ val meterProvider = createMeterProvider(clock)
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
+ val failureDomain =
+ createFailureDomain(
+ this,
+ clock,
+ seed,
+ 24.0 * 7,
+ scheduler,
+ chan
+ )
+
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
+
+ failureDomain.cancel()
+ }
+
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(25336984869, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(23668547517, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(368151656, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
new file mode 100644
index 00000000..51fc6366
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
@@ -0,0 +1,21 @@
+[
+ {
+ "vms": [
+ "141",
+ "379",
+ "851",
+ "116"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "205",
+ "116",
+ "463"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
index ce7a812c..ee76d38f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
index 1d7ce882..9b1cde13 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
Binary files differ