diff options
Diffstat (limited to 'opendc-trace/opendc-trace-api')
13 files changed, 642 insertions, 120 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index 219002e0..f1977945 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -29,28 +29,40 @@ import java.time.Instant * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn<String> = stringColumn("resource:id") +public val RESOURCE_ID: TableColumn<String> = column("resource:id") + +/** + * The cluster to which the resource belongs. + */ +@JvmField +public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id") /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:start_time", Instant::class.java) +public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time") /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java) +public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time") /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn<Int> = intColumn("resource:cpu_count") +public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count") + +/** + * Total CPU capacity of the resource in MHz. + */ +@JvmField +public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity") /** * Memory capacity for the resource in KB. */ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity") +public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt index b683923b..44762da5 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt @@ -27,103 +27,73 @@ import java.time.Duration import java.time.Instant /** - * Identifier of the resource. - */ -@JvmField -public val RESOURCE_STATE_ID: TableColumn<String> = stringColumn("resource_state:id") - -/** - * The cluster to which the resource belongs. - */ -@JvmField -public val RESOURCE_STATE_CLUSTER_ID: TableColumn<String> = stringColumn("resource_state:cluster_id") - -/** * Timestamp for the state. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = TableColumn("resource_state:timestamp", Instant::class.java) +public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp") /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn<Duration> = TableColumn("resource_state:duration", Duration::class.java) +public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration") /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("resource_state:powered_on") - -/** - * Number of CPUs for the resource. - */ -@JvmField -public val RESOURCE_STATE_CPU_COUNT: TableColumn<Int> = intColumn("resource_state:cpu_count") - -/** - * Total CPU capacity of the resource in MHz. - */ -@JvmField -public val RESOURCE_STATE_CPU_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:cpu_capacity") +public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on") /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = doubleColumn("resource_state:cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage") /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct") /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = doubleColumn("resource_state:cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand") /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_ready_pct") - -/** - * Memory capacity of the resource in KB. - */ -@JvmField -public val RESOURCE_STATE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:mem_capacity") +public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct") /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = doubleColumn("resource_state:mem_usage") +public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage") /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = doubleColumn("resource_state:disk_read") +public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read") /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = doubleColumn("resource_state:disk_write") +public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write") /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn<Double> = doubleColumn("resource_state:net_rx") +public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx") /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn<Double> = doubleColumn("resource_state:net_tx") +public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 6aca2051..b0181cbc 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -32,14 +32,14 @@ public interface Table { public val name: String /** - * A flag to indicate that the table is synthetic (derived from another table). + * The list of columns supported in this table. */ - public val isSynthetic: Boolean + public val columns: List<TableColumn<*>> /** - * The list of columns supported in this table. + * The columns by which the table is partitioned. */ - public val columns: List<TableColumn<*>> + public val partitionKeys: List<TableColumn<*>> /** * Open a [TableReader] for this table. @@ -47,7 +47,9 @@ public interface Table { public fun newReader(): TableReader /** - * Open a [TableReader] for [partition] of the table. + * Open a [TableWriter] for this table. + * + * @throws UnsupportedOperationException if writing is not supported by the table. */ - public fun newReader(partition: String): TableReader + public fun newWriter(): TableWriter } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt index 64920498..31a58360 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt @@ -24,36 +24,11 @@ package org.opendc.trace /** - * Construct a [TableColumn] with [Any] type. + * Construct a [TableColumn] with the specified [name] and type [T]. */ -public fun objectColumn(name: String): TableColumn<Any> = TableColumn(name, Any::class.java) +public inline fun <reified T> column(name: String): TableColumn<T> = column(name, T::class.java) /** - * Construct a [TableColumn] with a [String] type. + * Construct a [TableColumn] with the specified [name] and [type]. */ -public fun stringColumn(name: String): TableColumn<String> = TableColumn(name, String::class.java) - -/** - * Construct a [TableColumn] with a [Number] type. - */ -public fun numberColumn(name: String): TableColumn<Number> = TableColumn(name, Number::class.java) - -/** - * Construct a [TableColumn] with an [Int] type. - */ -public fun intColumn(name: String): TableColumn<Int> = TableColumn(name, Int::class.java) - -/** - * Construct a [TableColumn] with a [Long] type. - */ -public fun longColumn(name: String): TableColumn<Long> = TableColumn(name, Long::class.java) - -/** - * Construct a [TableColumn] with a [Double] type. - */ -public fun doubleColumn(name: String): TableColumn<Double> = TableColumn(name, Double::class.java) - -/** - * Construct a [TableColumn] with a [Boolean] type. - */ -public fun booleanColumn(name: String): TableColumn<Boolean> = TableColumn(name, Boolean::class.java) +public fun <T> column(name: String, type: Class<T>): TableColumn<T> = TableColumn(name, type) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt index b5e7669f..8a796e6c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt @@ -34,34 +34,129 @@ public interface TableReader : AutoCloseable { public fun nextRow(): Boolean /** + * Resolve the index of the specified [column] for this reader. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + + /** * Determine whether the [TableReader] supports the specified [column]. */ - public fun hasColumn(column: TableColumn<*>): Boolean + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Determine whether the specified [column] has a `null` value for the current row. + * + * @param index The zero-based index of the column to check for a null value. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + */ + public fun isNull(index: Int): Boolean + + /** + * Obtain the object value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The object value of the column. + */ + public fun get(index: Int): Any? + + /** + * Obtain the boolean value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The boolean value of the column or `false` if the column is `null`. + */ + public fun getBoolean(index: Int): Boolean + + /** + * Obtain the integer value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The integer value of the column or `0` if the column is `null`. + */ + public fun getInt(index: Int): Int + + /** + * Obtain the double value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The long value of the column or `0` if the column is `null`. + */ + public fun getLong(index: Int): Long + + /** + * Obtain the double value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The double value of the column or [Double.NaN] if the column is `null`. + */ + public fun getDouble(index: Int): Double + + /** + * Determine whether the specified [column] has a `null` value for the current row. + * + * @param column The column to lookup. + * @throws IllegalArgumentException if the column is not valid for this table. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + */ + public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column)) /** * Obtain the value of the current column with type [T]. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The object value of the column. */ - public fun <T> get(column: TableColumn<T>): T + public fun <T> get(column: TableColumn<T>): T { + // This cast should always succeed since the resolve the index of the typed column + @Suppress("UNCHECKED_CAST") + return get(resolve(column)) as T + } /** * Read the specified [column] as boolean. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The boolean value of the column or `false` if the column is `null`. */ - public fun getBoolean(column: TableColumn<Boolean>): Boolean + public fun getBoolean(column: TableColumn<Boolean>): Boolean = getBoolean(resolve(column)) /** * Read the specified [column] as integer. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The integer value of the column or `0` if the column is `null`. */ - public fun getInt(column: TableColumn<Int>): Int + public fun getInt(column: TableColumn<Int>): Int = getInt(resolve(column)) /** * Read the specified [column] as long. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The long value of the column or `0` if the column is `null`. */ - public fun getLong(column: TableColumn<Long>): Long + public fun getLong(column: TableColumn<Long>): Long = getLong(resolve(column)) /** * Read the specified [column] as double. + * + * @param column The column to obtain the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The double value of the column or [Double.NaN] if the column is `null`. */ - public fun getDouble(column: TableColumn<Double>): Double + public fun getDouble(column: TableColumn<Double>): Double = getDouble(resolve(column)) /** * Closes the reader so that no further iteration or data access can be made. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt new file mode 100644 index 00000000..423ce86a --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * Base class for writing workload traces. + */ +public interface TableWriter : AutoCloseable { + /** + * Start a new row in the table. + */ + public fun startRow() + + /** + * Flush the current row to the table. + */ + public fun endRow() + + /** + * Resolve the index of the specified [column] for this writer. + * + * @param column The column to lookup. + * @return The zero-based index of the column or a negative value if the column is not present in this table. + */ + public fun resolve(column: TableColumn<*>): Int + + /** + * Determine whether the [TableReader] supports the specified [column]. + */ + public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + + /** + * Set [column] to [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun set(index: Int, value: Any) + + /** + * Set [column] to boolean [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(index: Int, value: Boolean) + + /** + * Set [column] to integer [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(index: Int, value: Int) + + /** + * Set [column] to long [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(index: Int, value: Long) + + /** + * Set [column] to double [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(index: Int, value: Double) + + /** + * Set [column] to [value]. + * + * @param column The column to set the value for. + * @param value The value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <T : Any> set(column: TableColumn<T>, value: T): Unit = set(resolve(column), value) + + /** + * Set [column] to boolean [value]. + * + * @param column The column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setBoolean(column: TableColumn<Boolean>, value: Boolean): Unit = setBoolean(resolve(column), value) + + /** + * Set [column] to integer [value]. + * + * @param column The column to set the value for. + * @param value The integer value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInt(column: TableColumn<Int>, value: Int): Unit = setInt(resolve(column), value) + + /** + * Set [column] to long [value]. + * + * @param column The column to set the value for. + * @param value The long value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setLong(column: TableColumn<Long>, value: Long): Unit = setLong(resolve(column), value) + + /** + * Set [column] to double [value]. + * + * @param column The column to set the value for. + * @param value The double value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDouble(column: TableColumn<Double>, value: Double): Unit = setDouble(resolve(column), value) + + /** + * Flush any buffered content to the underlying target. + */ + public fun flush() + + /** + * Close the writer so that no more rows can be written. + */ + public override fun close() +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt index 46920dce..d103bce4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -30,72 +30,70 @@ import java.time.Instant * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn<String> = stringColumn("task:id") +public val TASK_ID: TableColumn<String> = column("task:id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id") /** - * A column containing the submit time of the task. + * A column containing the submission time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java) +public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time") /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn<Instant> = TableColumn("task:wait_time", type = Instant::class.java) +public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time") /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java) +public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime") /** * A column containing the parents of a task. */ -@Suppress("UNCHECKED_CAST") @JvmField -public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>) +public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents") /** * A column containing the children of a task. */ -@Suppress("UNCHECKED_CAST") @JvmField -public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>) +public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children") /** * A column containing the requested CPUs of a task. */ @JvmField -public val TASK_REQ_NCPUS: TableColumn<Int> = intColumn("task:req_ncpus") +public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus") /** * A column containing the allocated CPUs of a task. */ @JvmField -public val TASK_ALLOC_NCPUS: TableColumn<Int> = intColumn("task:alloc_ncpus") +public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus") /** * A column containing the status of a task. */ @JvmField -public val TASK_STATUS: TableColumn<Int> = intColumn("task:status") +public val TASK_STATUS: TableColumn<Int> = column("task:status") /** * A column containing the group id of a task. */ @JvmField -public val TASK_GROUP_ID: TableColumn<Int> = intColumn("task:group_id") +public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id") /** * A column containing the user id of a task. */ @JvmField -public val TASK_USER_ID: TableColumn<Int> = intColumn("task:user_id") +public val TASK_USER_ID: TableColumn<Int> = column("task:user_id") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 0ae45e86..64e8f272 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -22,9 +22,9 @@ package org.opendc.trace +import org.opendc.trace.internal.TraceImpl import org.opendc.trace.spi.TraceFormat import java.io.File -import java.net.URL import java.nio.file.Path /** @@ -48,31 +48,48 @@ public interface Trace { public companion object { /** - * Open a [Trace] at the specified [url] in the given [format]. + * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The path to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ - public fun open(url: URL, format: String): Trace { - val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } - return provider.open(url) - } + @JvmStatic + public fun open(path: File, format: String): Trace = open(path.toPath(), format) /** * Open a [Trace] at the specified [path] in the given [format]. * + * @param path The [Path] to the trace. + * @param format The format of the trace to open. * @throws IllegalArgumentException if [format] is not supported. */ - public fun open(path: File, format: String): Trace { - return open(path.toURI().toURL(), format) + @JvmStatic + public fun open(path: Path, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + return TraceImpl(provider, path) } /** - * Open a [Trace] at the specified [path] in the given [format]. + * Create a [Trace] at the specified [path] in the given [format]. * - * @throws IllegalArgumentException if [format] is not supported. + * @param path The [Path] to the trace. + * @param format The format of the trace to create. */ - public fun open(path: Path, format: String): Trace { - return open(path.toUri().toURL(), format) + @JvmStatic + public fun create(path: File, format: String): Trace = create(path.toPath(), format) + + /** + * Create a [Trace] at the specified [path] in the given [format]. + * + * @param path The [Path] to the trace. + * @param format The format of the trace to create. + */ + @JvmStatic + public fun create(path: Path, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + provider.create(path) + return TraceImpl(provider, path) } } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt new file mode 100644 index 00000000..24551edb --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.internal + +import org.opendc.trace.Table +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter +import java.util.* + +/** + * Internal implementation of [Table]. + */ +internal class TableImpl(val trace: TraceImpl, override val name: String) : Table { + /** + * The details of this table. + */ + private val details = trace.format.getDetails(trace.path, name) + + override val columns: List<TableColumn<*>> + get() = details.columns + + override val partitionKeys: List<TableColumn<*>> + get() = details.partitionKeys + + override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + + override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name) + + override fun toString(): String = "Table[name=$name]" + + override fun hashCode(): Int = Objects.hash(trace, name) + + override fun equals(other: Any?): Boolean = other is TableImpl && trace == other.trace && name == other.name +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt new file mode 100644 index 00000000..fd9536ab --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.internal + +import org.opendc.trace.Table +import org.opendc.trace.Trace +import org.opendc.trace.spi.TraceFormat +import java.nio.file.Path +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +/** + * Internal implementation of the [Trace] interface. + */ +internal class TraceImpl(val format: TraceFormat, val path: Path) : Trace { + /** + * A map containing the [TableImpl] instances associated with the trace. + */ + private val tableMap = ConcurrentHashMap<String, TableImpl>() + + override val tables: List<String> = format.getTables(path) + + init { + for (table in tables) { + tableMap.computeIfAbsent(table) { TableImpl(this, it) } + } + } + + override fun containsTable(name: String): Boolean = tableMap.containsKey(name) + + override fun getTable(name: String): Table? = tableMap[name] + + override fun hashCode(): Int = Objects.hash(format, path) + + override fun equals(other: Any?): Boolean = other is TraceImpl && format == other.format && path == other.path +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt new file mode 100644 index 00000000..1a9b9ee1 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.spi + +import org.opendc.trace.Table +import org.opendc.trace.TableColumn + +/** + * A class used by the [TraceFormat] interface for describing the metadata of a [Table]. + * + * @param columns The available columns in the table. + * @param partitionKeys The table columns that act as partition keys for the table. + */ +public data class TableDetails( + val columns: List<TableColumn<*>>, + val partitionKeys: List<TableColumn<*>> = emptyList() +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 54029fcf..f2e610db 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,8 +22,9 @@ package org.opendc.trace.spi -import org.opendc.trace.Trace -import java.net.URL +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter +import java.nio.file.Path import java.util.* /** @@ -36,11 +37,52 @@ public interface TraceFormat { public val name: String /** - * Open a new [Trace] with this provider. + * Construct an empty trace at [path]. * - * @param url A reference to the trace. + * @param path The path where to create the empty trace. + * @throws IllegalArgumentException If [path] is invalid. + * @throws UnsupportedOperationException If the table does not support trace creation. */ - public fun open(url: URL): Trace + public fun create(path: Path) + + /** + * Return the name of the tables available in the trace at the specified [path]. + * + * @param path The path to the trace. + * @return The list of tables available in the trace. + */ + public fun getTables(path: Path): List<String> + + /** + * Return the details of [table] in the trace at the specified [path]. + * + * @param path The path to the trace. + * @param table The name of the table to obtain the details for. + * @throws IllegalArgumentException If [table] does not exist. + * @return The [TableDetails] for the specified [table]. + */ + public fun getDetails(path: Path, table: String): TableDetails + + /** + * Open a [TableReader] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableReader] for. + * @throws IllegalArgumentException If [table] does not exist. + * @return A [TableReader] instance for the table. + */ + public fun newReader(path: Path, table: String): TableReader + + /** + * Open a [TableWriter] for the specified [table]. + * + * @param path The path to the trace to open. + * @param table The name of the table to open a [TableWriter] for. + * @throws IllegalArgumentException If [table] does not exist. + * @throws UnsupportedOperationException If the format does not support writing. + * @return A [TableWriter] instance for the table. + */ + public fun newWriter(path: Path, table: String): TableWriter /** * A helper object for resolving providers. @@ -49,6 +91,7 @@ public interface TraceFormat { /** * A list of [TraceFormat] that are available on this system. */ + @JvmStatic public val installedProviders: List<TraceFormat> by lazy { val loader = ServiceLoader.load(TraceFormat::class.java) loader.toList() @@ -57,6 +100,7 @@ public interface TraceFormat { /** * Obtain a [TraceFormat] implementation by [name]. */ + @JvmStatic public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name } } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt new file mode 100644 index 00000000..dafc0798 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.util + +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader + +/** + * A helper class to chain multiple [TableReader]s. + */ +public abstract class CompositeTableReader : TableReader { + /** + * A flag to indicate that the reader has starting, meaning the user called [nextRow] at least once + * (and in turn [nextReader]). + */ + private var hasStarted = false + + /** + * The active [TableReader] instance. + */ + private var delegate: TableReader? = null + + /** + * Obtain the next [TableReader] instance to read from or `null` if there are no more readers to read from. + */ + protected abstract fun nextReader(): TableReader? + + override fun nextRow(): Boolean { + if (!hasStarted) { + assert(delegate == null) { "Duplicate initialization" } + delegate = nextReader() + hasStarted = true + } + + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextReader() + this.delegate = delegate + } + + return delegate != null + } + + override fun resolve(column: TableColumn<*>): Int { + val delegate = delegate + return delegate?.resolve(column) ?: -1 + } + + override fun isNull(index: Int): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.isNull(index) + } + + override fun get(index: Int): Any? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(index) + } + + override fun getBoolean(index: Int): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(index) + } + + override fun getInt(index: Int): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(index) + } + + override fun getLong(index: Int): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(index) + } + + override fun getDouble(index: Int): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(index) + } + + override fun close() { + delegate?.close() + } + + override fun toString(): String = "CompositeTableReader" +} |
