summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-api
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-21 12:04:15 +0200
committerGitHub <noreply@github.com>2021-09-21 12:04:15 +0200
commit322d91db03a7d74a00ec623ce624f979c0b77c03 (patch)
tree73201888564accde4cfa107f4ffdb15e9f93d45c /opendc-trace/opendc-trace-api
parent453c25c4b453fa0af26bebbd8863abfb79218119 (diff)
parent68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (diff)
merge: Add support for trace writing
This pull request extends the trace API to support writing new traces. - Unify columns of different tables - Support column lookup via index - Use index lookup in trace loader - Add property for describing partition keys - Simplify TraceFormat SPI interface - Add support for writing traces **Breaking API Changes** - `TraceFormat` SPI interface has been redesigned.
Diffstat (limited to 'opendc-trace/opendc-trace-api')
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt22
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt54
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt14
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt33
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt107
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt151
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt28
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt41
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt55
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt56
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt37
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt54
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt110
13 files changed, 642 insertions, 120 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
index 219002e0..f1977945 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
@@ -29,28 +29,40 @@ import java.time.Instant
* Identifier of the resource.
*/
@JvmField
-public val RESOURCE_ID: TableColumn<String> = stringColumn("resource:id")
+public val RESOURCE_ID: TableColumn<String> = column("resource:id")
+
+/**
+ * The cluster to which the resource belongs.
+ */
+@JvmField
+public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id")
/**
* Start time for the resource.
*/
@JvmField
-public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:start_time", Instant::class.java)
+public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time")
/**
* End time for the resource.
*/
@JvmField
-public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java)
+public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time")
/**
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_CPU_COUNT: TableColumn<Int> = intColumn("resource:cpu_count")
+public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count")
+
+/**
+ * Total CPU capacity of the resource in MHz.
+ */
+@JvmField
+public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity")
/**
* Memory capacity for the resource in KB.
*/
@JvmField
-public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity")
+public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
index b683923b..44762da5 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceStateColumns.kt
@@ -27,103 +27,73 @@ import java.time.Duration
import java.time.Instant
/**
- * Identifier of the resource.
- */
-@JvmField
-public val RESOURCE_STATE_ID: TableColumn<String> = stringColumn("resource_state:id")
-
-/**
- * The cluster to which the resource belongs.
- */
-@JvmField
-public val RESOURCE_STATE_CLUSTER_ID: TableColumn<String> = stringColumn("resource_state:cluster_id")
-
-/**
* Timestamp for the state.
*/
@JvmField
-public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = TableColumn("resource_state:timestamp", Instant::class.java)
+public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp")
/**
* Duration for the state.
*/
@JvmField
-public val RESOURCE_STATE_DURATION: TableColumn<Duration> = TableColumn("resource_state:duration", Duration::class.java)
+public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration")
/**
* A flag to indicate that the resource is powered on.
*/
@JvmField
-public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = booleanColumn("resource_state:powered_on")
-
-/**
- * Number of CPUs for the resource.
- */
-@JvmField
-public val RESOURCE_STATE_CPU_COUNT: TableColumn<Int> = intColumn("resource_state:cpu_count")
-
-/**
- * Total CPU capacity of the resource in MHz.
- */
-@JvmField
-public val RESOURCE_STATE_CPU_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:cpu_capacity")
+public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on")
/**
* Total CPU usage of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = doubleColumn("resource_state:cpu_usage")
+public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage")
/**
* Total CPU usage of the resource in percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_usage_pct")
+public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct")
/**
* Total CPU demand of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = doubleColumn("resource_state:cpu_demand")
+public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand")
/**
* CPU ready percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = doubleColumn("resource_state:cpu_ready_pct")
-
-/**
- * Memory capacity of the resource in KB.
- */
-@JvmField
-public val RESOURCE_STATE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource_state:mem_capacity")
+public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct")
/**
* Memory usage of the resource in KB.
*/
@JvmField
-public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = doubleColumn("resource_state:mem_usage")
+public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage")
/**
* Disk read throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = doubleColumn("resource_state:disk_read")
+public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read")
/**
* Disk write throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = doubleColumn("resource_state:disk_write")
+public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write")
/**
* Network receive throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_RX: TableColumn<Double> = doubleColumn("resource_state:net_rx")
+public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx")
/**
* Network transmit throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_TX: TableColumn<Double> = doubleColumn("resource_state:net_tx")
+public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index 6aca2051..b0181cbc 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -32,14 +32,14 @@ public interface Table {
public val name: String
/**
- * A flag to indicate that the table is synthetic (derived from another table).
+ * The list of columns supported in this table.
*/
- public val isSynthetic: Boolean
+ public val columns: List<TableColumn<*>>
/**
- * The list of columns supported in this table.
+ * The columns by which the table is partitioned.
*/
- public val columns: List<TableColumn<*>>
+ public val partitionKeys: List<TableColumn<*>>
/**
* Open a [TableReader] for this table.
@@ -47,7 +47,9 @@ public interface Table {
public fun newReader(): TableReader
/**
- * Open a [TableReader] for [partition] of the table.
+ * Open a [TableWriter] for this table.
+ *
+ * @throws UnsupportedOperationException if writing is not supported by the table.
*/
- public fun newReader(partition: String): TableReader
+ public fun newWriter(): TableWriter
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
index 64920498..31a58360 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumns.kt
@@ -24,36 +24,11 @@
package org.opendc.trace
/**
- * Construct a [TableColumn] with [Any] type.
+ * Construct a [TableColumn] with the specified [name] and type [T].
*/
-public fun objectColumn(name: String): TableColumn<Any> = TableColumn(name, Any::class.java)
+public inline fun <reified T> column(name: String): TableColumn<T> = column(name, T::class.java)
/**
- * Construct a [TableColumn] with a [String] type.
+ * Construct a [TableColumn] with the specified [name] and [type].
*/
-public fun stringColumn(name: String): TableColumn<String> = TableColumn(name, String::class.java)
-
-/**
- * Construct a [TableColumn] with a [Number] type.
- */
-public fun numberColumn(name: String): TableColumn<Number> = TableColumn(name, Number::class.java)
-
-/**
- * Construct a [TableColumn] with an [Int] type.
- */
-public fun intColumn(name: String): TableColumn<Int> = TableColumn(name, Int::class.java)
-
-/**
- * Construct a [TableColumn] with a [Long] type.
- */
-public fun longColumn(name: String): TableColumn<Long> = TableColumn(name, Long::class.java)
-
-/**
- * Construct a [TableColumn] with a [Double] type.
- */
-public fun doubleColumn(name: String): TableColumn<Double> = TableColumn(name, Double::class.java)
-
-/**
- * Construct a [TableColumn] with a [Boolean] type.
- */
-public fun booleanColumn(name: String): TableColumn<Boolean> = TableColumn(name, Boolean::class.java)
+public fun <T> column(name: String, type: Class<T>): TableColumn<T> = TableColumn(name, type)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
index b5e7669f..8a796e6c 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
@@ -34,34 +34,129 @@ public interface TableReader : AutoCloseable {
public fun nextRow(): Boolean
/**
+ * Resolve the index of the specified [column] for this reader.
+ *
+ * @param column The column to lookup.
+ * @return The zero-based index of the column or a negative value if the column is not present in this table.
+ */
+ public fun resolve(column: TableColumn<*>): Int
+
+ /**
* Determine whether the [TableReader] supports the specified [column].
*/
- public fun hasColumn(column: TableColumn<*>): Boolean
+ public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0
+
+ /**
+ * Determine whether the specified [column] has a `null` value for the current row.
+ *
+ * @param index The zero-based index of the column to check for a null value.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return `true` if the column value for the current value has a `null` value, `false` otherwise.
+ */
+ public fun isNull(index: Int): Boolean
+
+ /**
+ * Obtain the object value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The object value of the column.
+ */
+ public fun get(index: Int): Any?
+
+ /**
+ * Obtain the boolean value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The boolean value of the column or `false` if the column is `null`.
+ */
+ public fun getBoolean(index: Int): Boolean
+
+ /**
+ * Obtain the integer value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The integer value of the column or `0` if the column is `null`.
+ */
+ public fun getInt(index: Int): Int
+
+ /**
+ * Obtain the double value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The long value of the column or `0` if the column is `null`.
+ */
+ public fun getLong(index: Int): Long
+
+ /**
+ * Obtain the double value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The double value of the column or [Double.NaN] if the column is `null`.
+ */
+ public fun getDouble(index: Int): Double
+
+ /**
+ * Determine whether the specified [column] has a `null` value for the current row.
+ *
+ * @param column The column to lookup.
+ * @throws IllegalArgumentException if the column is not valid for this table.
+ * @return `true` if the column value for the current value has a `null` value, `false` otherwise.
+ */
+ public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column))
/**
* Obtain the value of the current column with type [T].
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The object value of the column.
*/
- public fun <T> get(column: TableColumn<T>): T
+ public fun <T> get(column: TableColumn<T>): T {
+ // This cast should always succeed since the resolve the index of the typed column
+ @Suppress("UNCHECKED_CAST")
+ return get(resolve(column)) as T
+ }
/**
* Read the specified [column] as boolean.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The boolean value of the column or `false` if the column is `null`.
*/
- public fun getBoolean(column: TableColumn<Boolean>): Boolean
+ public fun getBoolean(column: TableColumn<Boolean>): Boolean = getBoolean(resolve(column))
/**
* Read the specified [column] as integer.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The integer value of the column or `0` if the column is `null`.
*/
- public fun getInt(column: TableColumn<Int>): Int
+ public fun getInt(column: TableColumn<Int>): Int = getInt(resolve(column))
/**
* Read the specified [column] as long.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The long value of the column or `0` if the column is `null`.
*/
- public fun getLong(column: TableColumn<Long>): Long
+ public fun getLong(column: TableColumn<Long>): Long = getLong(resolve(column))
/**
* Read the specified [column] as double.
+ *
+ * @param column The column to obtain the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The double value of the column or [Double.NaN] if the column is `null`.
*/
- public fun getDouble(column: TableColumn<Double>): Double
+ public fun getDouble(column: TableColumn<Double>): Double = getDouble(resolve(column))
/**
* Closes the reader so that no further iteration or data access can be made.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
new file mode 100644
index 00000000..423ce86a
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace
+
+/**
+ * Base class for writing workload traces.
+ */
+public interface TableWriter : AutoCloseable {
+ /**
+ * Start a new row in the table.
+ */
+ public fun startRow()
+
+ /**
+ * Flush the current row to the table.
+ */
+ public fun endRow()
+
+ /**
+ * Resolve the index of the specified [column] for this writer.
+ *
+ * @param column The column to lookup.
+ * @return The zero-based index of the column or a negative value if the column is not present in this table.
+ */
+ public fun resolve(column: TableColumn<*>): Int
+
+ /**
+ * Determine whether the [TableReader] supports the specified [column].
+ */
+ public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0
+
+ /**
+ * Set [column] to [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun set(index: Int, value: Any)
+
+ /**
+ * Set [column] to boolean [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The boolean value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setBoolean(index: Int, value: Boolean)
+
+ /**
+ * Set [column] to integer [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The integer value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setInt(index: Int, value: Int)
+
+ /**
+ * Set [column] to long [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The long value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setLong(index: Int, value: Long)
+
+ /**
+ * Set [column] to double [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The double value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setDouble(index: Int, value: Double)
+
+ /**
+ * Set [column] to [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <T : Any> set(column: TableColumn<T>, value: T): Unit = set(resolve(column), value)
+
+ /**
+ * Set [column] to boolean [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The boolean value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setBoolean(column: TableColumn<Boolean>, value: Boolean): Unit = setBoolean(resolve(column), value)
+
+ /**
+ * Set [column] to integer [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The integer value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setInt(column: TableColumn<Int>, value: Int): Unit = setInt(resolve(column), value)
+
+ /**
+ * Set [column] to long [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The long value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setLong(column: TableColumn<Long>, value: Long): Unit = setLong(resolve(column), value)
+
+ /**
+ * Set [column] to double [value].
+ *
+ * @param column The column to set the value for.
+ * @param value The double value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setDouble(column: TableColumn<Double>, value: Double): Unit = setDouble(resolve(column), value)
+
+ /**
+ * Flush any buffered content to the underlying target.
+ */
+ public fun flush()
+
+ /**
+ * Close the writer so that no more rows can be written.
+ */
+ public override fun close()
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
index 46920dce..d103bce4 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
@@ -30,72 +30,70 @@ import java.time.Instant
* A column containing the task identifier.
*/
@JvmField
-public val TASK_ID: TableColumn<String> = stringColumn("task:id")
+public val TASK_ID: TableColumn<String> = column("task:id")
/**
* A column containing the identifier of the workflow.
*/
@JvmField
-public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id")
+public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id")
/**
- * A column containing the submit time of the task.
+ * A column containing the submission time of the task.
*/
@JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java)
+public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time")
/**
* A column containing the wait time of the task.
*/
@JvmField
-public val TASK_WAIT_TIME: TableColumn<Instant> = TableColumn("task:wait_time", type = Instant::class.java)
+public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time")
/**
* A column containing the runtime time of the task.
*/
@JvmField
-public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java)
+public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime")
/**
* A column containing the parents of a task.
*/
-@Suppress("UNCHECKED_CAST")
@JvmField
-public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>)
+public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents")
/**
* A column containing the children of a task.
*/
-@Suppress("UNCHECKED_CAST")
@JvmField
-public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>)
+public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children")
/**
* A column containing the requested CPUs of a task.
*/
@JvmField
-public val TASK_REQ_NCPUS: TableColumn<Int> = intColumn("task:req_ncpus")
+public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus")
/**
* A column containing the allocated CPUs of a task.
*/
@JvmField
-public val TASK_ALLOC_NCPUS: TableColumn<Int> = intColumn("task:alloc_ncpus")
+public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus")
/**
* A column containing the status of a task.
*/
@JvmField
-public val TASK_STATUS: TableColumn<Int> = intColumn("task:status")
+public val TASK_STATUS: TableColumn<Int> = column("task:status")
/**
* A column containing the group id of a task.
*/
@JvmField
-public val TASK_GROUP_ID: TableColumn<Int> = intColumn("task:group_id")
+public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id")
/**
* A column containing the user id of a task.
*/
@JvmField
-public val TASK_USER_ID: TableColumn<Int> = intColumn("task:user_id")
+public val TASK_USER_ID: TableColumn<Int> = column("task:user_id")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
index 0ae45e86..64e8f272 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
@@ -22,9 +22,9 @@
package org.opendc.trace
+import org.opendc.trace.internal.TraceImpl
import org.opendc.trace.spi.TraceFormat
import java.io.File
-import java.net.URL
import java.nio.file.Path
/**
@@ -48,31 +48,48 @@ public interface Trace {
public companion object {
/**
- * Open a [Trace] at the specified [url] in the given [format].
+ * Open a [Trace] at the specified [path] in the given [format].
*
+ * @param path The path to the trace.
+ * @param format The format of the trace to open.
* @throws IllegalArgumentException if [format] is not supported.
*/
- public fun open(url: URL, format: String): Trace {
- val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
- return provider.open(url)
- }
+ @JvmStatic
+ public fun open(path: File, format: String): Trace = open(path.toPath(), format)
/**
* Open a [Trace] at the specified [path] in the given [format].
*
+ * @param path The [Path] to the trace.
+ * @param format The format of the trace to open.
* @throws IllegalArgumentException if [format] is not supported.
*/
- public fun open(path: File, format: String): Trace {
- return open(path.toURI().toURL(), format)
+ @JvmStatic
+ public fun open(path: Path, format: String): Trace {
+ val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
+ return TraceImpl(provider, path)
}
/**
- * Open a [Trace] at the specified [path] in the given [format].
+ * Create a [Trace] at the specified [path] in the given [format].
*
- * @throws IllegalArgumentException if [format] is not supported.
+ * @param path The [Path] to the trace.
+ * @param format The format of the trace to create.
*/
- public fun open(path: Path, format: String): Trace {
- return open(path.toUri().toURL(), format)
+ @JvmStatic
+ public fun create(path: File, format: String): Trace = create(path.toPath(), format)
+
+ /**
+ * Create a [Trace] at the specified [path] in the given [format].
+ *
+ * @param path The [Path] to the trace.
+ * @param format The format of the trace to create.
+ */
+ @JvmStatic
+ public fun create(path: Path, format: String): Trace {
+ val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
+ provider.create(path)
+ return TraceImpl(provider, path)
}
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
new file mode 100644
index 00000000..24551edb
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.internal
+
+import org.opendc.trace.Table
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import java.util.*
+
+/**
+ * Internal implementation of [Table].
+ */
+internal class TableImpl(val trace: TraceImpl, override val name: String) : Table {
+ /**
+ * The details of this table.
+ */
+ private val details = trace.format.getDetails(trace.path, name)
+
+ override val columns: List<TableColumn<*>>
+ get() = details.columns
+
+ override val partitionKeys: List<TableColumn<*>>
+ get() = details.partitionKeys
+
+ override fun newReader(): TableReader = trace.format.newReader(trace.path, name)
+
+ override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name)
+
+ override fun toString(): String = "Table[name=$name]"
+
+ override fun hashCode(): Int = Objects.hash(trace, name)
+
+ override fun equals(other: Any?): Boolean = other is TableImpl && trace == other.trace && name == other.name
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt
new file mode 100644
index 00000000..fd9536ab
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TraceImpl.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.internal
+
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import org.opendc.trace.spi.TraceFormat
+import java.nio.file.Path
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Internal implementation of the [Trace] interface.
+ */
+internal class TraceImpl(val format: TraceFormat, val path: Path) : Trace {
+ /**
+ * A map containing the [TableImpl] instances associated with the trace.
+ */
+ private val tableMap = ConcurrentHashMap<String, TableImpl>()
+
+ override val tables: List<String> = format.getTables(path)
+
+ init {
+ for (table in tables) {
+ tableMap.computeIfAbsent(table) { TableImpl(this, it) }
+ }
+ }
+
+ override fun containsTable(name: String): Boolean = tableMap.containsKey(name)
+
+ override fun getTable(name: String): Table? = tableMap[name]
+
+ override fun hashCode(): Int = Objects.hash(format, path)
+
+ override fun equals(other: Any?): Boolean = other is TraceImpl && format == other.format && path == other.path
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt
new file mode 100644
index 00000000..1a9b9ee1
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.spi
+
+import org.opendc.trace.Table
+import org.opendc.trace.TableColumn
+
+/**
+ * A class used by the [TraceFormat] interface for describing the metadata of a [Table].
+ *
+ * @param columns The available columns in the table.
+ * @param partitionKeys The table columns that act as partition keys for the table.
+ */
+public data class TableDetails(
+ val columns: List<TableColumn<*>>,
+ val partitionKeys: List<TableColumn<*>> = emptyList()
+)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index 54029fcf..f2e610db 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -22,8 +22,9 @@
package org.opendc.trace.spi
-import org.opendc.trace.Trace
-import java.net.URL
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import java.nio.file.Path
import java.util.*
/**
@@ -36,11 +37,52 @@ public interface TraceFormat {
public val name: String
/**
- * Open a new [Trace] with this provider.
+ * Construct an empty trace at [path].
*
- * @param url A reference to the trace.
+ * @param path The path where to create the empty trace.
+ * @throws IllegalArgumentException If [path] is invalid.
+ * @throws UnsupportedOperationException If the table does not support trace creation.
*/
- public fun open(url: URL): Trace
+ public fun create(path: Path)
+
+ /**
+ * Return the name of the tables available in the trace at the specified [path].
+ *
+ * @param path The path to the trace.
+ * @return The list of tables available in the trace.
+ */
+ public fun getTables(path: Path): List<String>
+
+ /**
+ * Return the details of [table] in the trace at the specified [path].
+ *
+ * @param path The path to the trace.
+ * @param table The name of the table to obtain the details for.
+ * @throws IllegalArgumentException If [table] does not exist.
+ * @return The [TableDetails] for the specified [table].
+ */
+ public fun getDetails(path: Path, table: String): TableDetails
+
+ /**
+ * Open a [TableReader] for the specified [table].
+ *
+ * @param path The path to the trace to open.
+ * @param table The name of the table to open a [TableReader] for.
+ * @throws IllegalArgumentException If [table] does not exist.
+ * @return A [TableReader] instance for the table.
+ */
+ public fun newReader(path: Path, table: String): TableReader
+
+ /**
+ * Open a [TableWriter] for the specified [table].
+ *
+ * @param path The path to the trace to open.
+ * @param table The name of the table to open a [TableWriter] for.
+ * @throws IllegalArgumentException If [table] does not exist.
+ * @throws UnsupportedOperationException If the format does not support writing.
+ * @return A [TableWriter] instance for the table.
+ */
+ public fun newWriter(path: Path, table: String): TableWriter
/**
* A helper object for resolving providers.
@@ -49,6 +91,7 @@ public interface TraceFormat {
/**
* A list of [TraceFormat] that are available on this system.
*/
+ @JvmStatic
public val installedProviders: List<TraceFormat> by lazy {
val loader = ServiceLoader.load(TraceFormat::class.java)
loader.toList()
@@ -57,6 +100,7 @@ public interface TraceFormat {
/**
* Obtain a [TraceFormat] implementation by [name].
*/
+ @JvmStatic
public fun byName(name: String): TraceFormat? = installedProviders.find { it.name == name }
}
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
new file mode 100644
index 00000000..dafc0798
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+
+/**
+ * A helper class to chain multiple [TableReader]s.
+ */
+public abstract class CompositeTableReader : TableReader {
+ /**
+ * A flag to indicate that the reader has starting, meaning the user called [nextRow] at least once
+ * (and in turn [nextReader]).
+ */
+ private var hasStarted = false
+
+ /**
+ * The active [TableReader] instance.
+ */
+ private var delegate: TableReader? = null
+
+ /**
+ * Obtain the next [TableReader] instance to read from or `null` if there are no more readers to read from.
+ */
+ protected abstract fun nextReader(): TableReader?
+
+ override fun nextRow(): Boolean {
+ if (!hasStarted) {
+ assert(delegate == null) { "Duplicate initialization" }
+ delegate = nextReader()
+ hasStarted = true
+ }
+
+ var delegate = delegate
+
+ while (delegate != null) {
+ if (delegate.nextRow()) {
+ break
+ }
+
+ delegate.close()
+ delegate = nextReader()
+ this.delegate = delegate
+ }
+
+ return delegate != null
+ }
+
+ override fun resolve(column: TableColumn<*>): Int {
+ val delegate = delegate
+ return delegate?.resolve(column) ?: -1
+ }
+
+ override fun isNull(index: Int): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.isNull(index)
+ }
+
+ override fun get(index: Int): Any? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.get(index)
+ }
+
+ override fun getBoolean(index: Int): Boolean {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getBoolean(index)
+ }
+
+ override fun getInt(index: Int): Int {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInt(index)
+ }
+
+ override fun getLong(index: Int): Long {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getLong(index)
+ }
+
+ override fun getDouble(index: Int): Double {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDouble(index)
+ }
+
+ override fun close() {
+ delegate?.close()
+ }
+
+ override fun toString(): String = "CompositeTableReader"
+}