diff options
Diffstat (limited to 'opendc-experiments')
9 files changed, 212 insertions, 110 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index fa9fa2fc..d7df4454 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -255,6 +255,8 @@ suspend fun processTrace( offset = entry.start - clock.millis() } + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } delay(max(0, (entry.start - offset) - clock.millis())) launch { chan.send(Unit) @@ -275,7 +277,7 @@ suspend fun processTrace( override fun onStateChanged(server: Server, newState: ServerState) { monitor.reportVmStateChange(clock.millis(), server, newState) - if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) { + if (newState == ServerState.TERMINATED) { cont.resume(Unit) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index f520a28c..7fb2f83c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -74,7 +74,7 @@ public class ExperimentMetricExporter( m.overcommissionedBurst = v.toLong() } - mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v -> + mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v.toLong() } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 5ad75565..0f49ecd2 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -53,8 +53,7 @@ public class ParquetTraceReader( this.zip(listOf(workload)) } } - .map { sampleWorkload(it.first, workload, it.second, seed) } - .flatten() + .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) } .iterator() override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index 7cd1f159..d7daa35b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -26,12 +26,7 @@ import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.arguments.argument import com.github.ajalt.clikt.parameters.groups.OptionGroup import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.options.convert -import com.github.ajalt.clikt.parameters.options.default -import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.option -import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.options.split +import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import me.tongfei.progressbar.ProgressBar @@ -41,11 +36,13 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.format.trace.bitbrains.BitbrainsTraceReader import org.opendc.format.util.LocalOutputFile +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader -import java.util.Random +import java.util.* import kotlin.math.max import kotlin.math.min @@ -340,106 +337,36 @@ class BitbrainsConversion : TraceConversion("Bitbrains") { metaSchema: Schema, metaWriter: ParquetWriter<GenericData.Record> ): MutableList<Fragment> { - val timestampCol = 0 - val cpuUsageCol = 3 - val coreCol = 1 - val provisionedMemoryCol = 5 - val traceInterval = 5 * 60 * 1000L - - val allFragments = mutableListOf<Fragment>() - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .drop(1) - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(";\t") - - vmId = vmFile.name - - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - cores = values[coreCol].trim().toInt() - val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB - requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - + val fragments = mutableListOf<Fragment>() + BitbrainsTraceReader(traceDirectory).use { reader -> + reader.forEach { entry -> + val trace = (entry.workload as SimTraceWorkload).trace var maxTime = Long.MIN_VALUE - flopsFragments.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) + trace.forEach { fragment -> + val flops: Long = (fragment.usage * fragment.duration / 1000).toLong() + fragments.add( + Fragment( + entry.name, + fragment.timestamp, + flops, + fragment.duration, + fragment.usage, + fragment.cores + ) + ) + maxTime = max(maxTime, fragment.timestamp + fragment.duration) } val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) + metaRecord.put("id", entry.name) + metaRecord.put("submissionTime", entry.start) metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) + metaRecord.put("maxCores", entry.meta["cores"]) + metaRecord.put("requiredMemory", entry.meta["required-memory"]) metaWriter.write(metaRecord) } - - return allFragments + } + return fragments } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 393fb88d..a3300b71 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -39,12 +39,15 @@ import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader +import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation import java.io.File +import java.util.* /** * An integration test suite for the SC20 experiments. @@ -63,6 +66,9 @@ class CapelinIntegrationTest { monitor = TestExperimentReporter() } + /** + * Test a large simulation setup. + */ @Test fun testLarge() = runBlockingSimulation { val failures = false @@ -114,13 +120,16 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(155252275351, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(155086837645, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(725049, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(219751355711, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(206351165081, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(1148906334, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } + /** + * Test a small simulation setup. + */ @Test fun testSmall() = runBlockingSimulation { val seed = 1 @@ -151,9 +160,105 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(29454904468, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(29355293349, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(0, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + ) + } + + /** + * Test a small simulation setup with interference. + */ + @Test + fun testInterference() = runBlockingSimulation { + val seed = 1 + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + val traceReader = createTestTraceReader(0.25, seed) + val environmentReader = createTestEnvironmentReader("single") + + val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) + val performanceInterferenceModel = + PerformanceInterferenceReader(perfInterferenceInput).use { VmInterferenceModel(it.read(), Random(seed.toLong())) } + + val meterProvider = createMeterProvider(clock) + + withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } + } + + val metrics = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(13885404, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + ) + } + + /** + * Test a small simulation setup with failures. + */ + @Test + fun testFailures() = runBlockingSimulation { + val seed = 1 + val chan = Channel<Unit>(Channel.CONFLATED) + val allocationPolicy = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + val traceReader = createTestTraceReader(0.25, seed) + val environmentReader = createTestEnvironmentReader("single") + + val meterProvider = createMeterProvider(clock) + + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> + val failureDomain = + createFailureDomain( + this, + clock, + seed, + 24.0 * 7, + scheduler, + chan + ) + + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } + + failureDomain.cancel() + } + + val metrics = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(25336984869, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(23668547517, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(368151656, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json new file mode 100644 index 00000000..51fc6366 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json @@ -0,0 +1,21 @@ +[ + { + "vms": [ + "141", + "379", + "851", + "116" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "205", + "116", + "463" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet Binary files differindex ce7a812c..ee76d38f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet Binary files differindex 1d7ce882..9b1cde13 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-experiments/opendc-experiments-radice/build.gradle.kts new file mode 100644 index 00000000..c1515165 --- /dev/null +++ b/opendc-experiments/opendc-experiments-radice/build.gradle.kts @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Experiments for the Risk Analysis work" + +/* Build configuration */ +plugins { + `experiment-conventions` + `testing-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcHarness.opendcHarnessApi) + implementation(projects.opendcFormat) + implementation(projects.opendcSimulator.opendcSimulatorCore) + implementation(projects.opendcSimulator.opendcSimulatorCompute) + implementation(projects.opendcSimulator.opendcSimulatorFailures) + implementation(projects.opendcCompute.opendcComputeSimulator) + implementation(projects.opendcTelemetry.opendcTelemetrySdk) + + implementation(libs.kotlin.logging) + implementation(libs.config) + implementation(libs.progressbar) + implementation(libs.clikt) + + implementation(libs.parquet) + testImplementation(libs.log4j.slf4j) +} |
