summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-opendc/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-09 10:31:41 +0200
committerGitHub <noreply@github.com>2022-06-09 10:31:41 +0200
commitd146814bbbb86bfcb19ccb94250424703e9179e5 (patch)
treebf20f51b434d56e60ad013568ac1a32b912a3b5e /opendc-trace/opendc-trace-opendc/src
parent61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
parent9d759c9bc987965fae8b0c16c000772c546bf3a2 (diff)
merge: Introduce schema for trace API (#88)
This pull request updates the OpenDC trace API to support proper specification of a schema of the tables exposed by the traces. This functionality makes it easier for the API consumer to understand the types exposed by the API. ## Implementation Notes :hammer_and_pick: * Introduce type system for trace API * Add benchmarks for odcvm trace format * Add benchmarks for Azure trace format * Add conformance suite for OpenDC trace API ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * Removal of typed `TableColumn`. Instead, `TableColumn` instances are now used to describe the columns belonging to some table. * `TableReader` and `TableWriter` do not support accessing arbitrary objects anymore. Instead, only the types supported by the type system are exposed.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src')
-rw-r--r--opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt87
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt78
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt55
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt96
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt75
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt97
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt73
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt33
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt181
11 files changed, 607 insertions, 176 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt
new file mode 100644
index 00000000..b9b22931
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc
+
+import org.opendc.trace.conv.*
+import org.opendc.trace.spi.TraceFormat
+import org.openjdk.jmh.annotations.*
+import org.openjdk.jmh.infra.Blackhole
+import java.nio.file.Path
+import java.util.concurrent.TimeUnit
+
+/**
+ * Benchmarks for parsing traces in the OpenDC vm format.
+ */
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+class OdcVmTraceBenchmarks {
+ private lateinit var path: Path
+ private lateinit var format: TraceFormat
+
+ @Setup
+ fun setUp() {
+ path = Path.of("../../opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small")
+ format = OdcVmTraceFormat()
+ }
+
+ @Benchmark
+ fun benchmarkResourcesReader(bh: Blackhole) {
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
+ try {
+ val idColumn = reader.resolve(RESOURCE_ID)
+ while (reader.nextRow()) {
+ bh.consume(reader.getString(idColumn))
+ }
+ } finally {
+ reader.close()
+ }
+ }
+
+ @Benchmark
+ fun benchmarkResourceStatesReader(bh: Blackhole) {
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
+ try {
+ val idColumn = reader.resolve(RESOURCE_ID)
+ while (reader.nextRow()) {
+ bh.consume(reader.getString(idColumn))
+ }
+ } finally {
+ reader.close()
+ }
+ }
+
+ @Benchmark
+ fun benchmarkInterferenceGroupReader(bh: Blackhole) {
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, null)
+ try {
+ val scoreColumn = reader.resolve(INTERFERENCE_GROUP_SCORE)
+ while (reader.nextRow()) {
+ bh.consume(reader.getDouble(scoreColumn))
+ }
+ } finally {
+ reader.close()
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
index eb91e305..1841c486 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
@@ -26,9 +26,13 @@ import org.opendc.trace.*
import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
+import org.opendc.trace.util.convertTo
import shaded.parquet.com.fasterxml.jackson.core.JsonParseException
import shaded.parquet.com.fasterxml.jackson.core.JsonParser
import shaded.parquet.com.fasterxml.jackson.core.JsonToken
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the OpenDC VM interference JSON format.
@@ -50,17 +54,24 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
}
}
- return if (parser.nextToken() != JsonToken.END_ARRAY) {
- parseGroup(parser)
- true
- } else {
+ return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) {
+ parser.close()
reset()
false
+ } else {
+ parseGroup(parser)
+ true
}
}
- override fun resolve(column: TableColumn<*>): Int {
- return when (column) {
+ private val COL_MEMBERS = 0
+ private val COL_TARGET = 1
+ private val COL_SCORE = 2
+
+ private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String)
+
+ override fun resolve(name: String): Int {
+ return when (name) {
INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
INTERFERENCE_GROUP_TARGET -> COL_TARGET
INTERFERENCE_GROUP_SCORE -> COL_SCORE
@@ -75,48 +86,79 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
}
}
- override fun get(index: Int): Any {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getDouble(index: Int): Double {
+ checkActive()
return when (index) {
- COL_MEMBERS -> members
COL_TARGET -> targetLoad
COL_SCORE -> score
else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getString(index: Int): String? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getInt(index: Int): Int {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getLong(index: Int): Long {
+ override fun getInstant(index: Int): Instant? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getDouble(index: Int): Double {
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ checkActive()
return when (index) {
- COL_TARGET -> targetLoad
- COL_SCORE -> score
+ COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType)
else -> throw IllegalArgumentException("Invalid column $index")
}
}
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
override fun close() {
parser.close()
}
- private val COL_MEMBERS = 0
- private val COL_TARGET = 1
- private val COL_SCORE = 2
-
private var members = emptySet<String>()
private var targetLoad = Double.POSITIVE_INFINITY
private var score = 1.0
/**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
* Reset the state.
*/
private fun reset() {
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
index 64bc4356..d726e890 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
@@ -27,6 +27,9 @@ import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC VM interference JSON format.
@@ -65,8 +68,8 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
generator.writeEndObject()
}
- override fun resolve(column: TableColumn<*>): Int {
- return when (column) {
+ override fun resolve(name: String): Int {
+ return when (name) {
INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
INTERFERENCE_GROUP_TARGET -> COL_TARGET
INTERFERENCE_GROUP_SCORE -> COL_SCORE
@@ -74,40 +77,66 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
}
}
- override fun set(index: Int, value: Any) {
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setInt(index: Int, value: Int) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setDouble(index: Int, value: Double) {
check(isRowActive) { "No active row" }
- @Suppress("UNCHECKED_CAST")
when (index) {
- COL_MEMBERS -> members = value as Set<String>
COL_TARGET -> targetLoad = (value as Number).toDouble()
COL_SCORE -> score = (value as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column index $index")
+ else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setString(index: Int, value: String) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setInt(index: Int, value: Int) {
+ override fun setUUID(index: Int, value: UUID) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setLong(index: Int, value: Long) {
+ override fun setInstant(index: Int, value: Instant) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setDouble(index: Int, value: Double) {
+ override fun setDuration(index: Int, value: Duration) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
check(isRowActive) { "No active row" }
+ @Suppress("UNCHECKED_CAST")
when (index) {
- COL_TARGET -> targetLoad = (value as Number).toDouble()
- COL_SCORE -> score = (value as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column $index")
+ COL_MEMBERS -> members = value as Set<String>
+ else -> throw IllegalArgumentException("Invalid column index $index")
}
}
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
override fun flush() {
generator.flush()
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index 7a01b881..b256047f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -26,6 +26,9 @@ import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
@@ -48,24 +51,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return get(index) == null
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_STATE_DURATION -> COL_DURATION
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- COL_ID -> record.id
- COL_TIMESTAMP -> record.timestamp
- COL_DURATION -> record.duration
- COL_CPU_COUNT -> record.cpuCount
- COL_CPU_USAGE -> record.cpuUsage
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..COL_CPU_USAGE) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -84,6 +89,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
@@ -92,23 +101,52 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
}
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_ID -> record.id
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_TIMESTAMP -> record.timestamp
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_DURATION -> record.duration
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun close() {
reader.close()
}
override fun toString(): String = "OdcVmResourceStateTableReader"
-
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_DURATION = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_USAGE = 4
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_STATE_DURATION to COL_DURATION,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index 97af5b59..30375de0 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -28,6 +28,7 @@ import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
@@ -64,17 +65,14 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
lastTimestamp = _timestamp
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
-
- override fun set(index: Int, value: Any) {
- check(_isActive) { "No active row" }
-
- when (index) {
- COL_ID -> _id = value as String
- COL_TIMESTAMP -> _timestamp = value as Instant
- COL_DURATION -> _duration = value as Duration
- COL_CPU_COUNT -> _cpuCount = value as Int
- COL_CPU_USAGE -> _cpuUsage = value as Double
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_STATE_DURATION -> COL_DURATION
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ else -> -1
}
}
@@ -94,6 +92,10 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun setDouble(index: Int, value: Double) {
check(_isActive) { "No active row" }
when (index) {
@@ -102,6 +104,49 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
}
}
+ override fun setString(index: Int, value: String) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_ID -> _id = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setUUID(index: Int, value: UUID) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInstant(index: Int, value: Instant) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_TIMESTAMP -> _timestamp = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setDuration(index: Int, value: Duration) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_DURATION -> _duration = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun flush() {
// Not available
}
@@ -121,12 +166,4 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
private val COL_DURATION = 2
private val COL_CPU_COUNT = 3
private val COL_CPU_USAGE = 4
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_STATE_DURATION to COL_DURATION,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index 6102332f..76fdbca8 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -26,6 +26,9 @@ import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
@@ -48,25 +51,28 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_CAPACITY = 4
+ private val COL_MEM_CAPACITY = 5
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return get(index) == null
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_START_TIME -> COL_START_TIME
+ RESOURCE_STOP_TIME -> COL_STOP_TIME
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- COL_ID -> record.id
- COL_START_TIME -> record.startTime
- COL_STOP_TIME -> record.stopTime
- COL_CPU_COUNT -> getInt(index)
- COL_CPU_CAPACITY -> getDouble(index)
- COL_MEM_CAPACITY -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
- }
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -86,6 +92,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
@@ -96,25 +106,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_ID -> record.id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_START_TIME -> record.startTime
+ COL_STOP_TIME -> record.stopTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
reader.close()
}
override fun toString(): String = "OdcVmResourceTableReader"
-
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_CAPACITY = 4
- private val COL_MEM_CAPACITY = 5
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_START_TIME to COL_START_TIME,
- RESOURCE_STOP_TIME to COL_STOP_TIME,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index cae65faa..8117c3cd 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -26,7 +26,9 @@ import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.Resource
+import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
@@ -59,18 +61,15 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
-
- override fun set(index: Int, value: Any) {
- check(_isActive) { "No active row" }
- when (index) {
- COL_ID -> _id = value as String
- COL_START_TIME -> _startTime = value as Instant
- COL_STOP_TIME -> _stopTime = value as Instant
- COL_CPU_COUNT -> _cpuCount = value as Int
- COL_CPU_CAPACITY -> _cpuCapacity = value as Double
- COL_MEM_CAPACITY -> _memCapacity = value as Double
- else -> throw IllegalArgumentException("Invalid column index $index")
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_START_TIME -> COL_START_TIME
+ RESOURCE_STOP_TIME -> COL_STOP_TIME
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ else -> -1
}
}
@@ -90,6 +89,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun setDouble(index: Int, value: Double) {
check(_isActive) { "No active row" }
when (index) {
@@ -99,6 +102,43 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
}
}
+ override fun setString(index: Int, value: String) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_ID -> _id = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setUUID(index: Int, value: UUID) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInstant(index: Int, value: Instant) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_START_TIME -> _startTime = value
+ COL_STOP_TIME -> _stopTime = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setDuration(index: Int, value: Duration) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun flush() {
// Not available
}
@@ -113,13 +153,4 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
private val COL_CPU_COUNT = 3
private val COL_CPU_CAPACITY = 4
private val COL_MEM_CAPACITY = 5
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_START_TIME to COL_START_TIME,
- RESOURCE_STOP_TIME to COL_STOP_TIME,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index d45910c6..a9c5b934 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -73,36 +73,35 @@ public class OdcVmTraceFormat : TraceFormat {
return when (table) {
TABLE_RESOURCES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_CPU_COUNT,
- RESOURCE_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY,
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_START_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
+ TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
)
)
TABLE_RESOURCE_STATES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_DURATION,
- RESOURCE_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE,
- ),
- listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double),
+ )
)
TABLE_INTERFERENCE_GROUPS -> TableDetails(
listOf(
- INTERFERENCE_GROUP_MEMBERS,
- INTERFERENCE_GROUP_TARGET,
- INTERFERENCE_GROUP_SCORE,
+ TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
+ TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
)
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
index 0d70446d..8a8ed790 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -33,11 +33,11 @@ import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [Resource] objects.
*/
-internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() {
+internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() {
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf<String, TableColumn<*>>(
+ private val fieldMap = mapOf(
"id" to RESOURCE_ID,
"submissionTime" to RESOURCE_START_TIME,
"start_time" to RESOURCE_START_TIME,
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
index 97aa00b2..78adc649 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -33,11 +33,11 @@ import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [ResourceState] objects.
*/
-internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
+internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() {
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf<String, TableColumn<*>>(
+ private val fieldMap = mapOf(
"id" to RESOURCE_ID,
"time" to RESOURCE_STATE_TIMESTAMP,
"timestamp" to RESOURCE_STATE_TIMESTAMP,
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index 1f4f6195..9fdffb2b 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -23,12 +23,20 @@
package org.opendc.trace.opendc
import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.DisplayName
+import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
import org.opendc.trace.conv.*
+import org.opendc.trace.testkit.TableReaderTestKit
+import org.opendc.trace.testkit.TableWriterTestKit
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Instant
@@ -36,6 +44,7 @@ import java.time.Instant
/**
* Test suite for the [OdcVmTraceFormat] implementation.
*/
+@DisplayName("OdcVmTraceFormat")
internal class OdcVmTraceFormatTest {
private val format = OdcVmTraceFormat()
@@ -67,14 +76,14 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1023", reader.get(RESOURCE_ID)) },
+ { assertEquals("1023", reader.getString(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1052", reader.get(RESOURCE_ID)) },
+ { assertEquals("1052", reader.getString(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1073", reader.get(RESOURCE_ID)) },
+ { assertEquals("1073", reader.getString(RESOURCE_ID)) },
{ assertFalse(reader.nextRow()) }
)
@@ -87,9 +96,9 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCES)
writer.startRow()
- writer.set(RESOURCE_ID, "1019")
- writer.set(RESOURCE_START_TIME, Instant.EPOCH)
- writer.set(RESOURCE_STOP_TIME, Instant.EPOCH)
+ writer.setString(RESOURCE_ID, "1019")
+ writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH)
+ writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH)
writer.setInt(RESOURCE_CPU_COUNT, 1)
writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
@@ -100,9 +109,9 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) },
{ assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
{ assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
@@ -124,8 +133,8 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
{ assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
)
@@ -138,8 +147,8 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
writer.startRow()
- writer.set(RESOURCE_ID, "1019")
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
+ writer.setString(RESOURCE_ID, "1019")
+ writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
writer.setInt(RESOURCE_CPU_COUNT, 1)
writer.endRow()
@@ -149,8 +158,8 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) },
{ assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
{ assertFalse(reader.nextRow()) },
@@ -170,13 +179,13 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) },
- { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) },
- { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) },
+ { assertEquals(setOf("1019", "1023", "1052"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
+ { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.8830158730158756, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) },
- { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) },
- { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) },
+ { assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
+ { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
{ assertFalse(reader.nextRow()) }
)
@@ -191,4 +200,130 @@ internal class OdcVmTraceFormatTest {
assertFalse(reader.nextRow())
reader.close()
}
+
+ @Test
+ fun testInterferenceGroupsWrite() {
+ val path = Files.createTempDirectory("opendc")
+ val writer = format.newWriter(path, TABLE_INTERFERENCE_GROUPS)
+
+ writer.startRow()
+ writer.setSet(INTERFERENCE_GROUP_MEMBERS, setOf("a", "b", "c"))
+ writer.setDouble(INTERFERENCE_GROUP_TARGET, 0.5)
+ writer.setDouble(INTERFERENCE_GROUP_SCORE, 0.8)
+ writer.endRow()
+ writer.flush()
+
+ writer.startRow()
+ writer.setSet(INTERFERENCE_GROUP_MEMBERS, setOf("a", "b", "d"))
+ writer.setDouble(INTERFERENCE_GROUP_TARGET, 0.5)
+ writer.setDouble(INTERFERENCE_GROUP_SCORE, 0.9)
+ writer.endRow()
+ writer.close()
+
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, null)
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(setOf("a", "b", "c"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
+ { assertEquals(0.5, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.8, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
+ { assertTrue(reader.nextRow()) },
+ { assertEquals(setOf("a", "b", "d"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
+ { assertEquals(0.5, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.9, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
+ { assertFalse(reader.nextRow()) },
+ )
+
+ reader.close()
+ }
+
+ @DisplayName("TableReader for Resources")
+ @Nested
+ inner class ResourcesTableReaderTest : TableReaderTestKit() {
+ override lateinit var reader: TableReader
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Paths.get("src/test/resources/trace-v2.1")
+
+ columns = format.getDetails(path, TABLE_RESOURCES).columns
+ reader = format.newReader(path, TABLE_RESOURCES, null)
+ }
+ }
+
+ @DisplayName("TableWriter for Resources")
+ @Nested
+ inner class ResourcesTableWriterTest : TableWriterTestKit() {
+ override lateinit var writer: TableWriter
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Files.createTempDirectory("opendc")
+
+ columns = format.getDetails(Paths.get("src/test/resources/trace-v2.1"), TABLE_RESOURCES).columns
+ writer = format.newWriter(path, TABLE_RESOURCES)
+ }
+ }
+
+ @DisplayName("TableReader for Resource States")
+ @Nested
+ inner class ResourceStatesTableReaderTest : TableReaderTestKit() {
+ override lateinit var reader: TableReader
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Paths.get("src/test/resources/trace-v2.1")
+
+ columns = format.getDetails(path, TABLE_RESOURCE_STATES).columns
+ reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
+ }
+ }
+
+ @DisplayName("TableWriter for Resource States")
+ @Nested
+ inner class ResourceStatesTableWriterTest : TableWriterTestKit() {
+ override lateinit var writer: TableWriter
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Files.createTempDirectory("opendc")
+
+ columns = format.getDetails(Paths.get("src/test/resources/trace-v2.1"), TABLE_RESOURCE_STATES).columns
+ writer = format.newWriter(path, TABLE_RESOURCE_STATES)
+ }
+ }
+
+ @DisplayName("TableReader for Interference Groups")
+ @Nested
+ inner class InterferenceGroupsTableReaderTest : TableReaderTestKit() {
+ override lateinit var reader: TableReader
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Paths.get("src/test/resources/trace-v2.1")
+
+ columns = format.getDetails(path, TABLE_INTERFERENCE_GROUPS).columns
+ reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, null)
+ }
+ }
+
+ @DisplayName("TableWriter for Interference Groups")
+ @Nested
+ inner class InterferenceGroupsTableWriterTest : TableWriterTestKit() {
+ override lateinit var writer: TableWriter
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Files.createTempDirectory("opendc")
+
+ columns = format.getDetails(Paths.get("src/test/resources/trace-v2.1"), TABLE_INTERFERENCE_GROUPS).columns
+ writer = format.newWriter(path, TABLE_INTERFERENCE_GROUPS)
+ }
+ }
}