summaryrefslogtreecommitdiff
path: root/opendc-trace
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-20 15:12:10 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-20 15:12:10 +0200
commit768bfa0d2ae763e359d74612385ce43c41afb432 (patch)
tree1ca870d5dd75f7250e91ae7dec41a5e68a77856f /opendc-trace
parent55a4c8208cc44ac626f7b8c61a19d5ec725ec936 (diff)
feat(trace): Support column lookup via index
This change adds support for looking up the column value through the column index. This enables faster lookup when processing very large traces.
Diffstat (limited to 'opendc-trace')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt107
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt110
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt54
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt48
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt62
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt54
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt94
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt54
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt104
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt33
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt69
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt82
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt82
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt72
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt62
-rw-r--r--opendc-trace/opendc-trace-wtf/build.gradle.kts2
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt132
17 files changed, 688 insertions, 533 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
index b5e7669f..8a796e6c 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
@@ -34,34 +34,129 @@ public interface TableReader : AutoCloseable {
public fun nextRow(): Boolean
/**
+ * Resolve the index of the specified [column] for this reader.
+ *
+ * @param column The column to lookup.
+ * @return The zero-based index of the column or a negative value if the column is not present in this table.
+ */
+ public fun resolve(column: TableColumn<*>): Int
+
+ /**
* Determine whether the [TableReader] supports the specified [column].
*/
- public fun hasColumn(column: TableColumn<*>): Boolean
+ public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0
+
+ /**
+ * Determine whether the specified [column] has a `null` value for the current row.
+ *
+ * @param index The zero-based index of the column to check for a null value.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return `true` if the column value for the current value has a `null` value, `false` otherwise.
+ */
+ public fun isNull(index: Int): Boolean
+
+ /**
+ * Obtain the object value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The object value of the column.
+ */
+ public fun get(index: Int): Any?
+
+ /**
+ * Obtain the boolean value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The boolean value of the column or `false` if the column is `null`.
+ */
+ public fun getBoolean(index: Int): Boolean
+
+ /**
+ * Obtain the integer value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The integer value of the column or `0` if the column is `null`.
+ */
+ public fun getInt(index: Int): Int
+
+ /**
+ * Obtain the double value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The long value of the column or `0` if the column is `null`.
+ */
+ public fun getLong(index: Int): Long
+
+ /**
+ * Obtain the double value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The double value of the column or [Double.NaN] if the column is `null`.
+ */
+ public fun getDouble(index: Int): Double
+
+ /**
+ * Determine whether the specified [column] has a `null` value for the current row.
+ *
+ * @param column The column to lookup.
+ * @throws IllegalArgumentException if the column is not valid for this table.
+ * @return `true` if the column value for the current value has a `null` value, `false` otherwise.
+ */
+ public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column))
/**
* Obtain the value of the current column with type [T].
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The object value of the column.
*/
- public fun <T> get(column: TableColumn<T>): T
+ public fun <T> get(column: TableColumn<T>): T {
+ // This cast should always succeed since the resolve the index of the typed column
+ @Suppress("UNCHECKED_CAST")
+ return get(resolve(column)) as T
+ }
/**
* Read the specified [column] as boolean.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The boolean value of the column or `false` if the column is `null`.
*/
- public fun getBoolean(column: TableColumn<Boolean>): Boolean
+ public fun getBoolean(column: TableColumn<Boolean>): Boolean = getBoolean(resolve(column))
/**
* Read the specified [column] as integer.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The integer value of the column or `0` if the column is `null`.
*/
- public fun getInt(column: TableColumn<Int>): Int
+ public fun getInt(column: TableColumn<Int>): Int = getInt(resolve(column))
/**
* Read the specified [column] as long.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The long value of the column or `0` if the column is `null`.
*/
- public fun getLong(column: TableColumn<Long>): Long
+ public fun getLong(column: TableColumn<Long>): Long = getLong(resolve(column))
/**
* Read the specified [column] as double.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The double value of the column or [Double.NaN] if the column is `null`.
*/
- public fun getDouble(column: TableColumn<Double>): Double
+ public fun getDouble(column: TableColumn<Double>): Double = getDouble(resolve(column))
/**
* Closes the reader so that no further iteration or data access can be made.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
new file mode 100644
index 00000000..dafc0798
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+
+/**
+ * A helper class to chain multiple [TableReader]s.
+ */
+public abstract class CompositeTableReader : TableReader {
+ /**
+ * A flag to indicate that the reader has starting, meaning the user called [nextRow] at least once
+ * (and in turn [nextReader]).
+ */
+ private var hasStarted = false
+
+ /**
+ * The active [TableReader] instance.
+ */
+ private var delegate: TableReader? = null
+
+ /**
+ * Obtain the next [TableReader] instance to read from or `null` if there are no more readers to read from.
+ */
+ protected abstract fun nextReader(): TableReader?
+
+ override fun nextRow(): Boolean {
+ if (!hasStarted) {
+ assert(delegate == null) { "Duplicate initialization" }
+ delegate = nextReader()
+ hasStarted = true
+ }
+
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextReader()
+ this.delegate = delegate
+ }
+
+ return delegate != null
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ val delegate = delegate
+ return delegate?.resolve(column) ?: -1
+ }
+
+ override fun isNull(index: Int): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.isNull(index)
+ }
+
+ override fun get(index: Int): Any? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(index)
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(index)
+ }
+
+ override fun getInt(index: Int): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(index)
+ }
+
+ override fun getLong(index: Int): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(index)
+ }
+
+ override fun getDouble(index: Int): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(index)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ override fun toString(): String = "CompositeTableReader"
+}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
index e6b89465..8f2f5cc9 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.azure
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.*
+import org.opendc.trace.util.CompositeTableReader
import java.nio.file.Files
import java.nio.file.Path
import java.util.stream.Collectors
@@ -55,57 +56,8 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa
override fun newReader(): TableReader {
val it = partitions.iterator()
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- this.delegate = delegate
- }
-
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
return if (it.hasNext()) {
val (_, path) = it.next()
return AzureResourceStateTableReader(factory.createParser(path.toFile()))
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
index 6c1cb770..da8181fe 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
@@ -60,42 +60,37 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column")
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_TIMESTAMP -> timestamp
+ COL_CPU_USAGE_PCT -> cpuUsagePct
+ else -> throw IllegalArgumentException("Invalid column index")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
throw IllegalArgumentException("Invalid column")
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_CPU_USAGE_PCT -> cpuUsagePct
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -133,6 +128,15 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
cpuUsagePct = Double.NaN
}
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_CPU_USAGE_PCT = 2
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT
+ )
+
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index 5ea97483..a6352613 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -62,49 +62,42 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
- RESOURCE_START_TIME -> startTime
- RESOURCE_STOP_TIME -> stopTime
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_START_TIME -> startTime
+ COL_STOP_TIME -> stopTime
+ COL_CPU_COUNT -> getInt(index)
+ COL_MEM_CAPACITY -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_CPU_COUNT -> cpuCores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_MEM_CAPACITY -> memCapacity
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_MEM_CAPACITY -> memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -138,7 +131,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
/**
* Reset the state.
*/
- fun reset() {
+ private fun reset() {
id = null
startTime = null
stopTime = null
@@ -146,6 +139,19 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
memCapacity = Double.NaN
}
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_MEM_CAPACITY = 4
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY
+ )
+
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
index 44a6c26e..ab768608 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTable.kt
@@ -23,6 +23,7 @@
package org.opendc.trace.bitbrains
import org.opendc.trace.*
+import org.opendc.trace.util.CompositeTableReader
import java.nio.file.Files
import java.nio.file.Path
import java.util.stream.Collectors
@@ -64,57 +65,8 @@ internal class BitbrainsExResourceStateTable(path: Path) : Table {
override fun newReader(): TableReader {
val it = partitions.iterator()
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- this.delegate = delegate
- }
-
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
return if (it.hasNext()) {
val (_, path) = it.next()
val reader = path.bufferedReader()
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
index 5619e839..c1b6f5ba 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -88,70 +88,53 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_CLUSTER_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_CPU_DEMAND -> true
- RESOURCE_STATE_CPU_READY_PCT -> true
- RESOURCE_MEM_CAPACITY -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..COL_MAX) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
- RESOURCE_CLUSTER_ID -> cluster
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_CPU_CAPACITY -> getDouble(RESOURCE_CPU_CAPACITY)
- RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
- RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
- RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ)
- RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_CLUSTER_ID -> cluster
+ COL_TIMESTAMP -> timestamp
+ COL_NCPUS -> getInt(index)
+ COL_POWERED_ON -> getInt(index)
+ COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_CPU_READY_PCT, COL_CPU_DEMAND, COL_MEM_CAPACITY, COL_DISK_READ, COL_DISK_WRITE -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- return when (column) {
- RESOURCE_STATE_POWERED_ON -> poweredOn
+ override fun getBoolean(index: Int): Boolean {
+ return when (index) {
+ COL_POWERED_ON -> poweredOn
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_CPU_COUNT -> cpuCores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_NCPUS -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_CPU_CAPACITY -> cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
- RESOURCE_STATE_CPU_DEMAND -> cpuDemand
- RESOURCE_MEM_CAPACITY -> memCapacity
- RESOURCE_STATE_DISK_READ -> diskRead
- RESOURCE_STATE_DISK_WRITE -> diskWrite
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_CPU_CAPACITY -> cpuCapacity
+ COL_CPU_USAGE -> cpuUsage
+ COL_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
+ COL_CPU_READY_PCT -> cpuReadyPct
+ COL_CPU_DEMAND -> cpuDemand
+ COL_MEM_CAPACITY -> memCapacity
+ COL_DISK_READ -> diskRead
+ COL_DISK_WRITE -> diskWrite
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -209,4 +192,21 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
private val COL_CPU_CAPACITY = 18
private val COL_ID = 19
private val COL_MEM_CAPACITY = 20
+ private val COL_CPU_USAGE_PCT = 21
+ private val COL_MAX = COL_CPU_USAGE_PCT + 1
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_CLUSTER_ID to COL_CLUSTER_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_CPU_COUNT to COL_NCPUS,
+ RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT,
+ RESOURCE_STATE_CPU_DEMAND to COL_CPU_DEMAND,
+ RESOURCE_STATE_CPU_READY_PCT to COL_CPU_READY_PCT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ RESOURCE_STATE_DISK_READ to COL_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE
+ )
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
index f68e61dc..6b6ac9da 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.bitbrains
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.*
+import org.opendc.trace.util.CompositeTableReader
import java.nio.file.Files
import java.nio.file.Path
import java.util.stream.Collectors
@@ -65,57 +66,8 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path
override fun newReader(): TableReader {
val it = partitions.iterator()
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- this.delegate = delegate
- }
-
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
return if (it.hasNext()) {
val (partition, path) = it.next()
return BitbrainsResourceStateTableReader(partition, factory.createParser(path.toFile()))
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 54be5dea..3a8839b4 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -111,71 +111,49 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_MEM_CAPACITY -> true
- RESOURCE_STATE_MEM_USAGE -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- RESOURCE_STATE_NET_RX -> true
- RESOURCE_STATE_NET_TX -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> partition
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_CPU_COUNT -> cpuCores
- RESOURCE_CPU_CAPACITY -> cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- RESOURCE_MEM_CAPACITY -> memCapacity
- RESOURCE_STATE_MEM_USAGE -> memUsage
- RESOURCE_STATE_DISK_READ -> diskRead
- RESOURCE_STATE_DISK_WRITE -> diskWrite
- RESOURCE_STATE_NET_RX -> netReceived
- RESOURCE_STATE_NET_TX -> netTransmitted
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> partition
+ COL_TIMESTAMP -> timestamp
+ COL_CPU_COUNT -> getInt(index)
+ COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_MEM_CAPACITY, COL_MEM_USAGE, COL_DISK_READ, COL_DISK_WRITE, COL_NET_RX, COL_NET_TX -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_CPU_COUNT -> cpuCores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_CPU_CAPACITY -> cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- RESOURCE_MEM_CAPACITY -> memCapacity
- RESOURCE_STATE_MEM_USAGE -> memUsage
- RESOURCE_STATE_DISK_READ -> diskRead
- RESOURCE_STATE_DISK_WRITE -> diskWrite
- RESOURCE_STATE_NET_RX -> netReceived
- RESOURCE_STATE_NET_TX -> netTransmitted
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_CPU_CAPACITY -> cpuCapacity
+ COL_CPU_USAGE -> cpuUsage
+ COL_CPU_USAGE_PCT -> cpuUsagePct
+ COL_MEM_CAPACITY -> memCapacity
+ COL_MEM_USAGE -> memUsage
+ COL_DISK_READ -> diskRead
+ COL_DISK_WRITE -> diskWrite
+ COL_NET_RX -> netReceived
+ COL_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -249,6 +227,34 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
netTransmitted = Double.NaN
}
+ private val COL_TIMESTAMP = 0
+ private val COL_CPU_COUNT = 1
+ private val COL_CPU_CAPACITY = 2
+ private val COL_CPU_USAGE = 3
+ private val COL_CPU_USAGE_PCT = 4
+ private val COL_MEM_CAPACITY = 5
+ private val COL_MEM_USAGE = 6
+ private val COL_DISK_READ = 7
+ private val COL_DISK_WRITE = 8
+ private val COL_NET_RX = 9
+ private val COL_NET_TX = 10
+ private val COL_ID = 11
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ RESOURCE_STATE_MEM_USAGE to COL_MEM_USAGE,
+ RESOURCE_STATE_DISK_READ to COL_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE,
+ RESOURCE_STATE_NET_RX to COL_NET_RX,
+ RESOURCE_STATE_NET_TX to COL_NET_TX
+ )
+
/**
* The type of the timestamp in the trace.
*/
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
index 146c04f0..3701994a 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
@@ -43,13 +43,14 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
val parser = factory.createParser(path.toFile())
val reader = BitbrainsResourceStateTableReader(name, parser)
+ val idCol = reader.resolve(RESOURCE_ID)
try {
if (!reader.nextRow()) {
continue
}
- id = reader.get(RESOURCE_ID)
+ id = reader.get(idCol) as String
return true
} finally {
reader.close()
@@ -59,36 +60,33 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
return false
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
throw IllegalArgumentException("Invalid column")
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -105,4 +103,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
private fun reset() {
id = null
}
+
+ private val COL_ID = 0
+ private val columns = mapOf(RESOURCE_ID to COL_ID)
}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 39eb5520..aa4c543b 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -67,52 +67,43 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_WORKFLOW_ID -> true
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- TASK_WORKFLOW_ID -> workflowId
- TASK_ID -> jobId
- TASK_SUBMIT_TIME -> submitTime
- TASK_RUNTIME -> runtime
- TASK_REQ_NCPUS -> nProcs
- TASK_ALLOC_NCPUS -> reqNProcs
- TASK_PARENTS -> dependencies
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_JOB_ID -> jobId
+ COL_WORKFLOW_ID -> workflowId
+ COL_SUBMIT_TIME -> submitTime
+ COL_RUNTIME -> runtime
+ COL_REQ_NPROC -> getInt(index)
+ COL_NPROC -> getInt(index)
+ COL_DEPS -> dependencies
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- TASK_REQ_NCPUS -> nProcs
- TASK_ALLOC_NCPUS -> reqNProcs
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_REQ_NPROC -> reqNProcs
+ COL_NPROC -> nProcs
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -180,6 +171,24 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
dependencies = emptySet()
}
+ private val COL_WORKFLOW_ID = 0
+ private val COL_JOB_ID = 1
+ private val COL_SUBMIT_TIME = 2
+ private val COL_RUNTIME = 3
+ private val COL_NPROC = 4
+ private val COL_REQ_NPROC = 5
+ private val COL_DEPS = 6
+
+ private val columns = mapOf(
+ TASK_ID to COL_JOB_ID,
+ TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
+ TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
+ TASK_RUNTIME to COL_RUNTIME,
+ TASK_ALLOC_NCPUS to COL_NPROC,
+ TASK_REQ_NCPUS to COL_REQ_NPROC,
+ TASK_PARENTS to COL_DEPS
+ )
+
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index e4b18735..b5043f82 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -55,54 +55,46 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_DURATION -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_STATE_CPU_USAGE -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_ID -> record[COL_ID].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long)
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
+ return when (index) {
+ COL_ID -> record[AVRO_COL_ID].toString()
+ COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long)
+ COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long)
+ COL_CPU_COUNT -> getInt(index)
+ COL_CPU_USAGE -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
+ return when (index) {
+ COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble()
+ return when (index) {
+ COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -118,20 +110,34 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
*/
private fun initColumns(schema: Schema) {
try {
- COL_ID = schema.getField("id").pos()
- COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
- COL_DURATION = schema.getField("duration").pos()
- COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ AVRO_COL_DURATION = schema.getField("duration").pos()
+ AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
} catch (e: NullPointerException) {
// This happens when the field we are trying to access does not exist
throw IllegalArgumentException("Invalid schema", e)
}
}
- private var COL_ID = -1
- private var COL_TIMESTAMP = -1
- private var COL_DURATION = -1
- private var COL_CPU_COUNT = -1
- private var COL_CPU_USAGE = -1
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_TIMESTAMP = -1
+ private var AVRO_COL_DURATION = -1
+ private var AVRO_COL_CPU_COUNT = -1
+ private var AVRO_COL_CPU_USAGE = -1
+
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_DURATION to COL_DURATION,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index c52da62d..d93929aa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -54,56 +54,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_ID -> record[COL_ID].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long)
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+ return when (index) {
+ COL_ID -> record[AVRO_COL_ID].toString()
+ COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long)
+ COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long)
+ COL_CPU_COUNT -> getInt(index)
+ COL_MEM_CAPACITY -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
+ return when (index) {
+ COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble()
+ return when (index) {
+ COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -119,20 +111,34 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
*/
private fun initColumns(schema: Schema) {
try {
- COL_ID = schema.getField("id").pos()
- COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
} catch (e: NullPointerException) {
// This happens when the field we are trying to access does not exist
throw IllegalArgumentException("Invalid schema")
}
}
- private var COL_ID = -1
- private var COL_START_TIME = -1
- private var COL_STOP_TIME = -1
- private var COL_CPU_COUNT = -1
- private var COL_MEM_CAPACITY = -1
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_START_TIME = -1
+ private var AVRO_COL_STOP_TIME = -1
+ private var AVRO_COL_CPU_COUNT = -1
+ private var AVRO_COL_MEM_CAPACITY = -1
+
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_MEM_CAPACITY = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ )
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 3f49c770..2f6ea6ee 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -69,64 +69,43 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- TASK_STATUS -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in columns.values) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any = when (column) {
- TASK_ID -> fields[COL_JOB_ID]
- TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10))
- TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10))
- TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10))
- TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
- TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS)
- TASK_PARENTS -> {
- val parent = fields[COL_PARENT_JOB].toLong(10)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_JOB_ID -> fields[index]
+ COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10))
+ COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10))
+ COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
+ COL_PARENT_JOB -> {
+ val parent = fields[index].toLong(10)
if (parent < 0) emptySet() else setOf(parent)
}
- TASK_STATUS -> getInt(TASK_STATUS)
- TASK_GROUP_ID -> getInt(TASK_GROUP_ID)
- TASK_USER_ID -> getInt(TASK_USER_ID)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10)
- TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10)
- TASK_STATUS -> fields[COL_STATUS].toInt(10)
- TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10)
- TASK_USER_ID -> fields[COL_USER_ID].toInt(10)
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10)
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -155,4 +134,17 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
private val COL_PART_NUM = 15
private val COL_PARENT_JOB = 16
private val COL_PARENT_THINK_TIME = 17
+
+ private val columns = mapOf(
+ TASK_ID to COL_JOB_ID,
+ TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
+ TASK_WAIT_TIME to COL_WAIT_TIME,
+ TASK_RUNTIME to COL_RUN_TIME,
+ TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS,
+ TASK_REQ_NCPUS to COL_REQ_NCPUS,
+ TASK_STATUS to COL_STATUS,
+ TASK_USER_ID to COL_USER_ID,
+ TASK_GROUP_ID to COL_GROUP_ID,
+ TASK_PARENTS to COL_PARENT_JOB
+ )
}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
index 4408ba5c..7f378d80 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
@@ -94,49 +94,41 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
return hasJob
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_WORKFLOW_ID -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_PARENTS -> true
- TASK_CHILDREN -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column value" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- TASK_ID -> id
- TASK_WORKFLOW_ID -> workflowId
- TASK_RUNTIME -> runtime
- TASK_PARENTS -> parents
- TASK_CHILDREN -> children
- TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_WORKFLOW_ID -> workflowId
+ COL_RUNTIME -> runtime
+ COL_PARENTS -> parents
+ COL_CHILDREN -> children
+ COL_NPROC -> getInt(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- TASK_REQ_NCPUS -> cores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_NPROC -> cores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -231,4 +223,20 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
children = null
cores = -1
}
+
+ private val COL_ID = 0
+ private val COL_WORKFLOW_ID = 1
+ private val COL_RUNTIME = 3
+ private val COL_NPROC = 4
+ private val COL_PARENTS = 5
+ private val COL_CHILDREN = 6
+
+ private val columns = mapOf(
+ TASK_ID to COL_ID,
+ TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
+ TASK_RUNTIME to COL_RUNTIME,
+ TASK_REQ_NCPUS to COL_NPROC,
+ TASK_PARENTS to COL_PARENTS,
+ TASK_CHILDREN to COL_CHILDREN,
+ )
}
diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts
index 5051c7b0..e4f0ab3a 100644
--- a/opendc-trace/opendc-trace-wtf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts
@@ -34,4 +34,6 @@ dependencies {
api(projects.opendcTrace.opendcTraceApi)
implementation(projects.opendcTrace.opendcTraceParquet)
+
+ testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 5e2463f8..45ec25dd 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.wtf
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -37,73 +38,126 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_WORKFLOW_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_PARENTS -> true
- TASK_CHILDREN -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
-
@Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- TASK_ID -> (record["id"] as Long).toString()
- TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString()
- TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long)
- TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long)
- TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long)
- TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- TASK_GROUP_ID -> record["group_id"]
- TASK_USER_ID -> record["user_id"]
+ return when (index) {
+ COL_ID -> (record[AVRO_COL_ID] as Long).toString()
+ COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString()
+ COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long)
+ COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long)
+ COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long)
+ COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
+ COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_GROUP_ID -> record["group_id"] as Int
- TASK_USER_ID -> record["user_id"] as Int
+ return when (index) {
+ COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt()
+ COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int
+ COL_USER_ID -> record[AVRO_COL_USER_ID] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
override fun close() {
reader.close()
}
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos()
+ AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos()
+ AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos()
+ AVRO_COL_RUNTIME = schema.getField("runtime").pos()
+ AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos()
+ AVRO_COL_PARENTS = schema.getField("parents").pos()
+ AVRO_COL_CHILDREN = schema.getField("children").pos()
+ AVRO_COL_GROUP_ID = schema.getField("group_id").pos()
+ AVRO_COL_USER_ID = schema.getField("user_id").pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema", e)
+ }
+ }
+
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_WORKFLOW_ID = -1
+ private var AVRO_COL_SUBMIT_TIME = -1
+ private var AVRO_COL_WAIT_TIME = -1
+ private var AVRO_COL_RUNTIME = -1
+ private var AVRO_COL_REQ_NCPUS = -1
+ private var AVRO_COL_PARENTS = -1
+ private var AVRO_COL_CHILDREN = -1
+ private var AVRO_COL_GROUP_ID = -1
+ private var AVRO_COL_USER_ID = -1
+
+ private val COL_ID = 0
+ private val COL_WORKFLOW_ID = 1
+ private val COL_SUBMIT_TIME = 2
+ private val COL_WAIT_TIME = 3
+ private val COL_RUNTIME = 4
+ private val COL_REQ_NCPUS = 5
+ private val COL_PARENTS = 6
+ private val COL_CHILDREN = 7
+ private val COL_GROUP_ID = 8
+ private val COL_USER_ID = 9
+
+ private val columns = mapOf(
+ TASK_ID to COL_ID,
+ TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
+ TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
+ TASK_WAIT_TIME to COL_WAIT_TIME,
+ TASK_RUNTIME to COL_RUNTIME,
+ TASK_REQ_NCPUS to COL_REQ_NCPUS,
+ TASK_PARENTS to COL_PARENTS,
+ TASK_CHILDREN to COL_CHILDREN,
+ TASK_GROUP_ID to COL_GROUP_ID,
+ TASK_USER_ID to COL_USER_ID,
+ )
}