From 9dba898ba5b1ff55824569ac397c757e7c882794 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Tue, 5 May 2020 16:07:39 +0200
Subject: refactor: Make trace converter more reusable

---
 .../com/atlarge/opendc/experiments/sc20/TraceConverter.kt | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
index d005a157..c62f59f9 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt
@@ -36,10 +36,14 @@ import kotlin.math.max
 import kotlin.math.min
 
 /**
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ * A script to convert a trace in text format into a Parquet trace.
  */
-fun main() {
+fun main(args: Array<String>) {
+    if (args.size < 2) {
+        println("error: expected ")
+        return
+    }
+
     val metaSchema = SchemaBuilder
         .record("meta")
         .namespace("com.atlarge.opendc.format.sc20")
@@ -69,8 +73,8 @@ fun main() {
     val provisionedMemoryCol = 20
     val traceInterval = 5 * 60 * 1000L
 
-    val dest = File("../traces/solvinity/small-parquet")
-    val traceDirectory = File("../traces/solvinity/small")
+    val dest = File(args[0])
+    val traceDirectory = File(args[1])
 
     val vms = traceDirectory.walk()
        .filterNot { it.isDirectory }
-- 
cgit v1.2.3


From 4aa1ed9d20c7a87c6b5388ddf33b25769f91f20b Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Tue, 5 May 2020 21:31:38 +0200
Subject: refactor: Rename VmTraceReader to BitbrainsTraceReader

---
 .../format/trace/bitbrains/BitbrainsTraceReader.kt | 173 +++++++++++++++
 .../opendc/format/trace/sc20/Sc20TraceReader.kt    |   2 -
 .../opendc/format/trace/vm/VmTraceReader.kt        | 173 ---------------------
 3 files changed, 173 insertions(+), 175 deletions(-)
 create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
 delete mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt

diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
new file mode 100644
index 00000000..5220af9b
--- /dev/null
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -0,0 +1,173 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.bitbrains + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.UUID + +/** + * A [TraceReader] for the public VM workload trace format. + * + * @param traceDirectory The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +class BitbrainsTraceReader( + traceDirectory: File, + performanceInterferenceModel: PerformanceInterferenceModel +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf>() + + var timestampCol = 0 + var coreCol = 0 + var cpuUsageCol = 0 + var provisionedMemoryCol = 0 + val traceInterval = 5 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .forEach { vmFile -> + println(vmFile) + val flopsHistory = mutableListOf() + var vmId = -1L + var cores = -1 + var requiredMemory = -1L + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(";\t") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + timestampCol = header["Timestamp [ms]"]!! + coreCol = header["CPU cores"]!! + cpuUsageCol = header["CPU usage [MHZ]"]!! + provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!! 
+ return@forEachIndexed + } + + vmId = vmFile.nameWithoutExtension.trim().toLong() + val timestamp = values[timestampCol].trim().toLong() - 5 * 60 + cores = values[coreCol].trim().toInt() + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() + + val flops: Long = (cpuUsage * 5 * 60 * cores).toLong() + + if (flopsHistory.isEmpty()) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + } else { + if (flopsHistory.last().flops != flops) { + flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + } else { + val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) + flopsHistory.add( + FlopsHistoryFragment( + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + ) + } + } + } + } + + val uuid = UUID(0L, vmId) + + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() + ) + + val vmWorkload = VmWorkload( + uuid, "VM Workload $vmId", UnnamedUser, + VmImage( + uuid, + vmId.toString(), + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + flopsHistory.asSequence(), + cores, + requiredMemory + ) + ) + entries[vmId] = TraceEntryImpl( + flopsHistory.firstOrNull()?.tick ?: -1, + vmWorkload + ) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. 
+ */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index 2e2159ba..c53cd569 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -117,7 +117,6 @@ class Sc20TraceReader( reader.lineSequence() .chunked(128) .forEach { lines -> - // val res = ArrayList(lines.size) for (line in lines) { // Ignore comments in the trace if (line.startsWith("#") || line.isBlank()) { @@ -149,7 +148,6 @@ class Sc20TraceReader( fragment } } - // yieldAll(res) } if (last != null) { diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt deleted file mode 100644 index fe1049d9..00000000 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ /dev/null @@ -1,173 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.format.trace.vm - -import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment -import com.atlarge.opendc.compute.core.image.VmImage -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.core.User -import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.format.trace.TraceEntry -import com.atlarge.opendc.format.trace.TraceReader -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.UUID - -/** - * A [TraceReader] for the public VM workload trace format. - * - * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -class VmTraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Initialize the reader. 
- */ - init { - val entries = mutableMapOf>() - - var timestampCol = 0 - var coreCol = 0 - var cpuUsageCol = 0 - var provisionedMemoryCol = 0 - val traceInterval = 5 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .forEach { vmFile -> - println(vmFile) - val flopsHistory = mutableListOf() - var vmId = -1L - var cores = -1 - var requiredMemory = -1L - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(";\t") - - // Parse GWF header - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - timestampCol = header["Timestamp [ms]"]!! - coreCol = header["CPU cores"]!! - cpuUsageCol = header["CPU usage [MHZ]"]!! - provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!! - return@forEachIndexed - } - - vmId = vmFile.nameWithoutExtension.trim().toLong() - val timestamp = values[timestampCol].trim().toLong() - 5 * 60 - cores = values[coreCol].trim().toInt() - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() - - val flops: Long = (cpuUsage * 5 * 60 * cores).toLong() - - if (flopsHistory.isEmpty()) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) - } else { - if (flopsHistory.last().flops != flops) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) - } else { - val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) - flopsHistory.add( - FlopsHistoryFragment( - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - ) - } - } - } - } - - val uuid = UUID(0L, vmId) - - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() - ) - - val vmWorkload = VmWorkload( - uuid, "VM Workload $vmId", UnnamedUser, - VmImage( - uuid, - vmId.toString(), - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory.asSequence(), - cores, - requiredMemory - ) - ) - entries[vmId] = TraceEntryImpl( - flopsHistory.firstOrNull()?.tick ?: -1, - vmWorkload - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. 
-     */
-    private data class TraceEntryImpl(
-        override var submissionTime: Long,
-        override val workload: VmWorkload
-    ) : TraceEntry<VmWorkload>
-}
-- 
cgit v1.2.3


From 3ed277efba4cf96be00ba6e975d4da7fdbfaa671 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Tue, 5 May 2020 21:33:39 +0200
Subject: refactor: Modularize experiment code

---
 .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 145 ++-------
 .../opendc/experiments/sc20/Sc20ParquetMonitor.kt  | 149 ++++++++++
 .../opendc/experiments/sc20/TestExperiment.kt      | 324 ++++++++++++---------
 3 files changed, 364 insertions(+), 254 deletions(-)
 create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt

diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index bac0de21..4b8b80a8 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -1,66 +1,34 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */ + package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.flow.first -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread - -class Sc20Monitor( - destination: String -) : Closeable { - private val lastServerStates = mutableMapOf>() - private val schema = SchemaBuilder - .record("slice") - .namespace("com.atlarge.opendc.experiments.sc20") - .fields() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("requestedBurst").type().longType().noDefault() - .name("grantedBurst").type().longType().noDefault() - .name("overcommissionedBurst").type().longType().noDefault() - .name("interferedBurst").type().longType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("cpuDemand").type().doubleType().noDefault() - .name("numberOfDeployedImages").type().intType().noDefault() - .name("server").type().stringType().noDefault() - .name("hostState").type().stringType().noDefault() - .name("hostUsage").type().doubleType().noDefault() - .name("powerDraw").type().doubleType().noDefault() - .name("totalSubmittedVms").type().longType().noDefault() - .name("totalQueuedVms").type().longType().noDefault() - .name("totalRunningVms").type().longType().noDefault() - .name("totalFinishedVms").type().longType().noDefault() - .endRecord() - private val writer = AvroParquetWriter.builder(Path(destination)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - private val queue = ArrayBlockingQueue(2048) - private val writerThread = thread(start = true, name = "sc20-writer") { - try { - while (true) { - val record = queue.take() - writer.write(record) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - writer.close() - } - } +interface Sc20Monitor : Closeable { suspend fun onVmStateChanged(server: Server) {} suspend fun serverStateChanged( @@ -70,32 +38,7 @@ class Sc20Monitor( queuedVms: Long, runningVms: Long, finishedVms: Long - ) { - val lastServerState = lastServerStates[server] - if (server.state == ServerState.SHUTOFF && lastServerState != null) { - val duration = simulationContext.clock.millis() - lastServerState.second - onSliceFinish( - simulationContext.clock.millis(), - 0, - 0, - 0, - 0, - 0.0, - 0.0, - 0, - server, - submittedVms, - queuedVms, - runningVms, - finishedVms, - duration - ) - } - - println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") - - lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) - } + ) {} suspend fun onSliceFinish( time: Long, @@ -112,39 +55,5 @@ class Sc20Monitor( runningVms: Long, finishedVms: Long, duration: Long = 5 * 60 * 1000L - ) { - // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.services[BareMetalDriver.Key] - val usage = driver.usage.first() - 
val powerDraw = driver.powerDraw.first() - - val record = GenericData.Record(schema) - record.put("time", time) - record.put("duration", duration) - record.put("requestedBurst", requestedBurst) - record.put("grantedBurst", grantedBurst) - record.put("overcommissionedBurst", overcommissionedBurst) - record.put("interferedBurst", interferedBurst) - record.put("cpuUsage", cpuUsage) - record.put("cpuDemand", cpuDemand) - record.put("numberOfDeployedImages", numberOfDeployedImages) - record.put("server", hostServer.uid) - record.put("hostState", hostServer.state) - record.put("hostUsage", usage) - record.put("powerDraw", powerDraw) - record.put("totalSubmittedVms", submittedVms) - record.put("totalQueuedVms", queuedVms) - record.put("totalRunningVms", runningVms) - record.put("totalFinishedVms", finishedVms) - - queue.put(record) - } - - override fun close() { - // Busy loop to wait for writer thread to finish - while (queue.isNotEmpty()) { - Thread.sleep(500) - } - writerThread.interrupt() - } + ) {} } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt new file mode 100644 index 00000000..5e554196 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt @@ -0,0 +1,149 @@ +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import kotlinx.coroutines.flow.first +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread + +class Sc20ParquetMonitor( + destination: String +) : Sc20Monitor { + private val lastServerStates = mutableMapOf>() + private val schema = SchemaBuilder + .record("slice") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("requestedBurst").type().longType().noDefault() + .name("grantedBurst").type().longType().noDefault() + .name("overcommissionedBurst").type().longType().noDefault() + .name("interferedBurst").type().longType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("cpuDemand").type().doubleType().noDefault() + .name("numberOfDeployedImages").type().intType().noDefault() + .name("server").type().stringType().noDefault() + .name("hostState").type().stringType().noDefault() + .name("hostUsage").type().doubleType().noDefault() + .name("powerDraw").type().doubleType().noDefault() + .name("totalSubmittedVms").type().longType().noDefault() + .name("totalQueuedVms").type().longType().noDefault() + .name("totalRunningVms").type().longType().noDefault() + .name("totalFinishedVms").type().longType().noDefault() + .endRecord() + private val writer = AvroParquetWriter.builder(Path(destination)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + 
.build() + private val queue = ArrayBlockingQueue(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } + } + + override suspend fun onVmStateChanged(server: Server) {} + + override suspend fun serverStateChanged( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { + val lastServerState = lastServerStates[server] + if (server.state == ServerState.SHUTOFF && lastServerState != null) { + val duration = simulationContext.clock.millis() - lastServerState.second + onSliceFinish( + simulationContext.clock.millis(), + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server, + submittedVms, + queuedVms, + runningVms, + finishedVms, + duration + ) + } + + println("[${simulationContext.clock.millis()}] HOST ${server.uid} ${server.state}") + + lastServerStates[server] = Pair(server.state, simulationContext.clock.millis()) + } + + override suspend fun onSliceFinish( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + // Assume for now that the host is not virtualized and measure the current power draw + val driver = hostServer.services[BareMetalDriver.Key] + val usage = driver.usage.first() + val powerDraw = driver.powerDraw.first() + + val record = GenericData.Record(schema) + record.put("time", time) + record.put("duration", duration) + record.put("requestedBurst", requestedBurst) + record.put("grantedBurst", grantedBurst) + record.put("overcommissionedBurst", overcommissionedBurst) + record.put("interferedBurst", interferedBurst) + record.put("cpuUsage", cpuUsage) + record.put("cpuDemand", cpuDemand) + record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("server", hostServer.uid) + record.put("hostState", hostServer.state) + record.put("hostUsage", usage) + record.put("powerDraw", powerDraw) + record.put("totalSubmittedVms", submittedVms) + record.put("totalQueuedVms", queuedVms) + record.put("totalRunningVms", runningVms) + record.put("totalFinishedVms", finishedVms) + + queue.put(record) + } + + override fun close() { + // Busy loop to wait for writer thread to finish + while (queue.isNotEmpty()) { + Thread.sleep(500) + } + writerThread.interrupt() + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index a2f609a5..6f1e9aae 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -29,11 +29,14 @@ import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.metal.NODE_CLUSTER import 
com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy @@ -43,7 +46,9 @@ import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy import com.atlarge.opendc.core.failure.CorrelatedFaultInjector import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper @@ -53,15 +58,19 @@ import com.xenomachina.argparser.default import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import java.io.File import java.io.FileReader +import java.lang.IllegalArgumentException import java.util.ServiceLoader +import java.util.TreeSet import kotlin.math.max import kotlin.random.Random @@ -101,6 +110,25 @@ class ExperimentParameters(parser: ArgParser) { } } +/** + * Construct the failure domain for the experiments. + */ +suspend fun createFailureDomain(seed: Int, bareMetalProvisioner: ProvisioningService, chan: Channel): Domain { + val root = simulationContext.domain + val domain = root.newDomain(name = "failures") + domain.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return domain +} + /** * Obtain the [FaultInjector] to use for the experiments. */ @@ -116,141 +144,89 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector { } /** - * Main entry point of the experiment. + * Create the trace reader from which the VM workloads are read. 
*/ -@OptIn(ExperimentalCoroutinesApi::class) -fun main(args: Array) { - ArgParser(args).parseInto(::ExperimentParameters).run { - println("trace-directory: $traceDirectory") - println("environment-file: $environmentFile") - println("performance-interference-file: $performanceInterferenceFile") - println("vm-placement-file: $vmPlacementFile") - println("selected-vms-file: $selectedVmsFile") - println("seed: $seed") - println("failures: $failures") - println("allocation-policy: $allocationPolicy") - - val start = System.currentTimeMillis() - val monitor = Sc20Monitor(outputFile) - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - val chan = Channel(Channel.CONFLATED) - - val vmPlacements = if (vmPlacementFile == null) { - emptyMap() - } else { - Sc20VmPlacementReader(File(vmPlacementFile!!).inputStream().buffered()).construct() - } - - val allocationPolicies = mapOf( - "mem" to AvailableMemoryAllocationPolicy(), - "mem-inv" to AvailableMemoryAllocationPolicy(true), - "core-mem" to AvailableCoreMemoryAllocationPolicy(), - "core-mem-inv" to AvailableCoreMemoryAllocationPolicy(true), - "active-servers" to NumberOfActiveServersAllocationPolicy(), - "active-servers-inv" to NumberOfActiveServersAllocationPolicy(true), - "provisioned-cores" to ProvisionedCoresAllocationPolicy(), - "provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true), - "replay" to ReplayAllocationPolicy(vmPlacements), - "random" to RandomAllocationPolicy(Random(seed)) - ) - - if (allocationPolicy !in allocationPolicies) { - println("error: unknown allocation policy $allocationPolicy") - println("Available:") - allocationPolicies.keys.forEach { key -> println(key) } - } - - root.launch { - val environment = Sc20ClusterEnvironmentReader(File(environmentFile)) - .use { it.construct(root) } - - val performanceInterferenceStream = if (performanceInterferenceFile != null) { - File(performanceInterferenceFile!!).inputStream().buffered() - } else { - object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - } - - val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - - println(simulationContext.clock.instant()) +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { + return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) +} - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] +/** + * Construct the environment for a VM provisioner and return the provisioner instance. 
+ */ +suspend fun createProvisioner( + root: Domain, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy +): Pair = withContext(root.coroutineContext) { + val environment = environmentReader.use { it.construct(root) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] - // Wait for the bare metal nodes to be spawned - delay(10) + // Wait for the bare metal nodes to be spawned + delay(10) - val scheduler = SimpleVirtProvisioningService( - allocationPolicies.getValue(allocationPolicy), - simulationContext, - bareMetalProvisioner - ) + val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) - // Wait for the hypervisors to be spawned - delay(10) + // Wait for the hypervisors to be spawned + delay(10) - val hypervisors = scheduler.drivers() + bareMetalProvisioner to scheduler +} - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - hypervisor.server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - } - } - } - .launchIn(this) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms - ) - } +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { + val domain = simulationContext.domain + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. 
+ monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + hypervisor.server.events + .onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } - .launchIn(this) + } } - - val failureDomain = if (failures) { - println("ENABLING failures") - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } - injector.enqueue(node.metadata["driver"] as FailureDomain) - } + .launchIn(domain) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms + ) } - domain - } else { - null } + .launchIn(domain) + } +} +/** + * Process the trace. + */ +suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, vmPlacements: Map, monitor: Sc20Monitor) { + try { + coroutineScope { var submitted = 0L - val finished = Channel(Channel.RENDEZVOUS) - val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) + val finished = Channel(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + while (reader.hasNext()) { val (time, workload) = reader.next() @@ -262,9 +238,8 @@ fun main(args: Array) { println("Could not find placement data in VM placement file for VM $vmId") continue } - val machinesInCluster = - hypervisors.filter { (it as SimpleVirtDriver).server.name.contains(clusterName) } - if (machinesInCluster.isEmpty()) { + val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false + if (machineInCluster) { println("Ignored VM") continue } @@ -294,21 +269,98 @@ fun main(args: Array) { while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { finished.receive() } + } + } finally { + reader.close() + } +} - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - reader.close() - println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") +/** + * Main entry point of the experiment. 
+ */ +@OptIn(ExperimentalCoroutinesApi::class) +fun main(args: Array) { + val cli = ArgParser(args).parseInto(::ExperimentParameters) + println("trace-directory: ${cli.traceDirectory}") + println("environment-file: ${cli.environmentFile}") + println("performance-interference-file: ${cli.performanceInterferenceFile}") + println("selected-vms-file: ${cli.selectedVmsFile}") + println("seed: ${cli.seed}") + println("failures: ${cli.failures}") + println("allocation-policy: ${cli.allocationPolicy}") + + val start = System.currentTimeMillis() + val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) + + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider("test") + val root = system.newDomain("root") + + val chan = Channel(Channel.CONFLATED) + + val performanceInterferenceModel = try { + val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { + File(cli.performanceInterferenceFile!!).inputStream().buffered() + } else { + object {}.javaClass.getResourceAsStream("/env/performance-interference.json") } + Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() + } catch (e: Throwable) { + monitor.close() + throw e + } + val vmPlacements = if (cli.vmPlacementFile == null) { + emptyMap() + } else { + Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() + } + val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) + val traceReader = try { + createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) + } catch (e: Throwable) { + monitor.close() + throw e + } + val allocationPolicy = when (cli.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(cli.seed)) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") + } + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - runBlocking { - system.run() - system.terminate() + val failureDomain = if (cli.failures) { + println("ENABLING failures") + createFailureDomain(cli.seed, bareMetalProvisioner, chan) + } else { + null } - // Explicitly close the monitor to flush its buffer - monitor.close() + attachMonitor(scheduler, monitor) + processTrace(traceReader, scheduler, chan, vmPlacements, monitor) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") } + + runBlocking { + system.run() + system.terminate() + } + + // Explicitly close the monitor to flush its buffer + monitor.close() } -- cgit v1.2.3 From 2b9b1e9e030dccacf9aa549fc49b2e5e382750bf Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 5 May 2020 21:34:01 +0200 Subject: 
bug: Continue scheduling other VMs on unfitting VM

---
 .../opendc/compute/virt/service/SimpleVirtProvisioningService.kt | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 520f6dc5..2185b372 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -139,10 +139,13 @@ class SimpleVirtProvisioningService(
 
             if (selectedHv == null) {
                 if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
                     unscheduledVms++
+                    incomingImages -= imageInstance
+                    println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}")
+                    continue
+                } else {
+                    break
                 }
-
-                break
             }
 
             try {
-- 
cgit v1.2.3


From 48f6a6f2d42851bc2eeed5b6ef41145740c70286 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Tue, 5 May 2020 22:35:24 +0200
Subject: test: Add initial integration test for SC20 experiments

---
 opendc/opendc-experiments-sc20/build.gradle.kts    |   2 +-
 .../opendc/experiments/sc20/Sc20Experiment.kt      | 366 +++++++++++++++++++++
 .../opendc/experiments/sc20/TestExperiment.kt      | 366 ---------------------
 .../opendc/experiments/sc20/Sc20IntegrationTest.kt | 183 +++++++++++
 .../src/test/resources/env/topology.txt            |   5 +
 .../src/test/resources/trace/meta.parquet          | Bin 0 -> 2148 bytes
 .../src/test/resources/trace/trace.parquet         | Bin 0 -> 1672463 bytes
 .../sc20/Sc20ClusterEnvironmentReader.kt           |  11 +-
 8 files changed, 562 insertions(+), 371 deletions(-)
 create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
 delete mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
 create mode 100644 opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
 create mode 100644 opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt
 create mode 100644 opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet
 create mode 100644 opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet

diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index 8611ffa7..ccfa3038 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -31,7 +31,7 @@ plugins {
 }
 
 application {
-    mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt"
+    mainClassName = "com.atlarge.opendc.experiments.sc20.Sc20ExperimentKt"
     applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M")
 }
 
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
new file mode 100644
index 00000000..fc4b9058
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
@@ -0,0 +1,366 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ *
in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy +import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import com.xenomachina.argparser.ArgParser +import com.xenomachina.argparser.default +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import java.io.File +import java.io.FileReader +import java.lang.IllegalArgumentException +import java.util.ServiceLoader +import java.util.TreeSet +import 
kotlin.math.max +import kotlin.random.Random + +class ExperimentParameters(parser: ArgParser) { + val traceDirectory by parser.storing("path to the trace directory") + val environmentFile by parser.storing("path to the environment file") + val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } + val vmPlacementFile by parser.storing("path to the VM placement file").default { null } + val outputFile by parser.storing("path to where the output should be stored") + .default { "data/results-${System.currentTimeMillis()}.parquet" } + val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } + .default { emptyList() } + val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { + parseVMs(FileReader(File(this)).readText()) + } + .default { emptyList() } + val seed by parser.storing("the random seed") { toInt() } + .default(0) + val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") + val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") + + fun getSelectedVmList(): List { + return if (selectedVms.isEmpty()) { + selectedVmsFile + } else { + selectedVms + } + } + + private fun parseVMs(string: String): List { + // Handle case where VM list contains a VM name with an (escaped) single-quote in it + val sanitizedString = string.replace("\\'", "\\\\[") + .replace("'", "\"") + .replace("\\\\[", "'") + val vms: List = jacksonObjectMapper().readValue(sanitizedString) + return vms + } +} + +/** + * Construct the failure domain for the experiments. + */ +suspend fun createFailureDomain(seed: Int, bareMetalProvisioner: ProvisioningService, chan: Channel): Domain { + val root = simulationContext.domain + val domain = root.newDomain(name = "failures") + domain.launch { + chan.receive() + val random = Random(seed) + val injectors = mutableMapOf() + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + return domain +} + +/** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain, random: Random): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + // GRID'5000 + return CorrelatedFaultInjector(domain, + iatScale = -1.39, iatShape = 1.03, // Hours + sizeScale = 1.88, sizeShape = 1.25, + dScale = 9.51, dShape = 3.21, // Minutes + random = random + ) +} + +/** + * Create the trace reader from which the VM workloads are read. + */ +fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { + return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) +} + +/** + * Construct the environment for a VM provisioner and return the provisioner instance. 
+ */ +suspend fun createProvisioner( + root: Domain, + environmentReader: EnvironmentReader, + allocationPolicy: AllocationPolicy +): Pair = withContext(root.coroutineContext) { + val environment = environmentReader.use { it.construct(root) } + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) + + // Wait for the hypervisors to be spawned + delay(10) + + bareMetalProvisioner to scheduler +} + +/** + * Attach the specified monitor to the VM provisioner. + */ +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { + val domain = simulationContext.domain + val hypervisors = scheduler.drivers() + + // Monitor hypervisor events + for (hypervisor in hypervisors) { + // TODO Do not expose VirtDriver directly but use Hypervisor class. + monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + hypervisor.server.events + .onEach { event -> + when (event) { + is ServerEvent.StateChanged -> { + monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) + } + } + } + .launchIn(domain) + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( + simulationContext.clock.millis(), + event.requestedBurst, + event.grantedBurst, + event.overcommissionedBurst, + event.interferedBurst, + event.cpuUsage, + event.cpuDemand, + event.numberOfDeployedImages, + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms + ) + } + } + .launchIn(domain) + } +} + +/** + * Process the trace. + */ +suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, monitor: Sc20Monitor, vmPlacements: Map = emptyMap()) { + val domain = simulationContext.domain + + try { + var submitted = 0L + val finished = Channel(Channel.CONFLATED) + val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) + + while (reader.hasNext()) { + val (time, workload) = reader.next() + + if (vmPlacements.isNotEmpty()) { + val vmId = workload.name.replace("VM Workload ", "") + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null) { + println("Could not find placement data in VM placement file for VM $vmId") + continue + } + val machineInCluster = hypervisors.ceiling(clusterName)?.contains(clusterName) ?: false + if (machineInCluster) { + println("Ignored VM") + continue + } + } + + submitted++ + delay(max(0, time - simulationContext.clock.millis())) + domain.launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.maxCores, workload.image.requiredMemory) + ) + // Monitor server events + server.events + .onEach { + if (it is ServerEvent.StateChanged) { + monitor.onVmStateChanged(it.server) + } + + delay(1) + finished.send(Unit) + } + .collect() + } + } + + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { + finished.receive() + } + } finally { + reader.close() + } +} + +/** + * Main entry point of the experiment. 
+ */ +@OptIn(ExperimentalCoroutinesApi::class) +fun main(args: Array) { + val cli = ArgParser(args).parseInto(::ExperimentParameters) + println("trace-directory: ${cli.traceDirectory}") + println("environment-file: ${cli.environmentFile}") + println("performance-interference-file: ${cli.performanceInterferenceFile}") + println("selected-vms-file: ${cli.selectedVmsFile}") + println("seed: ${cli.seed}") + println("failures: ${cli.failures}") + println("allocation-policy: ${cli.allocationPolicy}") + + val start = System.currentTimeMillis() + val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) + + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider("test") + val root = system.newDomain("root") + + val chan = Channel(Channel.CONFLATED) + + val performanceInterferenceModel = try { + val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { + File(cli.performanceInterferenceFile!!).inputStream().buffered() + } else { + object {}.javaClass.getResourceAsStream("/env/performance-interference.json") + } + Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() + } catch (e: Throwable) { + monitor.close() + throw e + } + val vmPlacements = if (cli.vmPlacementFile == null) { + emptyMap() + } else { + Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() + } + val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) + val traceReader = try { + createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) + } catch (e: Throwable) { + monitor.close() + throw e + } + val allocationPolicy = when (cli.allocationPolicy) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(cli.seed)) + "replay" -> ReplayAllocationPolicy(vmPlacements) + else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") + } + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) + + val failureDomain = if (cli.failures) { + println("ENABLING failures") + createFailureDomain(cli.seed, bareMetalProvisioner, chan) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace(traceReader, scheduler, chan, monitor, vmPlacements) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") + } + + runBlocking { + system.run() + system.terminate() + } + + // Explicitly close the monitor to flush its buffer + monitor.close() +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt deleted file mode 100644 index 6f1e9aae..00000000 
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ /dev/null @@ -1,366 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.experiments.sc20 - -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.virt.HypervisorEvent -import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver -import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService -import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy -import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy -import com.atlarge.opendc.core.failure.CorrelatedFaultInjector -import com.atlarge.opendc.core.failure.FailureDomain -import com.atlarge.opendc.core.failure.FaultInjector -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import com.atlarge.opendc.format.trace.TraceReader -import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import com.xenomachina.argparser.ArgParser -import com.xenomachina.argparser.default -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope -import 
kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import java.io.File -import java.io.FileReader -import java.lang.IllegalArgumentException -import java.util.ServiceLoader -import java.util.TreeSet -import kotlin.math.max -import kotlin.random.Random - -class ExperimentParameters(parser: ArgParser) { - val traceDirectory by parser.storing("path to the trace directory") - val environmentFile by parser.storing("path to the environment file") - val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } - val vmPlacementFile by parser.storing("path to the VM placement file").default { null } - val outputFile by parser.storing("path to where the output should be stored") - .default { "data/results-${System.currentTimeMillis()}.parquet" } - val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } - .default { emptyList() } - val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { - parseVMs(FileReader(File(this)).readText()) - } - .default { emptyList() } - val seed by parser.storing("the random seed") { toInt() } - .default(0) - val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") - val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") - - fun getSelectedVmList(): List { - return if (selectedVms.isEmpty()) { - selectedVmsFile - } else { - selectedVms - } - } - - private fun parseVMs(string: String): List { - // Handle case where VM list contains a VM name with an (escaped) single-quote in it - val sanitizedString = string.replace("\\'", "\\\\[") - .replace("'", "\"") - .replace("\\\\[", "'") - val vms: List = jacksonObjectMapper().readValue(sanitizedString) - return vms - } -} - -/** - * Construct the failure domain for the experiments. - */ -suspend fun createFailureDomain(seed: Int, bareMetalProvisioner: ProvisioningService, chan: Channel): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String - val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } - injector.enqueue(node.metadata["driver"] as FailureDomain) - } - } - return domain -} - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -fun createFaultInjector(domain: Domain, random: Random): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector(domain, - iatScale = -1.39, iatShape = 1.03, // Hours - sizeScale = 1.88, sizeShape = 1.25, - dScale = 9.51, dShape = 3.21, // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. - */ -fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInterferenceModel, vms: List, seed: Int): Sc20ParquetTraceReader { - return Sc20ParquetTraceReader(path, performanceInterferenceModel, vms, Random(seed)) -} - -/** - * Construct the environment for a VM provisioner and return the provisioner instance. 
- */ -suspend fun createProvisioner( - root: Domain, - environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): Pair = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val scheduler = SimpleVirtProvisioningService(allocationPolicy, simulationContext, bareMetalProvisioner) - - // Wait for the hypervisors to be spawned - delay(10) - - bareMetalProvisioner to scheduler -} - -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) { - val domain = simulationContext.domain - val hypervisors = scheduler.drivers() - - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - hypervisor.server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) - } - } - } - .launchIn(domain) - hypervisor.events - .onEach { event -> - when (event) { - is HypervisorEvent.SliceFinished -> monitor.onSliceFinish( - simulationContext.clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.hostServer, - scheduler.submittedVms, - scheduler.queuedVms, - scheduler.runningVms, - scheduler.finishedVms - ) - } - } - .launchIn(domain) - } -} - -/** - * Process the trace. - */ -suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtProvisioningService, chan: Channel, vmPlacements: Map, monitor: Sc20Monitor) { - try { - coroutineScope { - var submitted = 0L - val finished = Channel(Channel.CONFLATED) - val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name }) - - while (reader.hasNext()) { - val (time, workload) = reader.next() - - if (vmPlacements.isNotEmpty()) { - val vmId = workload.name.replace("VM Workload ", "") - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null) { - println("Could not find placement data in VM placement file for VM $vmId") - continue - } - val machineInCluster = hypervisors.ceiling(clusterName)?.let { it.contains(clusterName) } ?: false - if (machineInCluster) { - println("Ignored VM") - continue - } - } - - submitted++ - delay(max(0, time - simulationContext.clock.millis())) - launch { - chan.send(Unit) - val server = scheduler.deploy( - workload.image.name, workload.image, - Flavor(workload.image.maxCores, workload.image.requiredMemory) - ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.onVmStateChanged(it.server) - } - - finished.send(Unit) - } - .collect() - } - } - - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { - finished.receive() - } - } - } finally { - reader.close() - } -} - -/** - * Main entry point of the experiment. 
- */ -@OptIn(ExperimentalCoroutinesApi::class) -fun main(args: Array) { - val cli = ArgParser(args).parseInto(::ExperimentParameters) - println("trace-directory: ${cli.traceDirectory}") - println("environment-file: ${cli.environmentFile}") - println("performance-interference-file: ${cli.performanceInterferenceFile}") - println("selected-vms-file: ${cli.selectedVmsFile}") - println("seed: ${cli.seed}") - println("failures: ${cli.failures}") - println("allocation-policy: ${cli.allocationPolicy}") - - val start = System.currentTimeMillis() - val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile) - - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") - - val chan = Channel(Channel.CONFLATED) - - val performanceInterferenceModel = try { - val performanceInterferenceStream = if (cli.performanceInterferenceFile != null) { - File(cli.performanceInterferenceFile!!).inputStream().buffered() - } else { - object {}.javaClass.getResourceAsStream("/env/performance-interference.json") - } - Sc20PerformanceInterferenceReader(performanceInterferenceStream) - .construct() - } catch (e: Throwable) { - monitor.close() - throw e - } - val vmPlacements = if (cli.vmPlacementFile == null) { - emptyMap() - } else { - Sc20VmPlacementReader(File(cli.vmPlacementFile!!).inputStream().buffered()).construct() - } - val environmentReader = Sc20ClusterEnvironmentReader(File(cli.environmentFile)) - val traceReader = try { - createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed) - } catch (e: Throwable) { - monitor.close() - throw e - } - val allocationPolicy = when (cli.allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(cli.seed)) - "replay" -> ReplayAllocationPolicy(vmPlacements) - else -> throw IllegalArgumentException("Unknown allocation policy: ${cli.allocationPolicy}") - } - - root.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner(root, environmentReader, allocationPolicy) - - val failureDomain = if (cli.failures) { - println("ENABLING failures") - createFailureDomain(cli.seed, bareMetalProvisioner, chan) - } else { - null - } - - attachMonitor(scheduler, monitor) - processTrace(traceReader, scheduler, chan, vmPlacements, monitor) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") - } - - runBlocking { - system.run() - system.terminate() - } - - // Explicitly close the monitor to flush its buffer - monitor.close() -} diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt new file mode 100644 index 
00000000..dd0931e4 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -0,0 +1,183 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationEngine +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.io.File +import java.util.ServiceLoader + +/** + * An integration test suite for the SC20 experiments. + */ +class Sc20IntegrationTest { + /** + * The simulation engine to use. + */ + private lateinit var simulationEngine: SimulationEngine + + /** + * The root simulation domain to run in. + */ + private lateinit var root: Domain + + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestSc20Monitor + + /** + * Setup the experimental environment. + */ + @BeforeEach + fun setUp() { + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + simulationEngine = provider("test") + root = simulationEngine.newDomain("root") + monitor = TestSc20Monitor() + } + + /** + * Tear down the experimental environment. 
+ */ + @AfterEach + fun tearDown() = runBlocking { + simulationEngine.terminate() + } + + @Test + fun smoke() { + val failures = false + val seed = 0 + val chan = Channel(Channel.CONFLATED) + val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val traceReader = createTestTraceReader() + val environmentReader = createTestEnvironmentReader() + lateinit var scheduler: SimpleVirtProvisioningService + + root.launch { + val res = createProvisioner(root, environmentReader, allocationPolicy) + val bareMetalProvisioner = res.first + scheduler = res.second + + val failureDomain = if (failures) { + println("ENABLING failures") + createFailureDomain(seed, bareMetalProvisioner, chan) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace(traceReader, scheduler, chan, monitor) + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + runSimulation() + + // Note that these values have been verified beforehand + assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") + assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") + assertEquals(207379117949, monitor.totalRequestedBurst) + assertEquals(207378478631, monitor.totalGrantedBurst) + assertEquals(639360, monitor.totalOvercommissionedBurst) + assertEquals(0, monitor.totalInterferedBurst) + } + + /** + * Run the simulation. + */ + private fun runSimulation() = runBlocking { + simulationEngine.run() + } + + /** + * Obtain the trace reader for the test. + */ + private fun createTestTraceReader(): TraceReader { + val performanceInterferenceStream = object {}.javaClass.getResourceAsStream("/env/performance-interference.json") + val performanceInterferenceModel = Sc20PerformanceInterferenceReader(performanceInterferenceStream) + .construct() + return createTraceReader(File("src/test/resources/trace"), performanceInterferenceModel, emptyList(), 0) + } + + /** + * Obtain the environment reader for the test. 
+ */ + private fun createTestEnvironmentReader(): EnvironmentReader { + val stream = object {}.javaClass.getResourceAsStream("/env/topology.txt") + return Sc20ClusterEnvironmentReader(stream) + } + + class TestSc20Monitor : Sc20Monitor { + var totalRequestedBurst = 0L + var totalGrantedBurst = 0L + var totalOvercommissionedBurst = 0L + var totalInterferedBurst = 0L + + override suspend fun onSliceFinish( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, + duration: Long + ) { + totalRequestedBurst += requestedBurst + totalGrantedBurst += grantedBurst + totalOvercommissionedBurst += overcommissionedBurst + totalInterferedBurst += interferedBurst + } + override fun close() {} + } +} diff --git a/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt new file mode 100644 index 00000000..6b347bff --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/test/resources/env/topology.txt @@ -0,0 +1,5 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;32;3.2;2048;1;256;32 +B01;B01;48;2.93;1256;6;64;8 +C01;C01;32;3.2;2048;2;128;16 + diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet new file mode 100644 index 00000000..ce7a812c Binary files /dev/null and b/opendc/opendc-experiments-sc20/src/test/resources/trace/meta.parquet differ diff --git a/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet new file mode 100644 index 00000000..1d7ce882 Binary files /dev/null and b/opendc/opendc-experiments-sc20/src/test/resources/trace/trace.parquet differ diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 2ef0db97..e34ee2dc 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -38,9 +38,9 @@ import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.format.environment.EnvironmentReader -import java.io.BufferedReader import java.io.File -import java.io.FileReader +import java.io.FileInputStream +import java.io.InputStream import java.util.Random import java.util.UUID @@ -50,8 +50,11 @@ import java.util.UUID * @param environmentFile The file describing the physical cluster. 
*/ class Sc20ClusterEnvironmentReader( - private val environmentFile: File + private val input: InputStream ) : EnvironmentReader { + + constructor(file: File) : this(FileInputStream(file)) + @Suppress("BlockingMethodInNonBlockingContext") override suspend fun construct(dom: Domain): Environment { var clusterIdCol = 0 @@ -70,7 +73,7 @@ class Sc20ClusterEnvironmentReader( val nodes = mutableListOf() val random = Random(0) - BufferedReader(FileReader(environmentFile)).use { reader -> + input.bufferedReader().use { reader -> reader.lineSequence() .filter { line -> // Ignore comments in the file -- cgit v1.2.3
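The new processTrace in the experiment driver waits for all submitted VMs to be accounted for by re-checking the scheduler's counters each time a conflated channel delivers a wake-up signal, rather than by counting acknowledgements. The sketch below isolates that pattern with plain kotlinx.coroutines types; the local counters and delays are stand-ins for scheduler.finishedVms/unscheduledVms and the simulated workloads, not OpenDC code.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val finished = Channel<Unit>(Channel.CONFLATED) // a wake-up signal, not a counter
    var submitted = 0L
    var completed = 0L // stands in for scheduler.finishedVms + scheduler.unscheduledVms

    repeat(5) { i ->
        submitted++
        launch {
            delay(10L * (i + 1)) // pretend the VM runs for a while
            completed++
            finished.send(Unit)  // conflated: extra signals between receives collapse into one
        }
    }

    // Re-check the real counters after every wake-up instead of counting signals;
    // this is why a conflated channel is sufficient and no completion can be "lost".
    while (completed != submitted) {
        finished.receive()
    }
    println("all $submitted workloads accounted for")
}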
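processTrace also filters workloads against the optional VM placement file by keeping the hypervisor server names in a TreeSet and probing it with ceiling(). The self-contained example below shows why that lookup works when server names begin with their cluster id; the "A01-host-0" naming scheme is an assumption made only for this illustration, since the real names come from the environment file.

import java.util.TreeSet

// clusterPresent mirrors the check in processTrace: take the smallest server name
// at or after the cluster id in sort order and test whether it still contains the id.
fun clusterPresent(hypervisors: TreeSet<String>, cluster: String): Boolean =
    hypervisors.ceiling(cluster)?.contains(cluster) ?: false

fun main() {
    val hypervisors = TreeSet(listOf("A01-host-0", "B01-host-0", "B01-host-1"))
    println(clusterPresent(hypervisors, "B01")) // true: "B01-host-0" is the ceiling and contains "B01"
    println(clusterPresent(hypervisors, "C01")) // false: no server name at or above "C01" matches
}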
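In main(), both the performance-interference reader and the trace reader are wrapped in a try/catch whose only purpose is to close the monitor before rethrowing. If that pattern grows beyond two call sites, it could be factored into a small helper along the lines sketched below; this is a hypothetical refactoring written for illustration, not part of the patch, and orCleanup is an invented name.

// Hypothetical helper: run a block and invoke the given cleanup if it throws,
// mirroring the try/catch repeated around the readers in main().
inline fun <T> orCleanup(cleanup: () -> Unit, block: () -> T): T =
    try {
        block()
    } catch (e: Throwable) {
        cleanup()
        throw e
    }

// A call site in main() could then read:
// val traceReader = orCleanup({ monitor.close() }) {
//     createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed)
// }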
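The test topology added under src/test/resources/env/topology.txt is a semicolon-separated table whose columns are named by its header row (ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost). A minimal stand-alone parser for that layout could look as follows; it only mirrors the header names, skips blank lines and '#' comments the way the trace reader does (an assumption, since the environment reader's comment filter is cut off in this hunk), and makes no claim about units, which the file does not state. It is not the Sc20ClusterEnvironmentReader itself.

import java.io.File

// Field names follow the header row of topology.txt; no units are assumed.
data class ClusterSpec(
    val id: String,
    val name: String,
    val cores: Int,
    val speed: Double,
    val memory: Long,
    val numberOfHosts: Int,
    val memoryCapacityPerHost: Long,
    val coreCountPerHost: Int
)

fun parseTopology(file: File): List<ClusterSpec> =
    file.readLines()
        .filter { it.isNotBlank() && !it.startsWith("#") } // drop comments and the trailing blank line
        .drop(1)                                           // drop the header row
        .map { line ->
            val f = line.split(";")
            ClusterSpec(
                f[0], f[1],
                f[2].toInt(), f[3].toDouble(), f[4].toLong(),
                f[5].toInt(), f[6].toLong(), f[7].toInt()
            )
        }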
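The final hunk changes Sc20ClusterEnvironmentReader to read from an InputStream and adds a File-based secondary constructor, which is what lets the integration test load /env/topology.txt from the classpath while main() keeps passing a File. The toy class below reproduces just that constructor pattern in isolation; StreamBackedReader is an invented name used only for the example.

import java.io.File
import java.io.FileInputStream
import java.io.InputStream

// Primary constructor takes any InputStream (files, classpath resources, test fixtures);
// the secondary constructor preserves the existing File-based call sites.
class StreamBackedReader(private val input: InputStream) {
    constructor(file: File) : this(FileInputStream(file))

    fun readAll(): String = input.bufferedReader().use { it.readText() }
}

fun main() {
    // File-based call site, as in the experiment's main():
    // println(StreamBackedReader(File("topology.txt")).readAll())

    // Resource-based call site, as in the integration test:
    val resource = object {}.javaClass.getResourceAsStream("/env/topology.txt")
    if (resource != null) println(StreamBackedReader(resource).readAll())
}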