From 55a4c8208cc44ac626f7b8c61a19d5ec725ec936 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 11:48:18 +0200 Subject: refactor(trace): Unify columns of different tables This change unifies columns of different tables used by trace formats. This concretely means that instead of having columns specific per table (e.g., RESOURCE_ID and RESOURCE_STATE_ID), with this changes these columns are shared between the tables with a single definition (RESOURCE_ID). --- .../kotlin/org/opendc/trace/ResourceColumns.kt | 22 +++++++-- .../org/opendc/trace/ResourceStateColumns.kt | 54 +++++----------------- .../main/kotlin/org/opendc/trace/TableColumns.kt | 33 ++----------- .../main/kotlin/org/opendc/trace/TaskColumns.kt | 28 ++++++----- 4 files changed, 46 insertions(+), 91 deletions(-) (limited to 'opendc-trace/opendc-trace-api') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index 219002e0..f1977945 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -29,28 +29,40 @@ import java.time.Instant * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn = stringColumn("resource:id") +public val RESOURCE_ID: TableColumn = column("resource:id") + +/** + * The cluster to which the resource belongs. + */ +@JvmField +public val RESOURCE_CLUSTER_ID: TableColumn = column("resource:cluster_id") /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn = TableColumn("resource:start_time", Instant::class.java) +public val RESOURCE_START_TIME: TableColumn = column("resource:start_time") /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn = TableColumn("resource:stop_time", Instant::class.java) +public val RESOURCE_STOP_TIME: TableColumn = column("resource:stop_time") /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn = intColumn("resource:cpu_count") +public val RESOURCE_CPU_COUNT: TableColumn = column("resource:cpu_count") + +/** + * Total CPU capacity of the resource in MHz. + */ +@JvmField +public val RESOURCE_CPU_CAPACITY: TableColumn = column("resource:cpu_capacity") /** * Memory capacity for the resource in KB. */ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn = doubleColumn("resource:mem_capacity") +public val RESOURCE_MEM_CAPACITY: TableColumn = column("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt index b683923b..44762da5 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -26,104 +26,74 @@ package org.opendc.trace import java.time.Duration import java.time.Instant -/** - * Identifier of the resource. - */ -@JvmField -public val RESOURCE_STATE_ID: TableColumn = stringColumn("resource_state:id") - -/** - * The cluster to which the resource belongs. - */ -@JvmField -public val RESOURCE_STATE_CLUSTER_ID: TableColumn = stringColumn("resource_state:cluster_id") - /** * Timestamp for the state. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn = TableColumn("resource_state:timestamp", Instant::class.java) +public val RESOURCE_STATE_TIMESTAMP: TableColumn = column("resource_state:timestamp") /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn = TableColumn("resource_state:duration", Duration::class.java) +public val RESOURCE_STATE_DURATION: TableColumn = column("resource_state:duration") /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn = booleanColumn("resource_state:powered_on") - -/** - * Number of CPUs for the resource. - */ -@JvmField -public val RESOURCE_STATE_CPU_COUNT: TableColumn = intColumn("resource_state:cpu_count") - -/** - * Total CPU capacity of the resource in MHz. - */ -@JvmField -public val RESOURCE_STATE_CPU_CAPACITY: TableColumn = doubleColumn("resource_state:cpu_capacity") +public val RESOURCE_STATE_POWERED_ON: TableColumn = column("resource_state:powered_on") /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn = doubleColumn("resource_state:cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: TableColumn = column("resource_state:cpu_usage") /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn = doubleColumn("resource_state:cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn = column("resource_state:cpu_usage_pct") /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn = doubleColumn("resource_state:cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: TableColumn = column("resource_state:cpu_demand") /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn = doubleColumn("resource_state:cpu_ready_pct") - -/** - * Memory capacity of the resource in KB. - */ -@JvmField -public val RESOURCE_STATE_MEM_CAPACITY: TableColumn = doubleColumn("resource_state:mem_capacity") +public val RESOURCE_STATE_CPU_READY_PCT: TableColumn = column("resource_state:cpu_ready_pct") /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn = doubleColumn("resource_state:mem_usage") +public val RESOURCE_STATE_MEM_USAGE: TableColumn = column("resource_state:mem_usage") /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn = doubleColumn("resource_state:disk_read") +public val RESOURCE_STATE_DISK_READ: TableColumn = column("resource_state:disk_read") /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn = doubleColumn("resource_state:disk_write") +public val RESOURCE_STATE_DISK_WRITE: TableColumn = column("resource_state:disk_write") /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn = doubleColumn("resource_state:net_rx") +public val RESOURCE_STATE_NET_RX: TableColumn = column("resource_state:net_rx") /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn = doubleColumn("resource_state:net_tx") +public val RESOURCE_STATE_NET_TX: TableColumn = column("resource_state:net_tx") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt index 64920498..31a58360 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt @@ -24,36 +24,11 @@ package org.opendc.trace /** - * Construct a [TableColumn] with [Any] type. + * Construct a [TableColumn] with the specified [name] and type [T]. */ -public fun objectColumn(name: String): TableColumn = TableColumn(name, Any::class.java) +public inline fun column(name: String): TableColumn = column(name, T::class.java) /** - * Construct a [TableColumn] with a [String] type. + * Construct a [TableColumn] with the specified [name] and [type]. */ -public fun stringColumn(name: String): TableColumn = TableColumn(name, String::class.java) - -/** - * Construct a [TableColumn] with a [Number] type. - */ -public fun numberColumn(name: String): TableColumn = TableColumn(name, Number::class.java) - -/** - * Construct a [TableColumn] with an [Int] type. - */ -public fun intColumn(name: String): TableColumn = TableColumn(name, Int::class.java) - -/** - * Construct a [TableColumn] with a [Long] type. - */ -public fun longColumn(name: String): TableColumn = TableColumn(name, Long::class.java) - -/** - * Construct a [TableColumn] with a [Double] type. - */ -public fun doubleColumn(name: String): TableColumn = TableColumn(name, Double::class.java) - -/** - * Construct a [TableColumn] with a [Boolean] type. - */ -public fun booleanColumn(name: String): TableColumn = TableColumn(name, Boolean::class.java) +public fun column(name: String, type: Class): TableColumn = TableColumn(name, type) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt index 46920dce..d103bce4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -30,72 +30,70 @@ import java.time.Instant * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn = stringColumn("task:id") +public val TASK_ID: TableColumn = column("task:id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn = stringColumn("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn = column("task:workflow_id") /** - * A column containing the submit time of the task. + * A column containing the submission time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn = TableColumn("task:submit_time", type = Instant::class.java) +public val TASK_SUBMIT_TIME: TableColumn = column("task:submit_time") /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn = TableColumn("task:wait_time", type = Instant::class.java) +public val TASK_WAIT_TIME: TableColumn = column("task:wait_time") /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn = TableColumn("task:runtime", type = Duration::class.java) +public val TASK_RUNTIME: TableColumn = column("task:runtime") /** * A column containing the parents of a task. */ -@Suppress("UNCHECKED_CAST") @JvmField -public val TASK_PARENTS: TableColumn> = TableColumn("task:parents", type = Set::class.java as Class>) +public val TASK_PARENTS: TableColumn> = column("task:parents") /** * A column containing the children of a task. */ -@Suppress("UNCHECKED_CAST") @JvmField -public val TASK_CHILDREN: TableColumn> = TableColumn("task:children", type = Set::class.java as Class>) +public val TASK_CHILDREN: TableColumn> = column("task:children") /** * A column containing the requested CPUs of a task. */ @JvmField -public val TASK_REQ_NCPUS: TableColumn = intColumn("task:req_ncpus") +public val TASK_REQ_NCPUS: TableColumn = column("task:req_ncpus") /** * A column containing the allocated CPUs of a task. */ @JvmField -public val TASK_ALLOC_NCPUS: TableColumn = intColumn("task:alloc_ncpus") +public val TASK_ALLOC_NCPUS: TableColumn = column("task:alloc_ncpus") /** * A column containing the status of a task. */ @JvmField -public val TASK_STATUS: TableColumn = intColumn("task:status") +public val TASK_STATUS: TableColumn = column("task:status") /** * A column containing the group id of a task. */ @JvmField -public val TASK_GROUP_ID: TableColumn = intColumn("task:group_id") +public val TASK_GROUP_ID: TableColumn = column("task:group_id") /** * A column containing the user id of a task. */ @JvmField -public val TASK_USER_ID: TableColumn = intColumn("task:user_id") +public val TASK_USER_ID: TableColumn = column("task:user_id") -- cgit v1.2.3 From 768bfa0d2ae763e359d74612385ce43c41afb432 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:12:10 +0200 Subject: feat(trace): Support column lookup via index This change adds support for looking up the column value through the column index. This enables faster lookup when processing very large traces. --- .../main/kotlin/org/opendc/trace/TableReader.kt | 107 ++++++++++++++++++-- .../org/opendc/trace/util/CompositeTableReader.kt | 110 +++++++++++++++++++++ 2 files changed, 211 insertions(+), 6 deletions(-) create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt (limited to 'opendc-trace/opendc-trace-api') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt index b5e7669f..8a796e6c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt @@ -33,35 +33,130 @@ public interface TableReader : AutoCloseable { */ public fun nextRow(): Boolean + /** + * Resolve the index of the specified [column] for this reader. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + /** * Determine whether the [TableReader] supports the specified [column]. */ - public fun hasColumn(column: TableColumn<*>): Boolean + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Determine whether the specified [column] has a `null` value for the current row. + * + * @param index The zero-based index of the column to check for a null value. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + */ + public fun isNull(index: Int): Boolean + + /** + * Obtain the object value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The object value of the column. + */ + public fun get(index: Int): Any? + + /** + * Obtain the boolean value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The boolean value of the column or `false` if the column is `null`. + */ + public fun getBoolean(index: Int): Boolean + + /** + * Obtain the integer value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The integer value of the column or `0` if the column is `null`. + */ + public fun getInt(index: Int): Int + + /** + * Obtain the double value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The long value of the column or `0` if the column is `null`. + */ + public fun getLong(index: Int): Long + + /** + * Obtain the double value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The double value of the column or [Double.NaN] if the column is `null`. + */ + public fun getDouble(index: Int): Double + + /** + * Determine whether the specified [column] has a `null` value for the current row. + * + * @param column The column to lookup. + * @throws IllegalArgumentException if the column is not valid for this table. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + */ + public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column)) /** * Obtain the value of the current column with type [T]. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The object value of the column. */ - public fun get(column: TableColumn): T + public fun get(column: TableColumn): T { + // This cast should always succeed since the resolve the index of the typed column + @Suppress("UNCHECKED_CAST") + return get(resolve(column)) as T + } /** * Read the specified [column] as boolean. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The boolean value of the column or `false` if the column is `null`. */ - public fun getBoolean(column: TableColumn): Boolean + public fun getBoolean(column: TableColumn): Boolean = getBoolean(resolve(column)) /** * Read the specified [column] as integer. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The integer value of the column or `0` if the column is `null`. */ - public fun getInt(column: TableColumn): Int + public fun getInt(column: TableColumn): Int = getInt(resolve(column)) /** * Read the specified [column] as long. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The long value of the column or `0` if the column is `null`. */ - public fun getLong(column: TableColumn): Long + public fun getLong(column: TableColumn): Long = getLong(resolve(column)) /** * Read the specified [column] as double. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The double value of the column or [Double.NaN] if the column is `null`. */ - public fun getDouble(column: TableColumn): Double + public fun getDouble(column: TableColumn): Double = getDouble(resolve(column)) /** * Closes the reader so that no further iteration or data access can be made. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt new file mode 100644 index 00000000..dafc0798 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.util + +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader + +/** + * A helper class to chain multiple [TableReader]s. + */ +public abstract class CompositeTableReader : TableReader { + /** + * A flag to indicate that the reader has starting, meaning the user called [nextRow] at least once + * (and in turn [nextReader]). + */ + private var hasStarted = false + + /** + * The active [TableReader] instance. + */ + private var delegate: TableReader? = null + + /** + * Obtain the next [TableReader] instance to read from or `null` if there are no more readers to read from. + */ + protected abstract fun nextReader(): TableReader? + + override fun nextRow(): Boolean { + if (!hasStarted) { + assert(delegate == null) { "Duplicate initialization" } + delegate = nextReader() + hasStarted = true + } + + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextReader() + this.delegate = delegate + } + + return delegate != null + } + + override fun resolve(column: TableColumn<*>): Int { + val delegate = delegate + return delegate?.resolve(column) ?: -1 + } + + override fun isNull(index: Int): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.isNull(index) + } + + override fun get(index: Int): Any? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(index) + } + + override fun getBoolean(index: Int): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(index) + } + + override fun getInt(index: Int): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(index) + } + + override fun getLong(index: Int): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(index) + } + + override fun getDouble(index: Int): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(index) + } + + override fun close() { + delegate?.close() + } + + override fun toString(): String = "CompositeTableReader" +} -- cgit v1.2.3 From 140aafdaa711b0fdeacf99b9c7e70b706b8490f4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:40:13 +0200 Subject: feat(trace): Add property for describing partition keys --- .../opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'opendc-trace/opendc-trace-api') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 6aca2051..164f5084 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -41,6 +41,11 @@ public interface Table { */ public val columns: List> + /** + * The columns by which the table is partitioned. + */ + public val partitionKeys: List> + /** * Open a [TableReader] for this table. */ -- cgit v1.2.3 From c7fff03408ee3109d0a39a96c043584a2d8f67ca Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 22:04:23 +0200 Subject: refactor(trace): Simplify TraceFormat SPI interface This change simplifies the TraceFormat SPI interface by reducing the number of interfaces that implementors need to implement to only TraceFormat. --- .../src/main/kotlin/org/opendc/trace/Table.kt | 10 ---- .../src/main/kotlin/org/opendc/trace/Trace.kt | 19 +++----- .../kotlin/org/opendc/trace/internal/TableImpl.kt | 52 ++++++++++++++++++++ .../kotlin/org/opendc/trace/internal/TraceImpl.kt | 56 ++++++++++++++++++++++ .../kotlin/org/opendc/trace/spi/TableDetails.kt | 37 ++++++++++++++ .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 33 +++++++++++-- 6 files changed, 179 insertions(+), 28 deletions(-) create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt (limited to 'opendc-trace/opendc-trace-api') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 164f5084..031ee269 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -31,11 +31,6 @@ public interface Table { */ public val name: String - /** - * A flag to indicate that the table is synthetic (derived from another table). - */ - public val isSynthetic: Boolean - /** * The list of columns supported in this table. */ @@ -50,9 +45,4 @@ public interface Table { * Open a [TableReader] for this table. */ public fun newReader(): TableReader - - /** - * Open a [TableReader] for [partition] of the table. - */ - public fun newReader(partition: String): TableReader } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 0ae45e86..6d0014cb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -22,9 +22,9 @@ package org.opendc.trace +import org.opendc.trace.internal.TraceImpl import org.opendc.trace.spi.TraceFormat import java.io.File -import java.net.URL import java.nio.file.Path /** @@ -47,32 +47,25 @@ public interface Trace { public fun getTable(name: String): Table? public companion object { - /** - * Open a [Trace] at the specified [url] in the given [format]. - * - * @throws IllegalArgumentException if [format] is not supported. - */ - public fun open(url: URL, format: String): Trace { - val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } - return provider.open(url) - } - /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The path to the trace. * @throws IllegalArgumentException if [format] is not supported. */ public fun open(path: File, format: String): Trace { - return open(path.toURI().toURL(), format) + return open(path.toPath(), format) } /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The [Path] to the trace. * @throws IllegalArgumentException if [format] is not supported. */ public fun open(path: Path, format: String): Trace { - return open(path.toUri().toURL(), format) + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + return TraceImpl(provider, path) } } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt new file mode 100644 index 00000000..fd0a0f04 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.internal + +import org.opendc.trace.Table +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.util.* + +/** + * Internal implementation of [Table]. + */ +internal class TableImpl(val trace: TraceImpl, override val name: String) : Table { + /** + * The details of this table. + */ + private val details = trace.format.getDetails(trace.path, name) + + override val columns: List> + get() = details.columns + + override val partitionKeys: List> + get() = details.partitionKeys + + override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + + override fun toString(): String = "Table[name=$name]" + + override fun hashCode(): Int = Objects.hash(trace, name) + + override fun equals(other: Any?): Boolean = other is TableImpl && trace == other.trace && name == other.name +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt new file mode 100644 index 00000000..fd9536ab --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.internal + +import org.opendc.trace.Table +import org.opendc.trace.Trace +import org.opendc.trace.spi.TraceFormat +import java.nio.file.Path +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +/** + * Internal implementation of the [Trace] interface. + */ +internal class TraceImpl(val format: TraceFormat, val path: Path) : Trace { + /** + * A map containing the [TableImpl] instances associated with the trace. + */ + private val tableMap = ConcurrentHashMap() + + override val tables: List = format.getTables(path) + + init { + for (table in tables) { + tableMap.computeIfAbsent(table) { TableImpl(this, it) } + } + } + + override fun containsTable(name: String): Boolean = tableMap.containsKey(name) + + override fun getTable(name: String): Table? = tableMap[name] + + override fun hashCode(): Int = Objects.hash(format, path) + + override fun equals(other: Any?): Boolean = other is TraceImpl && format == other.format && path == other.path +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt new file mode 100644 index 00000000..1a9b9ee1 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.spi + +import org.opendc.trace.Table +import org.opendc.trace.TableColumn + +/** + * A class used by the [TraceFormat] interface for describing the metadata of a [Table]. + * + * @param columns The available columns in the table. + * @param partitionKeys The table columns that act as partition keys for the table. + */ +public data class TableDetails( + val columns: List>, + val partitionKeys: List> = emptyList() +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 54029fcf..e04dd948 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,8 +22,8 @@ package org.opendc.trace.spi -import org.opendc.trace.Trace -import java.net.URL +import org.opendc.trace.TableReader +import java.nio.file.Path import java.util.* /** @@ -36,11 +36,32 @@ public interface TraceFormat { public val name: String /** - * Open a new [Trace] with this provider. + * Return the name of the tables available in the trace at the specified [path]. * - * @param url A reference to the trace. + * @param path The path to the trace. + * @return The list of tables available in the trace. */ - public fun open(url: URL): Trace + public fun getTables(path: Path): List + + /** + * Return the details of [table] in the trace at the specified [path]. + * + * @param path The path to the trace. + * @param table The name of the table to obtain the details for. + * @throws IllegalArgumentException If [table] does not exist. + * @return The [TableDetails] for the specified [table]. + */ + public fun getDetails(path: Path, table: String): TableDetails + + /** + * Open a [TableReader] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableReader] for. + * @throws IllegalArgumentException If [table] does not exist. + * @return A [TableReader] instance for the table. + */ + public fun newReader(path: Path, table: String): TableReader /** * A helper object for resolving providers. @@ -49,6 +70,7 @@ public interface TraceFormat { /** * A list of [TraceFormat] that are available on this system. */ + @JvmStatic public val installedProviders: List by lazy { val loader = ServiceLoader.load(TraceFormat::class.java) loader.toList() @@ -57,6 +79,7 @@ public interface TraceFormat { /** * Obtain a [TraceFormat] implementation by [name]. */ + @JvmStatic public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name } } } -- cgit v1.2.3 From 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 21 Sep 2021 11:34:34 +0200 Subject: feat(trace): Add support for writing traces This change adds a new API for writing traces in a trace format. Currently, writing is only supported by the OpenDC VM format, but over time the other formats will also have support for writing added. --- .../src/main/kotlin/org/opendc/trace/Table.kt | 7 + .../main/kotlin/org/opendc/trace/TableWriter.kt | 151 +++++++++++++++++++++ .../src/main/kotlin/org/opendc/trace/Trace.kt | 30 +++- .../kotlin/org/opendc/trace/internal/TableImpl.kt | 3 + .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 21 +++ 5 files changed, 209 insertions(+), 3 deletions(-) create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt (limited to 'opendc-trace/opendc-trace-api') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 031ee269..b0181cbc 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -45,4 +45,11 @@ public interface Table { * Open a [TableReader] for this table. */ public fun newReader(): TableReader + + /** + * Open a [TableWriter] for this table. + * + * @throws UnsupportedOperationException if writing is not supported by the table. + */ + public fun newWriter(): TableWriter } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt new file mode 100644 index 00000000..423ce86a --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * Base class for writing workload traces. + */ +public interface TableWriter : AutoCloseable { + /** + * Start a new row in the table. + */ + public fun startRow() + + /** + * Flush the current row to the table. + */ + public fun endRow() + + /** + * Resolve the index of the specified [column] for this writer. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + + /** + * Determine whether the [TableReader] supports the specified [column]. + */ + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Set [column] to [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun set(index: Int, value: Any) + + /** + * Set [column] to boolean [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(index: Int, value: Boolean) + + /** + * Set [column] to integer [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(index: Int, value: Int) + + /** + * Set [column] to long [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(index: Int, value: Long) + + /** + * Set [column] to double [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(index: Int, value: Double) + + /** + * Set [column] to [value]. + * + * @param column The column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun set(column: TableColumn, value: T): Unit = set(resolve(column), value) + + /** + * Set [column] to boolean [value]. + * + * @param column The column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(column: TableColumn, value: Boolean): Unit = setBoolean(resolve(column), value) + + /** + * Set [column] to integer [value]. + * + * @param column The column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(column: TableColumn, value: Int): Unit = setInt(resolve(column), value) + + /** + * Set [column] to long [value]. + * + * @param column The column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(column: TableColumn, value: Long): Unit = setLong(resolve(column), value) + + /** + * Set [column] to double [value]. + * + * @param column The column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(column: TableColumn, value: Double): Unit = setDouble(resolve(column), value) + + /** + * Flush any buffered content to the underlying target. + */ + public fun flush() + + /** + * Close the writer so that no more rows can be written. + */ + public override fun close() +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 6d0014cb..64e8f272 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -51,21 +51,45 @@ public interface Trace { * Open a [Trace] at the specified [path] in the given [format]. * * @param path The path to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ - public fun open(path: File, format: String): Trace { - return open(path.toPath(), format) - } + @JvmStatic + public fun open(path: File, format: String): Trace = open(path.toPath(), format) /** * Open a [Trace] at the specified [path] in the given [format]. * * @param path The [Path] to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ + @JvmStatic public fun open(path: Path, format: String): Trace { val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } return TraceImpl(provider, path) } + + /** + * Create a [Trace] at the specified [path] in the given [format]. + * + * @param path The [Path] to the trace. + * @param format The format of the trace to create. + */ + @JvmStatic + public fun create(path: File, format: String): Trace = create(path.toPath(), format) + + /** + * Create a [Trace] at the specified [path] in the given [format]. + * + * @param path The [Path] to the trace. + * @param format The format of the trace to create. + */ + @JvmStatic + public fun create(path: Path, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + provider.create(path) + return TraceImpl(provider, path) + } } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index fd0a0f04..24551edb 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -25,6 +25,7 @@ package org.opendc.trace.internal import org.opendc.trace.Table import org.opendc.trace.TableColumn import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter import java.util.* /** @@ -44,6 +45,8 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name) + override fun toString(): String = "Table[name=$name]" override fun hashCode(): Int = Objects.hash(trace, name) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index e04dd948..f2e610db 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -23,6 +23,7 @@ package org.opendc.trace.spi import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter import java.nio.file.Path import java.util.* @@ -35,6 +36,15 @@ public interface TraceFormat { */ public val name: String + /** + * Construct an empty trace at [path]. + * + * @param path The path where to create the empty trace. + * @throws IllegalArgumentException If [path] is invalid. + * @throws UnsupportedOperationException If the table does not support trace creation. + */ + public fun create(path: Path) + /** * Return the name of the tables available in the trace at the specified [path]. * @@ -63,6 +73,17 @@ public interface TraceFormat { */ public fun newReader(path: Path, table: String): TableReader + /** + * Open a [TableWriter] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableWriter] for. + * @throws IllegalArgumentException If [table] does not exist. + * @throws UnsupportedOperationException If the format does not support writing. + * @return A [TableWriter] instance for the table. + */ + public fun newWriter(path: Path, table: String): TableWriter + /** * A helper object for resolving providers. */ -- cgit v1.2.3