summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-opendc/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-06 16:21:21 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-07 15:46:53 +0200
commit2358257c1080b7ce78270535f82f0b960d48261a (patch)
treebced69c02698e85f995aa9935ddcfb54df23a64f /opendc-trace/opendc-trace-opendc/src
parent61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
refactor(trace/api): Introduce type system for trace API
This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src')
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt60
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt55
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt96
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt73
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt97
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt73
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt33
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt46
10 files changed, 369 insertions, 172 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
index eb91e305..920daeea 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
@@ -26,9 +26,13 @@ import org.opendc.trace.*
import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
+import org.opendc.trace.util.convertTo
import shaded.parquet.com.fasterxml.jackson.core.JsonParseException
import shaded.parquet.com.fasterxml.jackson.core.JsonParser
import shaded.parquet.com.fasterxml.jackson.core.JsonToken
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the OpenDC VM interference JSON format.
@@ -59,8 +63,14 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
}
}
- override fun resolve(column: TableColumn<*>): Int {
- return when (column) {
+ private val COL_MEMBERS = 0
+ private val COL_TARGET = 1
+ private val COL_SCORE = 2
+
+ private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String)
+
+ override fun resolve(name: String): Int {
+ return when (name) {
INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
INTERFERENCE_GROUP_TARGET -> COL_TARGET
INTERFERENCE_GROUP_SCORE -> COL_SCORE
@@ -75,43 +85,65 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
}
}
- override fun get(index: Int): Any {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getDouble(index: Int): Double {
return when (index) {
- COL_MEMBERS -> members
COL_TARGET -> targetLoad
COL_SCORE -> score
else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getString(index: Int): String? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getInt(index: Int): Int {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getLong(index: Int): Long {
+ override fun getInstant(index: Int): Instant? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getDouble(index: Int): Double {
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
return when (index) {
- COL_TARGET -> targetLoad
- COL_SCORE -> score
+ COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType)
else -> throw IllegalArgumentException("Invalid column $index")
}
}
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
override fun close() {
parser.close()
}
- private val COL_MEMBERS = 0
- private val COL_TARGET = 1
- private val COL_SCORE = 2
-
private var members = emptySet<String>()
private var targetLoad = Double.POSITIVE_INFINITY
private var score = 1.0
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
index 64bc4356..d726e890 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
@@ -27,6 +27,9 @@ import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC VM interference JSON format.
@@ -65,8 +68,8 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
generator.writeEndObject()
}
- override fun resolve(column: TableColumn<*>): Int {
- return when (column) {
+ override fun resolve(name: String): Int {
+ return when (name) {
INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
INTERFERENCE_GROUP_TARGET -> COL_TARGET
INTERFERENCE_GROUP_SCORE -> COL_SCORE
@@ -74,40 +77,66 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
}
}
- override fun set(index: Int, value: Any) {
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setInt(index: Int, value: Int) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setDouble(index: Int, value: Double) {
check(isRowActive) { "No active row" }
- @Suppress("UNCHECKED_CAST")
when (index) {
- COL_MEMBERS -> members = value as Set<String>
COL_TARGET -> targetLoad = (value as Number).toDouble()
COL_SCORE -> score = (value as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column index $index")
+ else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setString(index: Int, value: String) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setInt(index: Int, value: Int) {
+ override fun setUUID(index: Int, value: UUID) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setLong(index: Int, value: Long) {
+ override fun setInstant(index: Int, value: Instant) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setDouble(index: Int, value: Double) {
+ override fun setDuration(index: Int, value: Duration) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
check(isRowActive) { "No active row" }
+ @Suppress("UNCHECKED_CAST")
when (index) {
- COL_TARGET -> targetLoad = (value as Number).toDouble()
- COL_SCORE -> score = (value as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column $index")
+ COL_MEMBERS -> members = value as Set<String>
+ else -> throw IllegalArgumentException("Invalid column index $index")
}
}
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
override fun flush() {
generator.flush()
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index 7a01b881..599f46f1 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -26,6 +26,9 @@ import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
@@ -48,24 +51,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return get(index) == null
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_STATE_DURATION -> COL_DURATION
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- COL_ID -> record.id
- COL_TIMESTAMP -> record.timestamp
- COL_DURATION -> record.duration
- COL_CPU_COUNT -> record.cpuCount
- COL_CPU_USAGE -> record.cpuUsage
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..COL_CPU_USAGE) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -84,6 +89,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
@@ -92,23 +101,52 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
}
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_ID -> record.id
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_TIMESTAMP -> record.timestamp
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_DURATION -> record.duration
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun close() {
reader.close()
}
override fun toString(): String = "OdcVmResourceStateTableReader"
-
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_DURATION = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_USAGE = 4
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_STATE_DURATION to COL_DURATION,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index 97af5b59..f5e8b863 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -28,6 +28,7 @@ import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
@@ -64,17 +65,14 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
lastTimestamp = _timestamp
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
-
- override fun set(index: Int, value: Any) {
- check(_isActive) { "No active row" }
-
- when (index) {
- COL_ID -> _id = value as String
- COL_TIMESTAMP -> _timestamp = value as Instant
- COL_DURATION -> _duration = value as Duration
- COL_CPU_COUNT -> _cpuCount = value as Int
- COL_CPU_USAGE -> _cpuUsage = value as Double
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_STATE_DURATION -> COL_DURATION
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ else -> -1
}
}
@@ -94,6 +92,10 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun setDouble(index: Int, value: Double) {
check(_isActive) { "No active row" }
when (index) {
@@ -102,6 +104,47 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
}
}
+ override fun setString(index: Int, value: String) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_ID -> _id = value
+ }
+ }
+
+ override fun setUUID(index: Int, value: UUID) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInstant(index: Int, value: Instant) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_TIMESTAMP -> _timestamp = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setDuration(index: Int, value: Duration) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_DURATION -> _duration = value
+ }
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun flush() {
// Not available
}
@@ -121,12 +164,4 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
private val COL_DURATION = 2
private val COL_CPU_COUNT = 3
private val COL_CPU_USAGE = 4
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_STATE_DURATION to COL_DURATION,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index 6102332f..88f9b781 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -26,6 +26,9 @@ import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
@@ -48,25 +51,28 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_CAPACITY = 4
+ private val COL_MEM_CAPACITY = 5
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return get(index) == null
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_START_TIME -> COL_START_TIME
+ RESOURCE_STOP_TIME -> COL_STOP_TIME
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- COL_ID -> record.id
- COL_START_TIME -> record.startTime
- COL_STOP_TIME -> record.stopTime
- COL_CPU_COUNT -> getInt(index)
- COL_CPU_CAPACITY -> getDouble(index)
- COL_MEM_CAPACITY -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
- }
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..COL_MEM_CAPACITY) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -86,6 +92,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
@@ -96,25 +106,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_ID -> record.id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_START_TIME -> record.startTime
+ COL_STOP_TIME -> record.stopTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
reader.close()
}
override fun toString(): String = "OdcVmResourceTableReader"
-
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_CAPACITY = 4
- private val COL_MEM_CAPACITY = 5
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_START_TIME to COL_START_TIME,
- RESOURCE_STOP_TIME to COL_STOP_TIME,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index cae65faa..8117c3cd 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -26,7 +26,9 @@ import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.Resource
+import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
@@ -59,18 +61,15 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
-
- override fun set(index: Int, value: Any) {
- check(_isActive) { "No active row" }
- when (index) {
- COL_ID -> _id = value as String
- COL_START_TIME -> _startTime = value as Instant
- COL_STOP_TIME -> _stopTime = value as Instant
- COL_CPU_COUNT -> _cpuCount = value as Int
- COL_CPU_CAPACITY -> _cpuCapacity = value as Double
- COL_MEM_CAPACITY -> _memCapacity = value as Double
- else -> throw IllegalArgumentException("Invalid column index $index")
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_START_TIME -> COL_START_TIME
+ RESOURCE_STOP_TIME -> COL_STOP_TIME
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ else -> -1
}
}
@@ -90,6 +89,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun setDouble(index: Int, value: Double) {
check(_isActive) { "No active row" }
when (index) {
@@ -99,6 +102,43 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
}
}
+ override fun setString(index: Int, value: String) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_ID -> _id = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setUUID(index: Int, value: UUID) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInstant(index: Int, value: Instant) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_START_TIME -> _startTime = value
+ COL_STOP_TIME -> _stopTime = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setDuration(index: Int, value: Duration) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun flush() {
// Not available
}
@@ -113,13 +153,4 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
private val COL_CPU_COUNT = 3
private val COL_CPU_CAPACITY = 4
private val COL_MEM_CAPACITY = 5
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_START_TIME to COL_START_TIME,
- RESOURCE_STOP_TIME to COL_STOP_TIME,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index d45910c6..a9c5b934 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -73,36 +73,35 @@ public class OdcVmTraceFormat : TraceFormat {
return when (table) {
TABLE_RESOURCES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_CPU_COUNT,
- RESOURCE_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY,
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_START_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
+ TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
)
)
TABLE_RESOURCE_STATES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_DURATION,
- RESOURCE_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE,
- ),
- listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double),
+ )
)
TABLE_INTERFERENCE_GROUPS -> TableDetails(
listOf(
- INTERFERENCE_GROUP_MEMBERS,
- INTERFERENCE_GROUP_TARGET,
- INTERFERENCE_GROUP_SCORE,
+ TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
+ TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
)
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
index 0d70446d..8a8ed790 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -33,11 +33,11 @@ import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [Resource] objects.
*/
-internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() {
+internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() {
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf<String, TableColumn<*>>(
+ private val fieldMap = mapOf(
"id" to RESOURCE_ID,
"submissionTime" to RESOURCE_START_TIME,
"start_time" to RESOURCE_START_TIME,
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
index 97aa00b2..78adc649 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -33,11 +33,11 @@ import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [ResourceState] objects.
*/
-internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
+internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() {
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf<String, TableColumn<*>>(
+ private val fieldMap = mapOf(
"id" to RESOURCE_ID,
"time" to RESOURCE_STATE_TIMESTAMP,
"timestamp" to RESOURCE_STATE_TIMESTAMP,
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index 1f4f6195..ae6e62d8 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -67,14 +67,14 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1023", reader.get(RESOURCE_ID)) },
+ { assertEquals("1023", reader.getString(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1052", reader.get(RESOURCE_ID)) },
+ { assertEquals("1052", reader.getString(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1073", reader.get(RESOURCE_ID)) },
+ { assertEquals("1073", reader.getString(RESOURCE_ID)) },
{ assertFalse(reader.nextRow()) }
)
@@ -87,9 +87,9 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCES)
writer.startRow()
- writer.set(RESOURCE_ID, "1019")
- writer.set(RESOURCE_START_TIME, Instant.EPOCH)
- writer.set(RESOURCE_STOP_TIME, Instant.EPOCH)
+ writer.setString(RESOURCE_ID, "1019")
+ writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH)
+ writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH)
writer.setInt(RESOURCE_CPU_COUNT, 1)
writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
@@ -100,9 +100,9 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) },
{ assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
{ assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
@@ -124,8 +124,8 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
{ assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
)
@@ -138,8 +138,8 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
writer.startRow()
- writer.set(RESOURCE_ID, "1019")
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
+ writer.setString(RESOURCE_ID, "1019")
+ writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
writer.setInt(RESOURCE_CPU_COUNT, 1)
writer.endRow()
@@ -149,8 +149,8 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) },
{ assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
{ assertFalse(reader.nextRow()) },
@@ -170,13 +170,13 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) },
- { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) },
- { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) },
+ { assertEquals(setOf("1019", "1023", "1052"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
+ { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.8830158730158756, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) },
- { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) },
- { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) },
+ { assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
+ { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
{ assertFalse(reader.nextRow()) }
)