diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-20 15:12:10 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-20 15:12:10 +0200 |
| commit | 768bfa0d2ae763e359d74612385ce43c41afb432 (patch) | |
| tree | 1ca870d5dd75f7250e91ae7dec41a5e68a77856f /opendc-trace/opendc-trace-azure | |
| parent | 55a4c8208cc44ac626f7b8c61a19d5ec725ec936 (diff) | |
feat(trace): Support column lookup via index
This change adds support for looking up the column value through the
column index. This enables faster lookup when processing very large
traces.
Diffstat (limited to 'opendc-trace/opendc-trace-azure')
3 files changed, 63 insertions, 101 deletions
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt index e6b89465..8f2f5cc9 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTable.kt @@ -24,6 +24,7 @@ package org.opendc.trace.azure import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* +import org.opendc.trace.util.CompositeTableReader import java.nio.file.Files import java.nio.file.Path import java.util.stream.Collectors @@ -55,57 +56,8 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa override fun newReader(): TableReader { val it = partitions.iterator() - return object : TableReader { - var delegate: TableReader? = nextDelegate() - - override fun nextRow(): Boolean { - var delegate = delegate - - while (delegate != null) { - if (delegate.nextRow()) { - break - } - - delegate.close() - delegate = nextDelegate() - this.delegate = delegate - } - - return delegate != null - } - - override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false - - override fun <T> get(column: TableColumn<T>): T { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(column) - } - - override fun getBoolean(column: TableColumn<Boolean>): Boolean { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getBoolean(column) - } - - override fun getInt(column: TableColumn<Int>): Int { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getInt(column) - } - - override fun getLong(column: TableColumn<Long>): Long { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getLong(column) - } - - override fun getDouble(column: TableColumn<Double>): Double { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.getDouble(column) - } - - override fun close() { - delegate?.close() - } - - private fun nextDelegate(): TableReader? { + return object : CompositeTableReader() { + override fun nextReader(): TableReader? { return if (it.hasNext()) { val (_, path) = it.next() return AzureResourceStateTableReader(factory.createParser(path.toFile())) diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index 6c1cb770..da8181fe 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -60,42 +60,37 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in 0..columns.size) { "Invalid column index" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val res: Any? = when (column) { - RESOURCE_ID -> id - RESOURCE_STATE_TIMESTAMP -> timestamp - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column") + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_TIMESTAMP -> timestamp + COL_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column index") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { throw IllegalArgumentException("Invalid column") } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + override fun getDouble(index: Int): Double { + return when (index) { + COL_CPU_USAGE_PCT -> cpuUsagePct else -> throw IllegalArgumentException("Invalid column") } } @@ -133,6 +128,15 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta cpuUsagePct = Double.NaN } + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_CPU_USAGE_PCT = 2 + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index 5ea97483..a6352613 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -62,49 +62,42 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in 0..columns.size) { "Invalid column index" } + return false } - override fun <T> get(column: TableColumn<T>): T { - val res: Any? = when (column) { - RESOURCE_ID -> id - RESOURCE_START_TIME -> startTime - RESOURCE_STOP_TIME -> stopTime - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + override fun get(index: Int): Any? { + return when (index) { + COL_ID -> id + COL_START_TIME -> startTime + COL_STOP_TIME -> stopTime + COL_CPU_COUNT -> getInt(index) + COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { - return when (column) { - RESOURCE_CPU_COUNT -> cpuCores + override fun getInt(index: Int): Int { + return when (index) { + COL_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { - return when (column) { - RESOURCE_MEM_CAPACITY -> memCapacity + override fun getDouble(index: Int): Double { + return when (index) { + COL_MEM_CAPACITY -> memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -138,7 +131,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe /** * Reset the state. */ - fun reset() { + private fun reset() { id = null startTime = null stopTime = null @@ -146,6 +139,19 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe memCapacity = Double.NaN } + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_MEM_CAPACITY = 4 + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. |
