summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-azure/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-20 15:12:10 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-20 15:12:10 +0200
commit768bfa0d2ae763e359d74612385ce43c41afb432 (patch)
tree1ca870d5dd75f7250e91ae7dec41a5e68a77856f /opendc-trace/opendc-trace-azure/src
parent55a4c8208cc44ac626f7b8c61a19d5ec725ec936 (diff)
feat(trace): Support column lookup via index
This change adds support for looking up the column value through the column index. This enables faster lookup when processing very large traces.
Diffstat (limited to 'opendc-trace/opendc-trace-azure/src')
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt54
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt48
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt62
3 files changed, 63 insertions, 101 deletions
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
index e6b89465..8f2f5cc9 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.azure
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.*
+import org.opendc.trace.util.CompositeTableReader
import java.nio.file.Files
import java.nio.file.Path
import java.util.stream.Collectors
@@ -55,57 +56,8 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa
override fun newReader(): TableReader {
val it = partitions.iterator()
- return object : TableReader {
- var delegate: TableReader? = nextDelegate()
-
- override fun nextRow(): Boolean {
- var delegate = delegate
-
- while (delegate != null) {
- if (delegate.nextRow()) {
- break
- }
-
- delegate.close()
- delegate = nextDelegate()
- this.delegate = delegate
- }
-
- return delegate != null
- }
-
- override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false
-
- override fun <T> get(column: TableColumn<T>): T {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(column)
- }
-
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getBoolean(column)
- }
-
- override fun getInt(column: TableColumn<Int>): Int {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getInt(column)
- }
-
- override fun getLong(column: TableColumn<Long>): Long {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getLong(column)
- }
-
- override fun getDouble(column: TableColumn<Double>): Double {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.getDouble(column)
- }
-
- override fun close() {
- delegate?.close()
- }
-
- private fun nextDelegate(): TableReader? {
+ return object : CompositeTableReader() {
+ override fun nextReader(): TableReader? {
return if (it.hasNext()) {
val (_, path) = it.next()
return AzureResourceStateTableReader(factory.createParser(path.toFile()))
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
index 6c1cb770..da8181fe 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
@@ -60,42 +60,37 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
- RESOURCE_STATE_TIMESTAMP -> timestamp
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column")
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_TIMESTAMP -> timestamp
+ COL_CPU_USAGE_PCT -> cpuUsagePct
+ else -> throw IllegalArgumentException("Invalid column index")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
throw IllegalArgumentException("Invalid column")
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_CPU_USAGE_PCT -> cpuUsagePct
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -133,6 +128,15 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
cpuUsagePct = Double.NaN
}
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_CPU_USAGE_PCT = 2
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT
+ )
+
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index 5ea97483..a6352613 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -62,49 +62,42 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..columns.size) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any? = when (column) {
- RESOURCE_ID -> id
- RESOURCE_START_TIME -> startTime
- RESOURCE_STOP_TIME -> stopTime
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_ID -> id
+ COL_START_TIME -> startTime
+ COL_STOP_TIME -> stopTime
+ COL_CPU_COUNT -> getInt(index)
+ COL_MEM_CAPACITY -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- RESOURCE_CPU_COUNT -> cpuCores
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_CPU_COUNT -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
- return when (column) {
- RESOURCE_MEM_CAPACITY -> memCapacity
+ override fun getDouble(index: Int): Double {
+ return when (index) {
+ COL_MEM_CAPACITY -> memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -138,7 +131,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
/**
* Reset the state.
*/
- fun reset() {
+ private fun reset() {
id = null
startTime = null
stopTime = null
@@ -146,6 +139,19 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
memCapacity = Double.NaN
}
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_MEM_CAPACITY = 4
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY
+ )
+
companion object {
/**
* The [CsvSchema] that is used to parse the trace.