author     Fabian Mastenbroek <mail.fabianm@gmail.com>   2022-06-09 10:31:41 +0200
committer  GitHub <noreply@github.com>                   2022-06-09 10:31:41 +0200
commit     d146814bbbb86bfcb19ccb94250424703e9179e5 (patch)
tree       bf20f51b434d56e60ad013568ac1a32b912a3b5e /opendc-trace/opendc-trace-azure
parent     61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
parent     9d759c9bc987965fae8b0c16c000772c546bf3a2 (diff)
merge: Introduce schema for trace API (#88)
This pull request updates the OpenDC trace API to support proper specification of a schema for the tables exposed by traces. This functionality makes it easier for API consumers to understand the types exposed by the API.

## Implementation Notes :hammer_and_pick:

* Introduce a type system for the trace API
* Add benchmarks for the odcvm trace format
* Add benchmarks for the Azure trace format
* Add a conformance suite for the OpenDC trace API

## External Dependencies :four_leaf_clover:

* N/A

## Breaking API Changes :warning:

* Removal of the typed `TableColumn`. Instead, `TableColumn` instances now describe the columns belonging to a table.
* `TableReader` and `TableWriter` no longer support accessing arbitrary objects. Instead, only the types supported by the type system are exposed.
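A minimal consumer-side sketch of the reworked API, assembled from the signatures visible in the diff below (the trace path is illustrative and this is not code from the PR): columns are resolved by name and values are read through typed getters instead of the removed generic `get(index)`.

```kotlin
import org.opendc.trace.azure.AzureTraceFormat
import org.opendc.trace.conv.RESOURCE_CPU_COUNT
import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
import org.opendc.trace.conv.TABLE_RESOURCES
import java.nio.file.Path

fun main() {
    val format = AzureTraceFormat()
    val path = Path.of("src/test/resources/trace") // illustrative trace location

    // Columns are resolved by name; resolve() returns -1 for names the table does not expose.
    val reader = format.newReader(path, TABLE_RESOURCES, null)
    try {
        val idCol = reader.resolve(RESOURCE_ID)
        val cpuCol = reader.resolve(RESOURCE_CPU_COUNT)
        val memCol = reader.resolve(RESOURCE_MEM_CAPACITY)

        while (reader.nextRow()) {
            // Typed getters replace the removed generic get(index).
            println("${reader.getString(idCol)}: ${reader.getInt(cpuCol)} cores, ${reader.getDouble(memCol)} memory")
        }
    } finally {
        reader.close()
    }
}
```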
Diffstat (limited to 'opendc-trace/opendc-trace-azure')
-rw-r--r--  opendc-trace/opendc-trace-azure/build.gradle.kts | 3
-rw-r--r--  opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt | 74
-rw-r--r--  opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt | 92
-rw-r--r--  opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt | 108
-rw-r--r--  opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt | 21
-rw-r--r--  opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt | 44
6 files changed, 280 insertions, 62 deletions
diff --git a/opendc-trace/opendc-trace-azure/build.gradle.kts b/opendc-trace/opendc-trace-azure/build.gradle.kts
index d4fe045e..ee53c583 100644
--- a/opendc-trace/opendc-trace-azure/build.gradle.kts
+++ b/opendc-trace/opendc-trace-azure/build.gradle.kts
@@ -25,9 +25,12 @@ description = "Support for Azure VM traces in OpenDC"
/* Build configuration */
plugins {
`kotlin-library-conventions`
+ `benchmark-conventions`
}
dependencies {
api(projects.opendcTrace.opendcTraceApi)
implementation(libs.jackson.dataformat.csv)
+
+ testImplementation(projects.opendcTrace.opendcTraceTestkit)
}
diff --git a/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt b/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt
new file mode 100644
index 00000000..4fcdce30
--- /dev/null
+++ b/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.azure
+
+import org.opendc.trace.conv.*
+import org.opendc.trace.spi.TraceFormat
+import org.openjdk.jmh.annotations.*
+import org.openjdk.jmh.infra.Blackhole
+import java.nio.file.Path
+import java.util.concurrent.TimeUnit
+
+/**
+ * Benchmarks for parsing traces in the Azure VM format.
+ */
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+class AzureTraceBenchmarks {
+ private lateinit var path: Path
+ private lateinit var format: TraceFormat
+
+ @Setup
+ fun setUp() {
+ path = Path.of("src/test/resources/trace")
+ format = AzureTraceFormat()
+ }
+
+ @Benchmark
+ fun benchmarkResourcesReader(bh: Blackhole) {
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
+ try {
+ val idColumn = reader.resolve(RESOURCE_ID)
+ while (reader.nextRow()) {
+ bh.consume(reader.getString(idColumn))
+ }
+ } finally {
+ reader.close()
+ }
+ }
+
+ @Benchmark
+ fun benchmarkResourceStatesReader(bh: Blackhole) {
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
+ try {
+ val idColumn = reader.resolve(RESOURCE_ID)
+ while (reader.nextRow()) {
+ bh.consume(reader.getString(idColumn))
+ }
+ } finally {
+ reader.close()
+ }
+ }
+}
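The benchmark above reads every column the format parses. Since `newReader` now takes an optional projection of column names (`List<String>?`), a caller that only needs resource IDs could pass one as a hint; whether this Azure reader actually prunes unprojected columns is not visible in this diff, so treat the sketch below as illustrative.

```kotlin
import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.spi.TraceFormat
import java.nio.file.Path

fun readResourceIds(format: TraceFormat, path: Path): List<String> {
    // Project only the RESOURCE_ID column; formats may treat the projection as a best-effort hint.
    val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID))
    val ids = mutableListOf<String>()
    try {
        val idColumn = reader.resolve(RESOURCE_ID)
        while (reader.nextRow()) {
            reader.getString(idColumn)?.let(ids::add)
        }
    } finally {
        reader.close()
    }
    return ids
}
```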
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
index 3132b1d9..c0c67329 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
@@ -29,17 +29,28 @@ import org.opendc.trace.*
import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableReader] for the Azure v1 VM resource state table.
*/
internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
+ /**
+ * A flag to indicate whether a single row has been read already.
+ */
+ private var isStarted = false
+
init {
parser.schema = schema
}
override fun nextRow(): Boolean {
+ if (!isStarted) {
+ isStarted = true
+ }
+
reset()
if (!nextStart()) {
@@ -63,20 +74,22 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_CPU_USAGE_PCT = 2
- override fun isNull(index: Int): Boolean {
- require(index in 0..columns.size) { "Invalid column index" }
- return false
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- return when (index) {
- COL_ID -> id
- COL_TIMESTAMP -> timestamp
- COL_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column index")
- }
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..COL_CPU_USAGE_PCT) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -91,18 +104,66 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
+ checkActive()
return when (index) {
COL_CPU_USAGE_PCT -> cpuUsagePct
else -> throw IllegalArgumentException("Invalid column")
}
}
+ override fun getString(index: Int): String? {
+ checkActive()
+ return when (index) {
+ COL_ID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ checkActive()
+ return when (index) {
+ COL_TIMESTAMP -> timestamp
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
parser.close()
}
/**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
* Advance the parser until the next object start.
*/
private fun nextStart(): Boolean {
@@ -131,15 +192,6 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
cpuUsagePct = Double.NaN
}
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_CPU_USAGE_PCT = 2
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT
- )
-
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
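The `isStarted` flag and `checkActive()` helper added above give the reader an explicit row lifecycle: typed getters are only valid after a successful `nextRow()` and before `close()`. Since `checkActive()` uses Kotlin's `check`, a premature read fails with an `IllegalStateException`. A small sketch of that contract (path and flow are illustrative):

```kotlin
import org.opendc.trace.azure.AzureTraceFormat
import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
import java.nio.file.Path

fun demoRowLifecycle() {
    val reader = AzureTraceFormat().newReader(Path.of("src/test/resources/trace"), TABLE_RESOURCE_STATES, null)
    val idColumn = reader.resolve(RESOURCE_ID)

    try {
        // No row is active yet: checkActive() throws because isStarted is still false.
        reader.getString(idColumn)
    } catch (e: IllegalStateException) {
        println("Reader not started: ${e.message}")
    }

    // After nextRow() returns true, the typed getters read from the current row.
    if (reader.nextRow()) {
        println(reader.getString(idColumn))
    }
    reader.close()
}
```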
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index 154a37e4..a8451301 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -27,17 +27,28 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableReader] for the Azure v1 VM resources table.
*/
internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
+ /**
+ * A flag to indicate whether a single row has been read already.
+ */
+ private var isStarted = false
+
init {
parser.schema = schema
}
override fun nextRow(): Boolean {
+ if (!isStarted) {
+ isStarted = true
+ }
+
reset()
if (!nextStart()) {
@@ -63,22 +74,26 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_MEM_CAPACITY = 4
- override fun isNull(index: Int): Boolean {
- require(index in 0..columns.size) { "Invalid column index" }
- return false
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_START_TIME -> COL_START_TIME
+ RESOURCE_STOP_TIME -> COL_STOP_TIME
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- return when (index) {
- COL_ID -> id
- COL_START_TIME -> startTime
- COL_STOP_TIME -> stopTime
- COL_CPU_COUNT -> getInt(index)
- COL_MEM_CAPACITY -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
- }
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -86,6 +101,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
}
override fun getInt(index: Int): Int {
+ checkActive()
return when (index) {
COL_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
@@ -93,21 +109,74 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
}
override fun getLong(index: Int): Long {
+ checkActive()
+ return when (index) {
+ COL_CPU_COUNT -> cpuCores.toLong()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getFloat(index: Int): Float {
throw IllegalArgumentException("Invalid column")
}
override fun getDouble(index: Int): Double {
+ checkActive()
return when (index) {
COL_MEM_CAPACITY -> memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
+ override fun getString(index: Int): String? {
+ checkActive()
+ return when (index) {
+ COL_ID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ checkActive()
+ return when (index) {
+ COL_START_TIME -> startTime
+ COL_STOP_TIME -> stopTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
parser.close()
}
/**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
* Advance the parser until the next object start.
*/
private fun nextStart(): Boolean {
@@ -140,19 +209,6 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
memCapacity = Double.NaN
}
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_MEM_CAPACITY = 4
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_START_TIME to COL_START_TIME,
- RESOURCE_STOP_TIME to COL_STOP_TIME,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY
- )
-
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 73978990..2294e4a4 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -62,26 +62,25 @@ public class AzureTraceFormat : TraceFormat {
return when (table) {
TABLE_RESOURCES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_CPU_COUNT,
- RESOURCE_MEM_CAPACITY
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_START_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
)
)
TABLE_RESOURCE_STATES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_CPU_USAGE_PCT
- ),
- listOf(RESOURCE_STATE_TIMESTAMP)
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double),
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
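Since `getDetails` now returns `TableColumn` instances paired with a `TableColumnType`, a consumer can inspect a table's schema before reading it. A sketch of that, assuming `TableColumn` exposes its name and type as properties (the exact property names are an assumption, not taken from this diff):

```kotlin
import org.opendc.trace.azure.AzureTraceFormat
import org.opendc.trace.conv.TABLE_RESOURCES
import java.nio.file.Path

fun printResourceSchema() {
    val details = AzureTraceFormat().getDetails(Path.of("src/test/resources/trace"), TABLE_RESOURCES)

    // Expected for this table: resource id as String, start/stop times as Instant,
    // CPU count as Int and memory capacity as Double (see the hunk above).
    for (column in details.columns) {
        println("${column.name}: ${column.type}") // assumed property names
    }
}
```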
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index 263d26ce..06ba047a 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -22,15 +22,19 @@
package org.opendc.trace.azure
+import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.*
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.api.Assertions.assertAll
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
import org.opendc.trace.conv.*
+import org.opendc.trace.testkit.TableReaderTestKit
import java.nio.file.Paths
/**
* Test suite for the [AzureTraceFormat] class.
*/
+@DisplayName("Azure VM TraceFormat")
class AzureTraceFormatTest {
private val format = AzureTraceFormat()
@@ -60,7 +64,7 @@ class AzureTraceFormatTest {
val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) },
+ { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.getString(RESOURCE_ID)) },
{ assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
)
@@ -75,11 +79,41 @@ class AzureTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_ID)) },
- { assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.getString(RESOURCE_ID)) },
+ { assertEquals(0, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
{ assertEquals(0.0286979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) }
)
reader.close()
}
+
+ @DisplayName("TableReader for Resources")
+ @Nested
+ inner class ResourcesTableReaderTest : TableReaderTestKit() {
+ override lateinit var reader: TableReader
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Paths.get("src/test/resources/trace")
+
+ columns = format.getDetails(path, TABLE_RESOURCES).columns
+ reader = format.newReader(path, TABLE_RESOURCES, null)
+ }
+ }
+
+ @DisplayName("TableReader for Resource States")
+ @Nested
+ inner class ResourceStatesTableReaderTest : TableReaderTestKit() {
+ override lateinit var reader: TableReader
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Paths.get("src/test/resources/trace")
+
+ columns = format.getDetails(path, TABLE_RESOURCE_STATES).columns
+ reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
+ }
+ }
}