diff options
7 files changed, 416 insertions, 2 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/InterferenceGroupColumns.kt new file mode 100644 index 00000000..81758e5f --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/InterferenceGroupColumns.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("InterferenceGroupColumns") +package org.opendc.trace + +/** + * Members of the interference group. + */ +@JvmField +public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members") + +/** + * Target load after which the interference occurs. + */ +@JvmField +public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target") + +/** + * Performance score when the interference occurs. + */ +@JvmField +public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt index bb9d93e2..0d9c2b74 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Tables.kt @@ -42,3 +42,8 @@ public const val TABLE_RESOURCES: String = "resources" * A table containing all resource states in a workload. */ public const val TABLE_RESOURCE_STATES: String = "resource_states" + +/** + * A table containing the groups of resources that interfere when run on the same execution platform. + */ +public const val TABLE_INTERFERENCE_GROUPS: String = "interference_groups" diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt new file mode 100644 index 00000000..72ad207e --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.opendc.trace.* +import shaded.parquet.com.fasterxml.jackson.core.JsonParseException +import shaded.parquet.com.fasterxml.jackson.core.JsonParser +import shaded.parquet.com.fasterxml.jackson.core.JsonToken + +/** + * A [TableReader] implementation for the OpenDC VM interference JSON format. + */ +internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader { + /** + * A flag to indicate whether a single row has been read already. + */ + private var isStarted = false + + override fun nextRow(): Boolean { + if (!isStarted) { + isStarted = true + + parser.nextToken() + + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") + } + } + + return if (parser.nextToken() != JsonToken.END_ARRAY) { + parseGroup(parser) + true + } else { + reset() + false + } + } + + override fun resolve(column: TableColumn<*>): Int { + return when (column) { + INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS + INTERFERENCE_GROUP_TARGET -> COL_TARGET + INTERFERENCE_GROUP_SCORE -> COL_SCORE + else -> -1 + } + } + + override fun isNull(index: Int): Boolean { + return when (index) { + COL_MEMBERS, COL_TARGET, COL_SCORE -> false + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun get(index: Int): Any { + return when (index) { + COL_MEMBERS -> members + COL_TARGET -> targetLoad + COL_SCORE -> score + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getDouble(index: Int): Double { + return when (index) { + COL_TARGET -> targetLoad + COL_SCORE -> score + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun close() { + parser.close() + } + + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private var members = emptySet<String>() + private var targetLoad = Double.POSITIVE_INFINITY + private var score = 1.0 + + /** + * Reset the state. + */ + private fun reset() { + members = emptySet() + targetLoad = Double.POSITIVE_INFINITY + score = 1.0 + } + + /** + * Parse a group an interference JSON file. + */ + private fun parseGroup(parser: JsonParser) { + var targetLoad = Double.POSITIVE_INFINITY + var score = 1.0 + val members = mutableSetOf<String>() + + if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "vms" -> parseGroupMembers(parser, members) + "minServerLoad" -> targetLoad = parser.doubleValue + "performanceScore" -> score = parser.doubleValue + } + } + + this.members = members + this.targetLoad = targetLoad + this.score = score + } + + /** + * Parse the members of a group. + */ + private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_ARRAY) { + if (parser.currentToken() != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected string value for group member") + } + + members.add(parser.text) + } + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt new file mode 100644 index 00000000..f8ce0ef6 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.opendc.trace.* +import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator + +/** + * A [TableWriter] implementation for the OpenDC VM interference JSON format. + */ +internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter { + /** + * A flag to indicate whether a row has been started. + */ + private var isRowActive = false + + init { + generator.writeStartArray() + } + + override fun startRow() { + // Reset state + members = emptySet() + targetLoad = Double.POSITIVE_INFINITY + score = 1.0 + + // Mark row as active + isRowActive = true + } + + override fun endRow() { + check(isRowActive) { "No active row" } + + generator.writeStartObject() + generator.writeArrayFieldStart("vms") + for (member in members) { + generator.writeString(member) + } + generator.writeEndArray() + generator.writeNumberField("minServerLoad", targetLoad) + generator.writeNumberField("performanceScore", score) + generator.writeEndObject() + } + + override fun resolve(column: TableColumn<*>): Int { + return when (column) { + INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS + INTERFERENCE_GROUP_TARGET -> COL_TARGET + INTERFERENCE_GROUP_SCORE -> COL_SCORE + else -> -1 + } + } + + override fun set(index: Int, value: Any) { + check(isRowActive) { "No active row" } + + @Suppress("UNCHECKED_CAST") + when (index) { + COL_MEMBERS -> members = value as Set<String> + COL_TARGET -> targetLoad = (value as Number).toDouble() + COL_SCORE -> score = (value as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setInt(index: Int, value: Int) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setDouble(index: Int, value: Double) { + check(isRowActive) { "No active row" } + + when (index) { + COL_TARGET -> targetLoad = (value as Number).toDouble() + COL_SCORE -> score = (value as Number).toDouble() + else -> throw IllegalArgumentException("Invalid column $index") + } + } + + override fun flush() { + generator.flush() + } + + override fun close() { + generator.writeEndArray() + generator.close() + } + + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private var members = emptySet<String>() + private var targetLoad = Double.POSITIVE_INFINITY + private var score = 1.0 +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 886f3d54..0df72ede 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -34,14 +34,22 @@ import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding +import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files import java.nio.file.Path +import kotlin.io.path.exists /** * A [TraceFormat] implementation of the OpenDC virtual machine trace format. */ public class OdcVmTraceFormat : TraceFormat { /** + * A [JsonFactory] that is used to parse the JSON-based interference model. + */ + private val jsonFactory = JsonFactory() + + /** * The name of this trace format. */ override val name: String = "opendc-vm" @@ -58,7 +66,7 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS) override fun getDetails(path: Path, table: String): TableDetails { return when (table) { @@ -82,6 +90,13 @@ public class OdcVmTraceFormat : TraceFormat { ), listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) ) + TABLE_INTERFERENCE_GROUPS -> TableDetails( + listOf( + INTERFERENCE_GROUP_MEMBERS, + INTERFERENCE_GROUP_TARGET, + INTERFERENCE_GROUP_SCORE, + ) + ) else -> throw IllegalArgumentException("Table $table not supported") } } @@ -96,6 +111,15 @@ public class OdcVmTraceFormat : TraceFormat { val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) OdcVmResourceStateTableReader(reader) } + TABLE_INTERFERENCE_GROUPS -> { + val modelPath = path.resolve("interference-model.json") + val parser = if (modelPath.exists()) + jsonFactory.createParser(modelPath.toFile()) + else + jsonFactory.createParser("[]") // If model does not exist, return empty model + + OdcVmInterferenceJsonTableReader(parser) + } else -> throw IllegalArgumentException("Table $table not supported") } } @@ -122,6 +146,10 @@ public class OdcVmTraceFormat : TraceFormat { .build() OdcVmResourceStateTableWriter(writer, schema) } + TABLE_INTERFERENCE_GROUPS -> { + val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) + OdcVmInterferenceJsonTableWriter(generator) + } else -> throw IllegalArgumentException("Table $table not supported") } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index bfe0f881..2b678ff5 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -41,7 +41,7 @@ internal class OdcVmTraceFormatTest { fun testTables() { val path = Paths.get("src/test/resources/trace-v2.1") - assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), format.getTables(path)) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS), format.getTables(path)) } @Test @@ -93,4 +93,33 @@ internal class OdcVmTraceFormatTest { reader.close() } + + @Test + fun testInterferenceGroups() { + val path = Paths.get("src/test/resources/trace-v2.1") + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, + { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, + { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertFalse(reader.nextRow()) } + ) + + reader.close() + } + + @Test + fun testInterferenceGroupsEmpty() { + val path = Paths.get("src/test/resources/trace-v2.0") + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + + assertFalse(reader.nextRow()) + reader.close() + } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json new file mode 100644 index 00000000..6a0616d9 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/test/resources/trace-v2.1/interference-model.json @@ -0,0 +1,20 @@ +[ + { + "vms": [ + "1019", + "1023", + "1052" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "1023", + "1052", + "1073" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] |
