summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt4
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt127
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt117
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json21
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquetbin2148 -> 2081 bytes
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquetbin1672463 -> 1647189 bytes
8 files changed, 164 insertions, 110 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index fa9fa2fc..d7df4454 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -255,6 +255,8 @@ suspend fun processTrace(
offset = entry.start - clock.millis()
}
+ // Make sure the trace entries are ordered by submission time
+ assert(entry.start - offset >= 0) { "Invalid trace order" }
delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
@@ -275,7 +277,7 @@ suspend fun processTrace(
override fun onStateChanged(server: Server, newState: ServerState) {
monitor.reportVmStateChange(clock.millis(), server, newState)
- if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
+ if (newState == ServerState.TERMINATED) {
cont.resume(Unit)
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index f520a28c..7fb2f83c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -74,7 +74,7 @@ public class ExperimentMetricExporter(
m.overcommissionedBurst = v.toLong()
}
- mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
+ mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v ->
m.interferedBurst = v.toLong()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 5ad75565..0f49ecd2 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -53,8 +53,7 @@ public class ParquetTraceReader(
this.zip(listOf(workload))
}
}
- .map { sampleWorkload(it.first, workload, it.second, seed) }
- .flatten()
+ .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) }
.iterator()
override fun hasNext(): Boolean = iterator.hasNext()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index 7cd1f159..d7daa35b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -26,12 +26,7 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
import com.github.ajalt.clikt.parameters.groups.OptionGroup
import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import me.tongfei.progressbar.ProgressBar
@@ -41,11 +36,13 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
import org.opendc.format.util.LocalOutputFile
+import org.opendc.simulator.compute.workload.SimTraceWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
-import java.util.Random
+import java.util.*
import kotlin.math.max
import kotlin.math.min
@@ -340,106 +337,36 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
- val timestampCol = 0
- val cpuUsageCol = 3
- val coreCol = 1
- val provisionedMemoryCol = 5
- val traceInterval = 5 * 60 * 1000L
-
- val allFragments = mutableListOf<Fragment>()
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .drop(1)
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(";\t")
-
- vmId = vmFile.name
-
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- cores = values[coreCol].trim().toInt()
- val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
- requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
+ val fragments = mutableListOf<Fragment>()
+ BitbrainsTraceReader(traceDirectory).use { reader ->
+ reader.forEach { entry ->
+ val trace = (entry.workload as SimTraceWorkload).trace
var maxTime = Long.MIN_VALUE
- flopsFragments.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
+ trace.forEach { fragment ->
+ val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ fragments.add(
+ Fragment(
+ entry.name,
+ fragment.timestamp,
+ flops,
+ fragment.duration,
+ fragment.usage,
+ fragment.cores
+ )
+ )
+ maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
+ metaRecord.put("id", entry.name)
+ metaRecord.put("submissionTime", entry.start)
metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
+ metaRecord.put("maxCores", entry.meta["cores"])
+ metaRecord.put("requiredMemory", entry.meta["required-memory"])
metaWriter.write(metaRecord)
}
-
- return allFragments
+ }
+ return fragments
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 393fb88d..a3300b71 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -39,12 +39,15 @@ import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
+import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
import java.io.File
+import java.util.*
/**
* An integration test suite for the SC20 experiments.
@@ -63,6 +66,9 @@ class CapelinIntegrationTest {
monitor = TestExperimentReporter()
}
+ /**
+ * Test a large simulation setup.
+ */
@Test
fun testLarge() = runBlockingSimulation {
val failures = false
@@ -114,13 +120,16 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(155252275351, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(155086837645, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(725049, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(219751355711, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(206351165081, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(1148906334, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
+ /**
+ * Test a small simulation setup.
+ */
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
@@ -151,9 +160,105 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(0, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ )
+ }
+
+ /**
+ * Test a small simulation setup with interference.
+ */
+ @Test
+ fun testInterference() = runBlockingSimulation {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ val traceReader = createTestTraceReader(0.25, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+
+ val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json"))
+ val performanceInterferenceModel =
+ PerformanceInterferenceReader(perfInterferenceInput).use { VmInterferenceModel(it.read(), Random(seed.toLong())) }
+
+ val meterProvider = createMeterProvider(clock)
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
+ }
+
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(13885404, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ )
+ }
+
+ /**
+ * Test a small simulation setup with failures.
+ */
+ @Test
+ fun testFailures() = runBlockingSimulation {
+ val seed = 1
+ val chan = Channel<Unit>(Channel.CONFLATED)
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ val traceReader = createTestTraceReader(0.25, seed)
+ val environmentReader = createTestEnvironmentReader("single")
+
+ val meterProvider = createMeterProvider(clock)
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
+ val failureDomain =
+ createFailureDomain(
+ this,
+ clock,
+ seed,
+ 24.0 * 7,
+ scheduler,
+ chan
+ )
+
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
+
+ failureDomain.cancel()
+ }
+
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(25336984869, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(23668547517, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(368151656, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
new file mode 100644
index 00000000..51fc6366
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json
@@ -0,0 +1,21 @@
+[
+ {
+ "vms": [
+ "141",
+ "379",
+ "851",
+ "116"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "205",
+ "116",
+ "463"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
index ce7a812c..ee76d38f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
index 1d7ce882..9b1cde13 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet
Binary files differ