diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-04-14 15:26:22 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-04-22 20:04:21 +0200 |
| commit | 938f60832d6a500fee74b5f44838287c5432a74e (patch) | |
| tree | 27ccf67f33030dfb8a91cee9731fc1dca229402f | |
| parent | 0f1be7a820d5e3b279e68209a5bb6219d176b732 (diff) | |
feat(trace/opendc): Incorporate interference model in trace format
This change updates the OpenDC VM trace format to incorporate the VM
interference model in the trace format itself. This makes sense since
the model is tightly coupled to the actual trace that is being
simulated.
This approach has as benefit that we can directly load the
interference model from the workload trace, without having to resolve
the model seperately (as we did before).
7 files changed, 416 insertions, 2 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/InterferenceGroupColumns.kt new file mode 100644 index 00000000..81758e5f --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/InterferenceGroupColumns.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("InterferenceGroupColumns") +package org.opendc.trace + +/** + * Members of the interference group. + */ +@JvmField +public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members") + +/** + * Target load after which the interference occurs. + */ +@JvmField +public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target") + +/** + * Performance score when the interference occurs. + */ +@JvmField +public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt index bb9d93e2..0d9c2b74 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt @@ -42,3 +42,8 @@ public const val TABLE_RESOURCES: String = "resources" * A table containing all resource states in a workload. */ public const val TABLE_RESOURCE_STATES: String = "resource_states" + +/** + * A table containing the groups of resources that interfere when run on the same execution platform. + */ +public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups" diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt new file mode 100644 index 00000000..72ad207e --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.opendc.trace.* +import shaded.parquet.com.fasterxml.jackson.core.JsonParseException +import shaded.parquet.com.fasterxml.jackson.core.JsonParser +import shaded.parquet.com.fasterxml.jackson.core.JsonToken + +/** + * A [TableReader] implementation for the OpenDC VM interference JSON format. + */ +internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader { + /** + * A flag to indicate whether a single row has been read already. + */ + private var isStarted = false + + override fun nextRow(): Boolean { + if (!isStarted) { + isStarted = true + + parser.nextToken() + + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") + } + } + + return if (parser.nextToken() != JsonToken.END_ARRAY) { + parseGroup(parser) + true + } else { + reset() + false + } + } + + override fun resolve(column: TableColumn<*>): Int { + return when (column) { + INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS + INTERFERENCE_GROUP_TARGET -> COL_TARGET + INTERFERENCE_GROUP_SCORE -> COL_SCORE + else -> -1 + } + } + + override fun isNull(index: Int): Boolean { + return when (index) { + COL_MEMBERS, COL_TARGET, COL_SCORE -> false + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun get(index: Int): Any { + return when (index) { + COL_MEMBERS -> members + COL_TARGET -> targetLoad + COL_SCORE -> score + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getDouble(index: Int): Double { + return when (index) { + COL_TARGET -> targetLoad + COL_SCORE -> score + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun close() { + parser.close() + } + + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private var members = emptySet<String>() + private var targetLoad = Double.POSITIVE_INFINITY + private var score = 1.0 + + /** + * Reset the state. + */ + private fun reset() { + members = emptySet() + targetLoad = Double.POSITIVE_INFINITY + score = 1.0 + } + + /** + * Parse a group an interference JSON file. + */ + private fun parseGroup(parser: JsonParser) { + var targetLoad = Double.POSITIVE_INFINITY + var score = 1.0 + val members = mutableSetOf<String>() + + if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "vms" -> parseGroupMembers(parser, members) + "minServerLoad" -> targetLoad = parser.doubleValue + "performanceScore" -> score = parser.doubleValue + } + } + + this.members = members + this.targetLoad = targetLoad + this.score = score + } + + /** + * Parse the members of a group. + */ + private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_ARRAY) { + if (parser.currentToken() != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected string value for group member") + } + + members.add(parser.text) + } + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt new file mode 100644 index 00000000..f8ce0ef6 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.opendc.trace.* +import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator + +/** + * A [TableWriter] implementation for the OpenDC VM interference JSON format. + */ +internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter { + /** + * A flag to indicate whether a row has been started. + */ + private var isRowActive = false + + init { + generator.writeStartArray() + } + + override fun startRow() { + // Reset state + members = emptySet() + targetLoad = Double.POSITIVE_INFINITY + score = 1.0 + + // Mark row as active + isRowActive = true + } + + override fun endRow() { + check(isRowActive) { "No active row" } + + generator.writeStartObject() + generator.writeArrayFieldStart("vms") + for (member in members) { + generator.writeString(member) + } + generator.writeEndArray() + generator.writeNumberField("minServerLoad", targetLoad) + generator.writeNumberField("performanceScore", score) + generator.writeEndObject() + } + + override fun resolve(column: TableColumn<*>): Int { + return when (column) { + INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS + INTERFERENCE_GROUP_TARGET -> COL_TARGET + INTERFERENCE_GROUP_SCORE -> COL_SCORE + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + check(isRowActive) { "No active row" } + + @Suppress("UNCHECKED_CAST") + when (index) { + COL_MEMBERS -> members = value as Set<String> + COL_TARGET -> targetLoad = (value as Number).toDouble() + COL_SCORE -> score = (value as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setInt(index: Int, value: Int) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setDouble(index: Int, value: Double) { + check(isRowActive) { "No active row" } + + when (index) { + COL_TARGET -> targetLoad = (value as Number).toDouble() + COL_SCORE -> score = (value as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun flush() { + generator.flush() + } + + override fun close() { + generator.writeEndArray() + generator.close() + } + + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private var members = emptySet<String>() + private var targetLoad = Double.POSITIVE_INFINITY + private var score = 1.0 +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 886f3d54..0df72ede 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -34,14 +34,22 @@ import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding +import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files import java.nio.file.Path +import kotlin.io.path.exists /** * A [TraceFormat] implementation of the OpenDC virtual machine trace format. */ public class OdcVmTraceFormat : TraceFormat { /** + * A [JsonFactory] that is used to parse the JSON-based interference model. + */ + private val jsonFactory = JsonFactory() + + /** * The name of this trace format. */ override val name: String = "opendc-vm" @@ -58,7 +66,7 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS) override fun getDetails(path: Path, table: String): TableDetails { return when (table) { @@ -82,6 +90,13 @@ public class OdcVmTraceFormat : TraceFormat { ), listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) ) + TABLE_INTERFERENCE_GROUPS -> TableDetails( + listOf( + INTERFERENCE_GROUP_MEMBERS, + INTERFERENCE_GROUP_TARGET, + INTERFERENCE_GROUP_SCORE, + ) + ) else -> throw IllegalArgumentException("Table $table not supported") } } @@ -96,6 +111,15 @@ public class OdcVmTraceFormat : TraceFormat { val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) OdcVmResourceStateTableReader(reader) } + TABLE_INTERFERENCE_GROUPS -> { + val modelPath = path.resolve("interference-model.json") + val parser = if (modelPath.exists()) + jsonFactory.createParser(modelPath.toFile()) + else + jsonFactory.createParser("[]") // If model does not exist, return empty model + + OdcVmInterferenceJsonTableReader(parser) + } else -> throw IllegalArgumentException("Table $table not supported") } } @@ -122,6 +146,10 @@ public class OdcVmTraceFormat : TraceFormat { .build() OdcVmResourceStateTableWriter(writer, schema) } + TABLE_INTERFERENCE_GROUPS -> { + val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) + OdcVmInterferenceJsonTableWriter(generator) + } else -> throw IllegalArgumentException("Table $table not supported") } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index bfe0f881..2b678ff5 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -41,7 +41,7 @@ internal class OdcVmTraceFormatTest { fun testTables() { val path = Paths.get("src/test/resources/trace-v2.1") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS), format.getTables(path)) } @Test @@ -93,4 +93,33 @@ internal class OdcVmTraceFormatTest { reader.close() } + + @Test + fun testInterferenceGroups() { + val path = Paths.get("src/test/resources/trace-v2.1") + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, + { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, + { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertFalse(reader.nextRow()) } + ) + + reader.close() + } + + @Test + fun testInterferenceGroupsEmpty() { + val path = Paths.get("src/test/resources/trace-v2.0") + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + + assertFalse(reader.nextRow()) + reader.close() + } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json new file mode 100644 index 00000000..6a0616d9 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json @@ -0,0 +1,20 @@ +[ + { + "vms": [ + "1019", + "1023", + "1052" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "1023", + "1052", + "1073" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] |
